http://git-wip-us.apache.org/repos/asf/hbase/blob/2986c971/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyRegionStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyRegionStorage.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyRegionStorage.java
new file mode 100644
index 0000000..4ab0c9b
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyRegionStorage.java
@@ -0,0 +1,809 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs.legacy;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.RemoteException;
+
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.fs.FSUtilsWithRetries;
+import org.apache.hadoop.hbase.fs.RegionStorage;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.MetaUtils;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.regionserver.*;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+
+@InterfaceAudience.Private
+public class LegacyRegionStorage extends RegionStorage<LegacyPathIdentifier> {
+  private static final Log LOG = LogFactory.getLog(LegacyRegionStorage.class);
+
+  private final Path tableDir; // table directory resolved under the root container's data dir
+  private final Path regionDir; // directory of this region, located under tableDir
+  private final Path mobDir; // data dir of the MOB area of the root container
+
+  // regionInfo for interacting with FS (getting encodedName, etc)
+  private final HRegionInfo regionInfoForFs;
+
+  private final FSUtilsWithRetries fsWithRetries; // helper for exists/createDir/deleteDir/rename
+
+  /**
+   * Opens the storage of a region whose {@link HRegionInfo} is read back from the
+   * region-info file stored under {@code regionContainer}.
+   * @param conf configuration passed on to the main constructor
+   * @param fs filesystem used to read the region-info file
+   * @param rootContainer root location the table/region directories are resolved from
+   * @param regionContainer directory that holds the serialized region info
+   * @throws IOException if the region-info file cannot be opened or parsed
+   */
+  public LegacyRegionStorage(Configuration conf, FileSystem fs,
+      LegacyPathIdentifier rootContainer, LegacyPathIdentifier regionContainer)
+      throws IOException {
+    this(conf, fs, rootContainer, loadRegionInfoFileContent(fs, regionContainer.path));
+  }
+
+  public LegacyRegionStorage(Configuration conf, FileSystem fs, 
LegacyPathIdentifier rootContainer, HRegionInfo hri) {
+    super(conf, fs, rootContainer, hri);
+
+    Path dataDir = LegacyLayout.getDataDir(rootContainer.path);
+    this.tableDir = LegacyLayout.getTableDir(dataDir, hri.getTable());
+    this.regionDir = LegacyLayout.getRegionDir(tableDir, hri);
+    this.mobDir = 
LegacyLayout.getDataDir(LegacyLayout.getMobDir(rootContainer.path));
+    this.fsWithRetries = new FSUtilsWithRetries(conf, fs);
+
+    this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(hri);
+  }
+
+  /** @return identifier wrapping this region's directory */
+  @Override
+  public LegacyPathIdentifier getRegionContainer() {
+    final LegacyPathIdentifier regionContainer = new LegacyPathIdentifier(this.regionDir);
+    return regionContainer;
+  }
+
+  /** @return identifier wrapping the directory of the table this region belongs to */
+  @Override
+  public LegacyPathIdentifier getTableContainer() {
+    final LegacyPathIdentifier tableContainer = new LegacyPathIdentifier(this.tableDir);
+    return tableContainer;
+  }
+
+  /**
+   * Reads and parses the {@code .regioninfo} file found directly under the given directory.
+   * @param fs filesystem to read from
+   * @param regionContainer directory expected to contain the {@code .regioninfo} file
+   * @return the region info parsed from the file
+   * @throws IOException if the file cannot be opened, parsed or closed
+   */
+  public static HRegionInfo loadRegionInfoFileContent(FileSystem fs, Path regionContainer)
+      throws IOException {
+    final Path regionInfoFile = new Path(regionContainer, ".regioninfo");
+    final FSDataInputStream stream = fs.open(regionInfoFile);
+    final HRegionInfo parsed;
+    try {
+      parsed = HRegionInfo.parseFrom(stream);
+    } finally {
+      // Always release the stream, whether or not parsing succeeded.
+      stream.close();
+    }
+    return parsed;
+  }
+  // ==========================================================================
+  //  PUBLIC Methods - Families Related
+  // ==========================================================================
+  /**
+   * Lists the names of the family directories found under the region directory.
+   * @return the family names, or an empty list when the listing yields {@code null}
+   * @throws IOException if listing the region directory fails
+   */
+  @Override
+  public Collection<String> getFamilies() throws IOException {
+    final FileSystem fileSystem = getFileSystem();
+    final FileStatus[] familyDirs =
+        FSUtils.listStatus(fileSystem, regionDir, new FSUtils.FamilyDirFilter(fileSystem));
+    if (familyDirs == null) {
+      return Collections.emptyList();
+    }
+
+    final ArrayList<String> names = new ArrayList<String>(familyDirs.length);
+    for (int i = 0; i < familyDirs.length; i++) {
+      // The last path component of a family directory is the family name.
+      names.add(familyDirs[i].getPath().getName());
+    }
+    return names;
+  }
+
+  /**
+   * Archives the store files of a family and removes the family directory. When the
+   * family has MOB data, the MOB store files are archived and their directory removed too.
+   * @param familyName name of the column family to delete
+   * @param hasMob whether the family also owns files in the MOB area
+   * @throws IOException if archiving fails or one of the directories cannot be deleted
+   */
+  @Override
+  public void deleteFamily(String familyName, boolean hasMob) throws IOException {
+    final byte[] familyBytes = Bytes.toBytes(familyName);
+
+    // Step 1: archive the family's store files.
+    final FileSystem fileSystem = getFileSystem();
+    HFileArchiver.archiveFamily(fileSystem, getConfiguration(), getRegionInfo(), tableDir,
+        familyBytes);
+
+    // Step 2: remove the family folder itself.
+    final HRegionInfo hri = getRegionInfo();
+    final Path regionRelativeFamily = new Path(hri.getEncodedName(), familyName);
+    final Path familyDir = new Path(tableDir, regionRelativeFamily);
+    final boolean familyDeleted = fsWithRetries.deleteDir(familyDir);
+    if (!familyDeleted) {
+      throw new IOException("Could not delete family "
+          + familyName + " from FileSystem for region "
+          + hri.getRegionNameAsString() + "(" + hri.getEncodedName()
+          + ")");
+    }
+
+    if (!hasMob) {
+      return;
+    }
+
+    // Step 3: archive the MOB store files of the family and remove their directory.
+    final Path mobTableDir = LegacyLayout.getTableDir(mobDir, getTable());
+    final HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(getTable());
+    final Path mobRegionDir = LegacyLayout.getRegionDir(mobTableDir, mobRegionInfo);
+    final Path mobFamilyDir = LegacyLayout.getFamilyDir(mobRegionDir, familyName);
+    MobUtils.archiveMobStoreFiles(getConfiguration(), getFileSystem(),
+        mobRegionInfo, mobFamilyDir, familyBytes);
+
+    final boolean mobDeleted = fsWithRetries.deleteDir(mobFamilyDir);
+    if (!mobDeleted) {
+      throw new IOException("Could not delete mob store files for family "
+          + familyName + " from FileSystem region "
+          + mobRegionInfo.getRegionNameAsString() + "("
+          + mobRegionInfo.getEncodedName() + ")");
+    }
+  }
+
+  // ===========================================================================
+  //  Temp Helpers
+  // ===========================================================================
+  /** @return identifier of the region's temp directory, used for file creations */
+  @Override
+  public LegacyPathIdentifier getTempContainer() {
+    final Path tempDir = LegacyLayout.getRegionTempDir(regionDir);
+    return new LegacyPathIdentifier(tempDir);
+  }
+
+  /**
+   * Deletes the region's temp directory, discarding leftovers of earlier operation attempts.
+   * The result reported by the delete call is not checked.
+   * @throws IOException if the delete call itself fails
+   */
+  public void cleanupTempContainer() throws IOException {
+    final Path tempDir = getTempContainer().path;
+    fsWithRetries.deleteDir(tempDir);
+  }
+
+  // ===========================================================================
+  //  Store/StoreFile Helpers
+  // ===========================================================================
+  /**
+   * Resolves the directory that holds the store files of a family.
+   * @param familyName Column Family Name
+   * @return identifier of the family directory inside this region's directory
+   */
+  public LegacyPathIdentifier getStoreContainer(final String familyName) {
+    final Path regionPath = getRegionContainer().path;
+    final Path familyDir = LegacyLayout.getFamilyDir(regionPath, familyName);
+    return new LegacyPathIdentifier(familyDir);
+  }
+
+  /**
+   * Creates the store directory of the given family.
+   * @param familyName Column Family Name
+   * @return identifier of the family's store directory
+   * @throws IOException if the directory creation is reported as failed
+   */
+  public LegacyPathIdentifier createStoreContainer(final String familyName)
+      throws IOException {
+    final Path storeDir = getStoreContainer(familyName).path;
+    final boolean created = fsWithRetries.createDir(storeDir);
+    if (created) {
+      return new LegacyPathIdentifier(storeDir);
+    }
+    throw new IOException("Failed creating "+storeDir);
+  }
+
+  // ==========================================================================
+  //  PUBLIC Methods - Store Files related
+  // ==========================================================================
+
+  @Override
+  public Collection<StoreFileInfo> getStoreFiles(final String familyName, 
final boolean validate)
+      throws IOException {
+    Path familyDir = getStoreContainer(familyName).path;
+    FileStatus[] files = FSUtils.listStatus(getFileSystem(), familyDir);
+    if (files == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No StoreFiles for: " + familyDir);
+      }
+      return null;
+    }
+
+    ArrayList<StoreFileInfo> storeFiles = new 
ArrayList<StoreFileInfo>(files.length);
+    for (FileStatus status: files) {
+      if (validate && !StoreFileInfo.isValid(status)) {
+        LOG.warn("Invalid StoreFile: " + status.getPath());
+        continue;
+      }
+      StoreFileInfo info = 
ServerRegionReplicaUtil.getStoreFileInfo(getConfiguration(),
+          getFileSystem(), getRegionInfo(), regionInfoForFs, familyName, 
status.getPath());
+      storeFiles.add(info);
+
+    }
+    return storeFiles;
+  }
+
+
+  /**
+   * Return Qualified Path of the specified family/file
+   *
+   * @param familyName Column Family Name
+   * @param fileName File Name
+   * @return The qualified Path for the specified family/file
+   */
+  public LegacyPathIdentifier getStoreFileStorageIdentifier(final String 
familyName, final String fileName) {
+    Path familyDir = getStoreContainer(familyName).path;
+    return new LegacyPathIdentifier(LegacyLayout.getStoreFile(familyDir, 
fileName).makeQualified(getFileSystem()));
+  }
+
+  /**
+   * Builds the store file information for one file of a family.
+   *
+   * @param familyName Column Family Name
+   * @param fileName File Name
+   * @return The {@link StoreFileInfo} for the specified family/file
+   * @throws IOException if the store file info cannot be created
+   */
+  public StoreFileInfo getStoreFileInfo(final String familyName, final String fileName)
+      throws IOException {
+    final Path storeFile =
+        LegacyLayout.getStoreFile(getStoreContainer(familyName).path, fileName);
+    return ServerRegionReplicaUtil.getStoreFileInfo(getConfiguration(), getFileSystem(),
+        getRegionInfo(), regionInfoForFs, familyName, storeFile);
+  }
+
+  /**
+   * @param store store file to measure
+   * @return length reported by the file status of the store file
+   * @throws IOException if the file status cannot be obtained
+   */
+  @Override
+  public long getStoreFileLen(final StoreFile store) throws IOException {
+    final StoreFileInfo fileInfo = store.getFileInfo();
+    return fileInfo.getFileStatus().getLen();
+  }
+
+  /**
+   * Tells whether the store directory of a family contains at least one reference file.
+   * Sub-directories are ignored.
+   * @param familyName Column Family Name
+   * @return true if family contains reference files
+   * @throws IOException if listing the store directory fails
+   */
+  public boolean hasReferences(final String familyName) throws IOException {
+    final FileStatus[] entries =
+        FSUtils.listStatus(getFileSystem(), getStoreContainer(familyName).path);
+    if (entries == null) {
+      return false;
+    }
+    for (FileStatus entry : entries) {
+      if (!entry.isDirectory() && StoreFileInfo.isReference(entry.getPath())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Tells whether any family of the table descriptor has a reference file in this region.
+   * @param htd table descriptor of the region
+   * @return true if region has reference file
+   * @throws IOException if checking one of the families fails
+   */
+  public boolean hasReferences(final HTableDescriptor htd) throws IOException {
+    for (HColumnDescriptor column : htd.getFamilies()) {
+      final String familyName = column.getNameAsString();
+      if (!hasReferences(familyName)) {
+        continue;
+      }
+      // Stop at the first family that holds a reference.
+      return true;
+    }
+    return false;
+  }
+
+
+  /**
+   * Generate a unique file name, used by createTempName() and 
commitStoreFile()
+   * @param suffix extra information to append to the generated name
+   * @return Unique file name
+   */
+  private static String generateUniqueName(final String suffix) {
+    String name = UUID.randomUUID().toString().replaceAll("-", "");
+    if (suffix != null) name += suffix;
+    return name;
+  }
+
+  /**
+   * Generates the identifier of a unique file in the temp directory. Meant to be used in
+   * conjunction with commitStoreFile() to get a safer file creation: write the file at
+   * the temporary location first, then commit it into its family.
+   *
+   * @return identifier wrapping the unique temporary {@link Path}
+   */
+  public LegacyPathIdentifier getTempIdentifier() {
+    final Path tempFile = createTempName(null);
+    return new LegacyPathIdentifier(tempFile);
+  }
+
+  /**
+   * Generates a unique temporary Path. Meant to be used in conjunction with
+   * commitStoreFile() to get a safer file creation.
+   * <code>
+   * Path file = fs.createTempName();
+   * ...StoreFile.Writer(file)...
+   * fs.commitStoreFile("family", file);
+   * </code>
+   *
+   * @param suffix extra information to append to the generated name
+   * @return Unique {@link Path} of the temporary file
+   */
+  public Path createTempName(final String suffix) {
+    final Path tempDir = getTempContainer().path;
+    final String uniqueName = generateUniqueName(suffix);
+    return new Path(tempDir, uniqueName);
+  }
+
+  /**
+   * Move the file from a build/temp location to the main family store 
directory.
+   * @param familyName Family that will gain the file
+   * @param buildPath {@link Path} to the file to commit.
+   * @return The StoreFile representing the newly committed file
+   * @throws IOException
+   */
+  @Override
+  public StoreFile commitStoreFile(final String familyName, final 
LegacyPathIdentifier buildPath, final CacheConfig cacheConf,  final BloomType 
cfBloomType, final RegionCoprocessorHost coprocessorHost) throws IOException {
+    final Path path = commitStoreFile(familyName, buildPath.path, -1, false);
+    return createStoreFileAndReader(path, cacheConf, cfBloomType, 
coprocessorHost);
+  }
+
+  /**
+   * Move the file from a build/temp location to the main family store 
directory.
+   * @param familyName Family that will gain the file
+   * @param buildPath {@link Path} to the file to commit.
+   * @param seqNum Sequence Number to append to the file name (less then 0 if 
no sequence number)
+   * @param generateNewName False if you want to keep the buildPath name
+   * @return The new {@link Path} of the committed file
+   * @throws IOException
+   */
+  private Path commitStoreFile(final String familyName, final Path buildPath,
+      final long seqNum, final boolean generateNewName) throws IOException {
+    Path storeDir = getStoreContainer(familyName).path;
+    if(!fsWithRetries.createDir(storeDir))
+      throw new IOException("Failed creating " + storeDir);
+
+    String name = buildPath.getName();
+    if (generateNewName) {
+      name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + 
"_");
+    }
+    Path dstPath = new Path(storeDir, name);
+    if (!fsWithRetries.exists(buildPath)) {
+      throw new FileNotFoundException(buildPath.toString());
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Committing store file " + buildPath + " as " + dstPath);
+    }
+    // buildPath exists, therefore not doing an exists() check.
+    if (!fsWithRetries.rename(buildPath, dstPath)) {
+      throw new IOException("Failed rename of " + buildPath + " to " + 
dstPath);
+    }
+    return dstPath;
+  }
+
+
+  /**
+   * Moves several store files into the store directories of their families, keeping
+   * their file names.
+   * @param storeFiles store files to commit, keyed by family name bytes
+   * @throws IOException if committing one of the files fails
+   */
+  public void commitStoreFiles(final Map<byte[], List<StoreFile>> storeFiles)
+      throws IOException {
+    for (Map.Entry<byte[], List<StoreFile>> familyEntry : storeFiles.entrySet()) {
+      final String family = Bytes.toString(familyEntry.getKey());
+      final List<StoreFile> familyFiles = familyEntry.getValue();
+      for (StoreFile storeFile : familyFiles) {
+        commitStoreFile(family, storeFile.getPath(), -1, false);
+      }
+    }
+  }
+
+  /**
+   * Closes and archives the specified store files from the specified family.
+   * @param familyName Family that contains the store files
+   * @param storeFiles set of store files to remove
+   * @throws IOException if the archiving fails
+   */
+  public void removeStoreFiles(final String familyName, final 
Collection<StoreFile> storeFiles)
+      throws IOException {
+    HFileArchiver.archiveStoreFiles(getConfiguration(), getFileSystem(), 
this.regionInfoForFs,
+        this.tableDir, Bytes.toBytes(familyName), storeFiles);
+  }
+
+  /**
+   * Bulk load: Add a specified store file to the specified family.
+   * If the source file is on the same file-system is moved from the
+   * source location to the destination location, otherwise is copied over.
+   *
+   * @param familyName Family that will gain the file
+   * @param srcPath {@link Path} to the file to import
+   * @param seqNum Bulk Load sequence number
+   * @return a StoreFile representation of the bulk loaded file
+   * @throws IOException
+   */
+  @Override
+  public StoreFile bulkLoadStoreFile(final String familyName, 
LegacyPathIdentifier src, long seqNum, final CacheConfig cacheConf, final 
BloomType cfBloomType, final RegionCoprocessorHost coprocessorHost)
+      throws IOException {
+    // Copy the file if it's on another filesystem
+    FileSystem fs = getFileSystem();
+    FileSystem srcFs = src.path.getFileSystem(getConfiguration());
+    FileSystem desFs = fs instanceof HFileSystem ? 
((HFileSystem)fs).getBackingFs() : fs;
+    Path commitPath = src.path;
+
+    // We can't compare FileSystem instances as equals() includes UGI instance
+    // as part of the comparison and won't work when doing SecureBulkLoad
+    // TODO deal with viewFS
+    if (!FSHDFSUtils.isSameHdfs(getConfiguration(), srcFs, desFs)) {
+      LOG.info("Bulk-load file " + src+ " is on different filesystem than " +
+          "the destination store. Copying file over to destination 
filesystem.");
+      Path tmpPath = getTempIdentifier().path;
+      FileUtil.copy(srcFs, src.path, fs, tmpPath, false, getConfiguration());
+      LOG.info("Copied " + src + " to temporary path on destination 
filesystem: " + tmpPath);
+      commitPath = tmpPath;
+    }
+
+    final Path result = commitStoreFile(familyName, commitPath, seqNum, true);
+    return createStoreFileAndReader(result, cacheConf, cfBloomType, 
coprocessorHost);
+  }
+
+  /**
+   * Wraps a committed file into a {@link StoreFile} and creates its reader.
+   * @param p path of the committed file
+   * @param cacheConf cache configuration for the store file
+   * @param cfBloomType bloom type for the store file
+   * @param coprocessorHost coprocessor host set on the store file info
+   * @return the store file, with its reader already created
+   * @throws IOException if the store file info or the reader cannot be created
+   */
+  private StoreFile createStoreFileAndReader(final Path p, final CacheConfig cacheConf,
+      final BloomType cfBloomType, final RegionCoprocessorHost coprocessorHost)
+      throws IOException {
+    final Configuration conf = getConfiguration();
+    final StoreFileInfo fileInfo = new StoreFileInfo(conf, getFileSystem(), p);
+    fileInfo.setRegionCoprocessorHost(coprocessorHost);
+
+    final StoreFile created =
+        new StoreFile(getFileSystem(), fileInfo, conf, cacheConf, cfBloomType);
+    final StoreFileReader reader = created.createReader();
+    // The flag is true exactly when this region is the default (primary) replica.
+    final boolean onDefaultReplica =
+        getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
+    reader.setReplicaStoreFile(onDefaultReplica);
+    return created;
+  }
+
+  // ===========================================================================
+  //  Splits Helpers
+  // ===========================================================================
+  /** @return identifier of the temp directory used during split operations */
+  public LegacyPathIdentifier getSplitsContainer() {
+    final Path splitsDir = LegacyLayout.getRegionSplitsDir(getRegionContainer().path);
+    return new LegacyPathIdentifier(splitsDir);
+  }
+
+  /**
+   * @param hri daughter region
+   * @return identifier of the daughter's directory inside this region's splits directory
+   */
+  public LegacyPathIdentifier getSplitsContainer(final HRegionInfo hri) {
+    final Path splitsDir = getSplitsContainer().path;
+    final Path daughterSplitDir = LegacyLayout.getRegionSplitsDir(splitsDir, hri);
+    return new LegacyPathIdentifier(daughterSplitDir);
+  }
+
+  /**
+   * Deletes the splits directory, discarding leftovers of earlier split attempts.
+   * The result reported by the delete call is not checked.
+   * @throws IOException if the delete call itself fails
+   */
+  public void cleanupSplitsContainer() throws IOException {
+    final Path splitsDir = getSplitsContainer().path;
+    fsWithRetries.deleteDir(splitsDir);
+  }
+
+  /**
+   * Cleans up what a previous, unfinished split may have left behind: for every
+   * directory found under the splits directory, the same-named directory under the table
+   * directory is deleted, then the splits directory itself is removed.
+   * Call this method on initial region deploy.
+   * @throws IOException if listing fails or a daughter directory cannot be deleted
+   */
+  public void cleanupAnySplitDetritus() throws IOException {
+    final Path splitsDir = getSplitsContainer().path;
+    if (!fsWithRetries.exists(splitsDir)) {
+      return;
+    }
+    // The splits dir may hold the encoded names of the daughter regions we tried to
+    // make. Anything still listed here belongs to a split that did not complete, so
+    // remove what was created for it under the table dir. This WILL NOT catch the case
+    // where daughter a was created successfully but the regionserver crashed while
+    // creating daughter b; that leaves an orphan daughter dir in the filesystem.
+    // TODO: Fix.
+    final FileSystem fileSystem = getFileSystem();
+    final FileStatus[] daughters =
+        FSUtils.listStatus(fileSystem, splitsDir, new FSUtils.DirFilter(fileSystem));
+    if (daughters != null) {
+      for (FileStatus daughter : daughters) {
+        final Path daughterDir = new Path(tableDir, daughter.getPath().getName());
+        if (!fsWithRetries.deleteDir(daughterDir)) {
+          throw new IOException("Failed delete of " + daughterDir);
+        }
+      }
+    }
+    cleanupSplitsContainer();
+    LOG.info("Cleaned up old failed split transaction detritus: " + splitsDir);
+  }
+
+  /**
+   * Removes the directory of a daughter region from under the table directory.
+   * @param regionInfo daughter {@link HRegionInfo}
+   * @throws IOException if the directory could not be deleted
+   */
+  public void cleanupDaughterRegion(final HRegionInfo regionInfo) throws IOException {
+    final Path daughterDir = LegacyLayout.getRegionDir(tableDir, regionInfo);
+    final boolean deleted = fsWithRetries.deleteDir(daughterDir);
+    if (!deleted) {
+      throw new IOException("Failed delete of " + daughterDir);
+    }
+  }
+
+  /**
+   * Commits a daughter region: when its directory exists under the splits directory, the
+   * region info file is written into it and the directory is renamed to its final
+   * location under the table directory. When no such directory exists nothing is moved.
+   *
+   * @param regionInfo daughter {@link org.apache.hadoop.hbase.HRegionInfo}
+   * @return identifier of the daughter's final region directory
+   * @throws IOException if writing the region info or the rename fails
+   */
+  public LegacyPathIdentifier commitDaughterRegion(final HRegionInfo regionInfo)
+      throws IOException {
+    final Path finalRegionDir = LegacyLayout.getRegionDir(tableDir, regionInfo);
+    final Path stagedDir = getSplitsContainer(regionInfo).path;
+
+    if (!fsWithRetries.exists(stagedDir)) {
+      return new LegacyPathIdentifier(finalRegionDir);
+    }
+
+    // Write HRI to a file in case we need to recover hbase:meta
+    final Path regionInfoFile = LegacyLayout.getRegionInfoFile(stagedDir);
+    final byte[] serializedInfo = getRegionInfoFileContent(regionInfo);
+    writeRegionInfoFileContent(getConfiguration(), getFileSystem(), regionInfoFile,
+        serializedInfo);
+
+    // Move the daughter temp dir to the table dir
+    final boolean moved = fsWithRetries.rename(stagedDir, finalRegionDir);
+    if (!moved) {
+      throw new IOException("Unable to rename " + stagedDir + " to " + finalRegionDir);
+    }
+    return new LegacyPathIdentifier(finalRegionDir);
+  }
+
+  /**
+   * Creates the region splits directory; a directory that already exists is deleted and
+   * created again.
+   * @throws IOException if the existing directory cannot be deleted or the creation fails
+   */
+  @Override
+  public void createSplitsContainer() throws IOException {
+    final Path splitsDir = getSplitsContainer().path;
+    createTransientDir(splitsDir);
+  }
+
+  void createTransientDir(Path dir) throws IOException {
+    if (fsWithRetries.exists(dir)) {
+      LOG.info("The " + dir + " directory exists.  Hence deleting it to 
recreate it");
+      if (!fsWithRetries.deleteDir(dir)) {
+        throw new IOException("Failed deletion of " + dir + " before creating 
them again.");
+      }
+    }
+    // dir doesn't exists now. No need to do an exists() call for it.
+    if (!fsWithRetries.createDir(dir)) {
+      throw new IOException("Failed create of " + dir);
+    }
+  }
+
+  /**
+   * Writes a split reference to a store file into the daughter's directory under the
+   * splits directory. Unless the split policy asks to skip the range check, no reference
+   * is written when the store file is empty or the split row lies outside its key range;
+   * whenever that check runs, the store file's reader is closed afterwards.
+   * @param hri {@link HRegionInfo} of the destination
+   * @param familyName Column Family Name
+   * @param f File to split.
+   * @param splitRow Split Row
+   * @param top True if we are referring to the top half of the hfile.
+   * @param splitPolicy split policy consulted for skipping the range check, may be null
+   * @return identifier of the created reference, or null when none was written
+   * @throws IOException if reading the store file keys or writing the reference fails
+   */
+  public LegacyPathIdentifier splitStoreFile(final HRegionInfo hri,
+      final String familyName, final StoreFile f, final byte[] splitRow,
+      final boolean top, RegionSplitPolicy splitPolicy) throws IOException {
+
+    final boolean checkRange =
+        splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName);
+    if (checkRange) {
+      // A split row outside the key range of the store file needs no reference.
+      try {
+        if (top) {
+          // Top half: nothing to reference when the split key sorts after the last key.
+          final KeyValue splitKey = KeyValueUtil.createFirstOnRow(splitRow);
+          final Cell lastKey = f.getLastKey();
+          // A null last key means the store file is empty.
+          if (lastKey == null || f.getComparator().compare(splitKey, lastKey) > 0) {
+            return null;
+          }
+        } else {
+          // Bottom half: nothing to reference when the split key sorts before the first key.
+          final KeyValue splitKey = KeyValueUtil.createLastOnRow(splitRow);
+          final Cell firstKey = f.getFirstKey();
+          // A null first key means the store file is empty.
+          if (firstKey == null || f.getComparator().compare(splitKey, firstKey) < 0) {
+            return null;
+          }
+        }
+      } finally {
+        f.closeReader(true);
+      }
+    }
+
+    final Path splitFamilyDir = new Path(getSplitsContainer(hri).path, familyName);
+    final Reference reference;
+    if (top) {
+      reference = Reference.createTopReference(splitRow);
+    } else {
+      reference = Reference.createBottomReference(splitRow);
+    }
+    // The reference keeps the store file's name and gets the encoded name of the
+    // referred-to (parent) region appended as a dot separated suffix; it is written
+    // into the new region location, under the same family.
+    final String parentRegionName = regionInfoForFs.getEncodedName();
+    final String referenceName = f.getPath().getName() + "." + parentRegionName;
+    final Path written =
+        reference.write(getFileSystem(), new Path(splitFamilyDir, referenceName));
+    if (written == null) {
+      return null;
+    }
+    return new LegacyPathIdentifier(written);
+  }
+
+  // ===========================================================================
+  //  Merge Helpers
+  // ===========================================================================
+  /** @return identifier of the temp directory used during merge operations */
+  public LegacyPathIdentifier getMergesContainer() {
+    final Path mergesDir = LegacyLayout.getRegionMergesDir(getRegionContainer().path);
+    return new LegacyPathIdentifier(mergesDir);
+  }
+
+  /**
+   * @param hri merged region
+   * @return identifier of the merged region's directory inside this region's merges
+   *   directory
+   */
+  public LegacyPathIdentifier getMergesContainer(final HRegionInfo hri) {
+    final Path mergesDir = getMergesContainer().path;
+    final Path mergedRegionTmpDir = LegacyLayout.getRegionMergesDir(mergesDir, hri);
+    return new LegacyPathIdentifier(mergedRegionTmpDir);
+  }
+
+  /**
+   * Deletes the merges directory, discarding leftovers of earlier merge attempts.
+   * The result reported by the delete call is not checked.
+   * @throws IOException if the delete call itself fails
+   */
+  public void cleanupMergesContainer() throws IOException {
+    final Path mergesDir = getMergesContainer().path;
+    fsWithRetries.deleteDir(mergesDir);
+  }
+
+  /**
+   * Removes the directory of a merged region from under the table directory.
+   * @param mergedRegion {@link HRegionInfo} of the merged region to remove
+   * @throws IOException if the region directory could not be deleted
+   */
+  public void cleanupMergedRegion(final HRegionInfo mergedRegion) throws IOException {
+    Path regionDir = LegacyLayout.getRegionDir(tableDir, mergedRegion);
+    // Fail only when the delete did NOT succeed, matching cleanupDaughterRegion().
+    if (!fsWithRetries.deleteDir(regionDir)) {
+      throw new IOException("Failed delete of " + regionDir);
+    }
+  }
+
+  /**
+   * Creates the region merges directory; a directory that already exists is deleted and
+   * created again.
+   * @throws IOException if the existing directory cannot be deleted or the creation fails
+   * @see #cleanupMergesContainer()
+   */
+  @Override
+  public void createMergesContainer() throws IOException {
+    final Path mergesDir = getMergesContainer().path;
+    createTransientDir(mergesDir);
+  }
+
+  /**
+   * Writes a merge reference to a store file under the given merges directory, in
+   * {@code mergedDir/<merged region encoded name>/<family>}.
+   * @param mergedRegion {@link HRegionInfo} of the merged region
+   * @param familyName Column Family Name
+   * @param f File to create reference.
+   * @param mergedDir identifier of the merges directory to write under
+   * @return identifier of the created reference, or null when the write returns no path
+   * @throws IOException if writing the reference fails
+   */
+  @Override
+  public LegacyPathIdentifier mergeStoreFile(final HRegionInfo mergedRegion,
+      final String familyName, final StoreFile f, final LegacyPathIdentifier mergedDir)
+      throws IOException {
+    final Path mergedRegionDir = new Path(mergedDir.path, mergedRegion.getEncodedName());
+    final Path referenceDir = new Path(mergedRegionDir, familyName);
+    // A top reference starting at this region's start key, i.e. a whole-file reference.
+    final Reference wholeFileRef =
+        Reference.createTopReference(regionInfoForFs.getStartKey());
+    // The reference keeps the store file's name and gets the encoded name of the
+    // referred-to (merging) region appended as a dot separated suffix; it is written
+    // into the new region location, under the same family.
+    final String mergingRegionName = regionInfoForFs.getEncodedName();
+    final String referenceName = f.getPath().getName() + "."
+        + mergingRegionName;
+    final Path written =
+        wholeFileRef.write(getFileSystem(), new Path(referenceDir, referenceName));
+    if (written == null) {
+      return null;
+    }
+    return new LegacyPathIdentifier(written);
+  }
+
+  /**
+   * Commits a merged region by renaming its directory under the merges directory to its
+   * final location under the table directory. Nothing happens when the temporary
+   * directory does not exist.
+   * @param mergedRegionInfo merged region {@link HRegionInfo}
+   * @throws IOException if the rename fails
+   */
+  @Override
+  public void commitMergedRegion(final HRegionInfo mergedRegionInfo) throws IOException {
+    final Path finalDir = new Path(this.tableDir, mergedRegionInfo.getEncodedName());
+    final Path stagedDir = getMergesContainer(mergedRegionInfo).path;
+    if (stagedDir == null || !fsWithRetries.exists(stagedDir)) {
+      return;
+    }
+    // Move the tmp dir in the expected location
+    final boolean moved = fsWithRetries.rename(stagedDir, finalDir);
+    if (!moved) {
+      throw new IOException("Unable to rename " + stagedDir + " to "
+          + finalDir);
+    }
+  }
+
+  // ===========================================================================
+  //  Create/Open/Delete Helpers
+  // ===========================================================================
+  /**
+   * Logs the current filesystem state of the region directory.
+   * // TODO refactor to "logStorageState"
+   * @param LOG log to output information
+   * @throws IOException if an unexpected exception occurs
+   */
+  @Override
+  public void logFileSystemState(final Log LOG) throws IOException {
+    final Path regionPath = this.getRegionContainer().path;
+    FSUtils.logFileSystemState(getFileSystem(), regionPath, LOG);
+  }
+
+  /**
+   * @param hri
+   * @return Content of the file we write out to the filesystem under a region
+   * @throws IOException
+   */
+  private static byte[] getRegionInfoFileContent(final HRegionInfo hri) throws 
IOException {
+    return hri.toDelimitedByteArray();
+  }
+
+  /**
+   * Write the .regioninfo file on-disk.
+   */
+  private static void writeRegionInfoFileContent(final Configuration conf, 
final FileSystem fs,
+      final Path regionInfoFile, final byte[] content) throws IOException {
+    // First check to get the permissions
+    FsPermission perms = FSUtils.getFilePermissions(fs, conf, 
HConstants.DATA_FILE_UMASK_KEY);
+    // Write the RegionInfo file content
+    FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, 
null);
+    try {
+      out.write(content);
+    } finally {
+      out.close();
+    }
+  }
+
+  // ==========================================================================
+  //  PUBLIC bootstrap
+  // ==========================================================================
+  /**
+   * Prepares the region on the filesystem: creates the region directory, removes the
+   * temp, splits and merges directories and writes the region info file.
+   * @throws IOException if the region directory cannot be created or the region info
+   *   file cannot be written
+   */
+  @Override
+  protected void bootstrap() throws IOException {
+    // A failed creation is reported here instead of surfacing later as a less obvious
+    // error; this matches how createStoreContainer() treats createDir().
+    Path regionPath = getRegionContainer().path;
+    if (!fsWithRetries.createDir(regionPath)) {
+      throw new IOException("Failed creating " + regionPath);
+    }
+
+    // Cleanup temporary directories
+    cleanupTempContainer();
+    cleanupSplitsContainer();
+    cleanupMergesContainer();
+
+    // Write HRI to a file, in case we need to recover hbase:meta
+    checkRegionInfoOnFilesystem();
+  }
+
+  /**
+   * Writes the serialized region info of this region to the region info file inside the
+   * region directory.
+   * @throws IOException if serializing the region info or writing the file fails
+   */
+  public void checkRegionInfoOnFilesystem() throws IOException {
+    final Path regionInfoFile = LegacyLayout.getRegionInfoFile(getRegionContainer().path);
+    final byte[] serializedInfo = getRegionInfoFileContent(getRegionInfo());
+    writeRegionInfoFileContent(getConfiguration(), getFileSystem(), regionInfoFile,
+        serializedInfo);
+  }
+
+  @Override
+  protected void destroy() throws IOException {
+    fsWithRetries.deleteDir(regionDir);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2986c971/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
index 1ee308d..1546f58 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.fs.RegionFileSystem;
+import org.apache.hadoop.hbase.fs.RegionStorage;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -195,14 +195,10 @@ public class CatalogJanitor extends ScheduledChore {
    */
   boolean cleanMergeRegion(final HRegionInfo mergedRegion,
       final HRegionInfo regionA, final HRegionInfo regionB) throws IOException 
{
-    FileSystem fs = this.services.getMasterFileSystem().getFileSystem();
-    Path rootdir = this.services.getMasterFileSystem().getRootDir();
-    Path tabledir = FSUtils.getTableDir(rootdir, mergedRegion.getTable());
     HTableDescriptor htd = getTableDescriptor(mergedRegion.getTable());
-    RegionFileSystem regionFs = null;
+    RegionStorage regionFs = null;
     try {
-      regionFs = RegionFileSystem.open(
-          this.services.getConfiguration(), fs, tabledir, mergedRegion, false);
+      regionFs = RegionStorage.open(this.services.getConfiguration(), 
mergedRegion, false);
     } catch (IOException e) {
       LOG.warn("Merged region does not exist: " + 
mergedRegion.getEncodedName());
     }
@@ -210,6 +206,8 @@ public class CatalogJanitor extends ScheduledChore {
       LOG.debug("Deleting region " + regionA.getRegionNameAsString() + " and "
           + regionB.getRegionNameAsString()
           + " from fs because merged region no longer holds references");
+      // TODO update HFileArchiver to use RegionStorage
+      FileSystem fs = this.services.getMasterFileSystem().getFileSystem();
       HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, 
regionA);
       HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, 
regionB);
       MetaTableAccessor.deleteMergeQualifiers(services.getConnection(), 
mergedRegion);
@@ -406,8 +404,7 @@ public class CatalogJanitor extends ScheduledChore {
 
     boolean references = false;
     try {
-      final RegionFileSystem regionFs = 
RegionFileSystem.open(this.services.getConfiguration(),
-          fs, tabledir, daughter, false);
+      final RegionStorage regionFs = 
RegionStorage.open(this.services.getConfiguration(), daughter, false);
       final HTableDescriptor parentDescriptor = 
getTableDescriptor(parent.getTable());
 
       for (HColumnDescriptor family: parentDescriptor.getFamilies()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/2986c971/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index f0a71b0..196cb16 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.fs.FsContext;
 import org.apache.hadoop.hbase.fs.MasterFileSystem;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@@ -285,6 +284,7 @@ public class CreateTableProcedure
           final List<HRegionInfo> newRegions) throws IOException {
         HRegionInfo[] regions = newRegions != null ?
           newRegions.toArray(new HRegionInfo[newRegions.size()]) : null;
+        //TODO this should be RegionStorage
         return ModifyRegionUtils.createRegions(env.getMasterConfiguration(),
             tableRootDir, hTableDescriptor, regions, null);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2986c971/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ServerProtobufUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ServerProtobufUtil.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ServerProtobufUtil.java
new file mode 100644
index 0000000..416ffee
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ServerProtobufUtil.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.protobuf;
+
+// TODO remove unused imports
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLoadStats;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SnapshotDescription;
+import org.apache.hadoop.hbase.client.SnapshotType;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.security.SecurityCapability;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.fs.StorageIdentifier;
+import org.apache.hadoop.hbase.fs.legacy.LegacyPathIdentifier;
+import org.apache.hadoop.hbase.io.LimitInputStream;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
+import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
+import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
+import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
+import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
+import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.DeleteType;
+import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
+import 
org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.LiveServerInfo;
+import 
org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionInTransition;
+import 
org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
+import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
+import 
org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent;
+import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
+import 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ColumnFamilySchema;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
+import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
+import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
+import 
org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.util.ByteStringer;
+
+/**
+ * Protobufs utility for server side only.
+ */
+@InterfaceAudience.Private
+public final class ServerProtobufUtil {
+
+  private ServerProtobufUtil() {
+  }
+
+  public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, 
byte[] family,
+      List<Path> inputPaths, List<Path> outputPaths, StorageIdentifier 
storeDir) {
+    return toCompactionDescriptor(info, null, family, inputPaths, outputPaths, 
storeDir);
+  }
+
+  public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, 
byte[] regionName,
+      byte[] family, List<Path> inputPaths, List<Path> outputPaths, 
StorageIdentifier storeDir) {
+    // compaction descriptor contains relative paths.
+    // input / output paths are relative to the store dir
+    // store dir is relative to region dir
+    CompactionDescriptor.Builder builder = CompactionDescriptor.newBuilder()
+        .setTableName(ByteStringer.wrap(info.getTable().toBytes()))
+        .setEncodedRegionName(ByteStringer.wrap(
+          regionName == null ? info.getEncodedNameAsBytes() : regionName))
+        .setFamilyName(ByteStringer.wrap(family))
+        // TODO need an equivalent to getName as unique name for 
StorageIdentifier
+        .setStoreHomeDir(((LegacyPathIdentifier)storeDir).path.getName()); 
//make relative
+    for (Path inputPath : inputPaths) {
+      builder.addCompactionInput(inputPath.getName()); //relative path
+    }
+    for (Path outputPath : outputPaths) {
+      builder.addCompactionOutput(outputPath.getName());
+    }
+    builder.setRegionName(ByteStringer.wrap(info.getRegionName()));
+    return builder.build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2986c971/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
index 73ed9e9..f071f27 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -52,7 +52,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.fs.RegionFileSystem;
+import org.apache.hadoop.hbase.fs.RegionStorage;
+import org.apache.hadoop.hbase.fs.legacy.LegacyPathIdentifier;
 import org.apache.hadoop.hbase.fs.legacy.LegacyTableDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -103,6 +104,8 @@ public class CompactionTool extends Configured implements 
Tool {
     /**
      * Execute the compaction on the specified path.
      *
+     * TODO either retool in terms of region info or remove outright
+     *
      * @param path Directory path on which to run compaction.
      * @param compactOnce Execute just a single step of compaction.
      * @param major Request major compaction.
@@ -112,8 +115,8 @@ public class CompactionTool extends Configured implements 
Tool {
         Path regionDir = path.getParent();
         Path tableDir = regionDir.getParent();
         HTableDescriptor htd = 
LegacyTableDescriptor.getTableDescriptorFromFs(fs, tableDir);
-        HRegionInfo hri = RegionFileSystem.loadRegionInfoFileContent(fs, 
regionDir);
-        compactStoreFiles(tableDir, htd, hri,
+        final RegionStorage rs = RegionStorage.open(conf, new 
LegacyPathIdentifier(regionDir), false);
+        compactStoreFiles(tableDir, htd, rs.getRegionInfo(),
             path.getName(), compactOnce, major);
       } else if (isRegionDir(fs, path)) {
         Path tableDir = path.getParent();
@@ -138,9 +141,10 @@ public class CompactionTool extends Configured implements 
Tool {
     private void compactRegion(final Path tableDir, final HTableDescriptor htd,
         final Path regionDir, final boolean compactOnce, final boolean major)
         throws IOException {
-      HRegionInfo hri = RegionFileSystem.loadRegionInfoFileContent(fs, 
regionDir);
+      final RegionStorage rs = RegionStorage.open(conf, new 
LegacyPathIdentifier(regionDir), false);
+      // TODO use RegionStorage to iterate instead of FSUtils
       for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
-        compactStoreFiles(tableDir, htd, hri, familyDir.getName(), 
compactOnce, major);
+        compactStoreFiles(tableDir, htd, rs.getRegionInfo(), 
familyDir.getName(), compactOnce, major);
       }
     }
 
@@ -181,7 +185,7 @@ public class CompactionTool extends Configured implements 
Tool {
     private static HStore getStore(final Configuration conf, final FileSystem 
fs,
         final Path tableDir, final HTableDescriptor htd, final HRegionInfo hri,
         final String familyName, final Path tempDir) throws IOException {
-      RegionFileSystem regionFs = null;
+      RegionStorage regionFs = null;
       HRegion region = new HRegion(regionFs, htd, null, null);
       return new HStore(region, htd.getFamily(Bytes.toBytes(familyName)), 
conf);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2986c971/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c84e896..e8cae9a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -133,7 +133,9 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FilterWrapper;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
-import org.apache.hadoop.hbase.fs.RegionFileSystem;
+import org.apache.hadoop.hbase.fs.RegionStorage;
+import org.apache.hadoop.hbase.fs.StorageIdentifier;
+import org.apache.hadoop.hbase.fs.legacy.LegacyPathIdentifier;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -288,7 +290,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
 
   private final WAL wal;
-  private final RegionFileSystem fs;
+  private final RegionStorage fs;
   protected final Configuration conf;
   private final Configuration baseConf;
   private final int rowLockWaitDuration;
@@ -621,7 +623,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       final Configuration confParam, final HRegionInfo regionInfo,
       final HTableDescriptor htd, final RegionServerServices rsServices)
       throws IOException {
-    this(RegionFileSystem.open(confParam, fs, tableDir, regionInfo, false), 
htd, wal, rsServices);
+    this(RegionStorage.open(confParam, fs, new LegacyPathIdentifier(tableDir), 
regionInfo, false), htd, wal, rsServices);
   }
 
   /**
@@ -640,12 +642,12 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * @param htd the table descriptor
    * @param rsServices reference to {@link RegionServerServices} or null
    */
-  public HRegion(final RegionFileSystem rfs, final HTableDescriptor htd, final 
WAL wal,
+  public HRegion(final RegionStorage rfs, final HTableDescriptor htd, final 
WAL wal,
       final RegionServerServices rsServices) {
     this(rfs, wal, rfs.getConfiguration(), htd, rsServices);
   }
 
-  private HRegion(final RegionFileSystem fs, final WAL wal, final 
Configuration confParam,
+  private HRegion(final RegionStorage fs, final WAL wal, final Configuration 
confParam,
       final HTableDescriptor htd, final RegionServerServices rsServices) {
     if (htd == null) {
       throw new IllegalArgumentException("Need table descriptor");
@@ -832,11 +834,22 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     long maxSeqId = initializeStores(reporter, status);
     this.mvcc.advanceTo(maxSeqId);
     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
-      // Recover any edits if available.
-      maxSeqId = Math.max(maxSeqId,
-        replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, 
reporter, status));
-      // Make sure mvcc is up to max.
-      this.mvcc.advanceTo(maxSeqId);
+      final StorageIdentifier regionContainer = this.fs.getRegionContainer();
+      /* 
+       * TODO either move wal replay stuff to not rely on details from 
RegionStorage,
+       *      implement a WALStorage abstraction, or make a "Recovered Edits 
visitor".
+       */
+      if (regionContainer instanceof LegacyPathIdentifier) {
+        // Recover any edits if available.
+        maxSeqId = Math.max(maxSeqId,
+            
replayRecoveredEditsIfAny(((LegacyPathIdentifier)this.fs.getRegionContainer()).path,
 maxSeqIdInStores,
+            reporter, status));
+        // Make sure mvcc is up to max.
+        this.mvcc.advanceTo(maxSeqId);
+      } else {
+        LOG.debug("Skipping check for recovered edits, because RegionStorage 
implementation '" + this.fs.getClass() +
+            "' doesn't return Paths for the region container.");
+      }
     }
     this.lastReplayedOpenRegionSeqId = maxSeqId;
 
@@ -847,14 +860,14 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     if (this.writestate.writesEnabled) {
       // Remove temporary data left over from old regions
       status.setStatus("Cleaning up temporary data from old regions");
-      fs.cleanupTempDir();
+      fs.cleanupTempContainer();
 
       status.setStatus("Cleaning up detritus from prior splits");
       // Get rid of any splits or merges that were lost in-progress.  Clean out
       // these directories here on open.  We may be opening a region that was
       // being split but we crashed in the middle of it all.
       fs.cleanupAnySplitDetritus();
-      fs.cleanupMergesDir();
+      fs.cleanupMergesContainer();
     }
 
     // Initialize split policy
@@ -876,8 +889,19 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     // is opened before recovery completes. So we add a safety bumper to avoid 
new sequence number
     // overlaps used sequence numbers
     if (this.writestate.writesEnabled) {
-      nextSeqid = 
WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
-          .getRegionDir(), nextSeqid, (this.recovering ? (this.flushPerChanges 
+ 10000000) : 1));
+      final StorageIdentifier regionContainer = this.fs.getRegionContainer();
+      /* 
+       * TODO more WAL replay stuff that needs to get pulled out of the notion 
of region storage
+       */
+      if (regionContainer instanceof LegacyPathIdentifier) {
+        nextSeqid = 
WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(),
+            ((LegacyPathIdentifier)regionContainer).path, nextSeqid,
+            (this.recovering ? (this.flushPerChanges + 10000000) : 1));
+      } else {
+        LOG.debug("Skipping region sequence id checkpointing, because 
RegionStorage implementation '" +
+            this.fs.getClass() + "' doesn't return Paths for the region 
container.");
+        nextSeqid++;
+      }
     } else {
       nextSeqid++;
     }
@@ -1027,12 +1051,21 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     WALUtil.writeRegionEventMarker(wal, getReplicationScope(), 
getRegionInfo(), regionEventDesc,
         mvcc);
 
-    // Store SeqId in HDFS when a region closes
-    // checking region folder exists is due to many tests which delete the 
table folder while a
-    // table is still online
-    if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
-      WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), 
this.fs.getRegionDir(),
-        mvcc.getReadPoint(), 0);
+    final StorageIdentifier regionContainer = this.fs.getRegionContainer();
+    /*
+     * TODO more WAL stuff to move out of region storage
+     */
+    if (regionContainer instanceof LegacyPathIdentifier) {
+      // Store SeqId in HDFS when a region closes
+      // checking region folder exists is due to many tests which delete the 
table folder while a
+      // table is still online
+      if 
(this.fs.getFileSystem().exists(((LegacyPathIdentifier)regionContainer).path)) {
+        WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), 
((LegacyPathIdentifier)regionContainer).path,
+          mvcc.getReadPoint(), 0);
+      }
+    } else {
+      LOG.debug("skipping WAL sequence ID checkpointing because the 
RegionStorage implementation, '" +
+          this.fs.getClass() + "' doesn't return Paths for the region 
container.");
     }
   }
 
@@ -1054,6 +1087,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     this.updatesLock.writeLock().unlock();
   }
 
+  /* TODO move these into RegionStorage, make it something like host locality 
so it generalizes */
+
   @Override
   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
     HDFSBlocksDistribution hdfsBlocksDistribution =
@@ -1101,7 +1136,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     HDFSBlocksDistribution hdfsBlocksDistribution = new 
HDFSBlocksDistribution();
     FileSystem fs = tablePath.getFileSystem(conf);
 
-    RegionFileSystem regionFs = RegionFileSystem.open(conf, fs, tablePath, 
regionInfo, false);
+    RegionStorage regionFs = RegionStorage.open(conf, regionInfo, false);
     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
       Collection<StoreFileInfo> storeFiles = 
regionFs.getStoreFiles(family.getNameAsString());
       if (storeFiles == null) continue;
@@ -1116,6 +1151,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     return hdfsBlocksDistribution;
   }
 
+  /* /end TODO move these into RegionStorage, make it something like host 
locality so it generalizes */
+
   /**
    * Increase the size of mem store in this region and the size of global mem
    * store
@@ -1707,8 +1744,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     return fs.getFileSystem();
   }
 
-  /** @return the {@link HRegionFileSystem} used by this region */
-  public RegionFileSystem getRegionFileSystem() {
+  /** @return the {@link RegionStorage} used by this region */
+  public RegionStorage getRegionStorage() {
     return this.fs;
   }
 
@@ -4045,10 +4082,10 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       String fakeFamilyName = 
WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
       Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
       for (Path file: files) {
-        fakeStoreFiles.add(new 
StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
+        fakeStoreFiles.add(new StoreFile(getRegionStorage().getFileSystem(), 
file, this.conf,
           null, null));
       }
-      getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
+      getRegionStorage().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
     } else {
       for (Path file: files) {
         if (!fs.delete(file, false)) {
@@ -5509,23 +5546,22 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           if (bulkLoadListener != null) {
             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
           }
-          Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
+          final StoreFile commitedStoreFile = store.bulkLoadHFile(finalPath, 
seqId);
+          final String name = 
commitedStoreFile.getFileInfo().getPath().getName();
 
           // Note the size of the store file
           try {
-            FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
-            storeFilesSizes.put(commitedStoreFile.getName(), 
fs.getFileStatus(commitedStoreFile)
-                .getLen());
+            storeFilesSizes.put(name, 
this.fs.getStoreFileLen(commitedStoreFile));
           } catch (IOException e) {
             LOG.warn("Failed to find the size of hfile " + commitedStoreFile);
-            storeFilesSizes.put(commitedStoreFile.getName(), 0L);
+            storeFilesSizes.put(name, 0L);
           }
 
           if(storeFiles.containsKey(familyName)) {
-            storeFiles.get(familyName).add(commitedStoreFile);
+            
storeFiles.get(familyName).add(commitedStoreFile.getFileInfo().getPath());
           } else {
             List<Path> storeFileNames = new ArrayList<Path>();
-            storeFileNames.add(commitedStoreFile);
+            storeFileNames.add(commitedStoreFile.getFileInfo().getPath());
             storeFiles.put(familyName, storeFileNames);
           }
           if (bulkLoadListener != null) {
@@ -6294,14 +6330,14 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * @param htd the table descriptor
    * @return the new instance
    */
-  static HRegion newHRegion(Configuration conf, FileSystem fs, Path rootDir,
+  static HRegion newHRegion(Configuration conf,
       HTableDescriptor htd, HRegionInfo regionInfo, WAL wal, 
RegionServerServices rsServices)
       throws IOException {
-    RegionFileSystem rfs = RegionFileSystem.open(conf, fs, rootDir, 
regionInfo, false);
+    RegionStorage rfs = RegionStorage.open(conf, regionInfo, false);
     return newHRegion(rfs, htd, wal, rsServices);
   }
 
-  private static HRegion newHRegion(RegionFileSystem rfs, HTableDescriptor 
htd, WAL wal,
+  private static HRegion newHRegion(RegionStorage rfs, HTableDescriptor htd, 
WAL wal,
       RegionServerServices rsServices) throws IOException {
     try {
       Configuration conf = rfs.getConfiguration();
@@ -6311,7 +6347,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, 
HRegion.class);
 
       Constructor<? extends HRegion> c =
-          regionClass.getConstructor(RegionFileSystem.class, 
HTableDescriptor.class,
+          regionClass.getConstructor(RegionStorage.class, 
HTableDescriptor.class,
               WAL.class, RegionServerServices.class);
 
       return c.newInstance(rfs, htd, wal, rsServices);
@@ -6331,25 +6367,34 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * @return new HRegion
    * @throws IOException
    */
-  public static HRegion createHRegion(final Configuration conf, final Path 
rootDir,
+  public static HRegion createHRegion(final Configuration conf,
         final HTableDescriptor hTableDescriptor, final HRegionInfo info,
         final WAL wal, final boolean initialize) throws IOException {
     LOG.info("creating HRegion " + info.getTable().getNameAsString()
-        + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
+        + " HTD == " + hTableDescriptor +
         " Table name == " + info.getTable().getNameAsString());
-    FileSystem fs = rootDir.getFileSystem(conf);
-    RegionFileSystem rfs = RegionFileSystem.open(conf, fs, rootDir, info, 
true);
+    RegionStorage rfs = RegionStorage.open(conf, info, true);
     HRegion region = HRegion.newHRegion(rfs, hTableDescriptor, wal, null);
     if (initialize) region.initialize(null);
     return region;
   }
 
-  public static HRegion createHRegion(final Configuration conf, final Path 
rootDir,
+  /**
+   * Create an HRegion and initialize it.
+   */
+  public static HRegion createHRegion(final Configuration conf,
       final HTableDescriptor hTableDescriptor, final HRegionInfo info, final 
WAL wal)
       throws IOException {
-    return createHRegion(conf, rootDir, hTableDescriptor, info, wal, true);
+    return createHRegion(conf, hTableDescriptor, info, wal, true);
   }
 
+  /**
+   * TODO remove after refactoring ModifyRegionUtils to use a RegionStorage 
impl instead of specifying a different root dir manually.
+   */
+  public static HRegion createHRegion(final Configuration conf, final Path 
rootDir, final HTableDescriptor htd, final HRegionInfo info) throws IOException 
{
+    RegionStorage rfs = RegionStorage.open(conf, rootDir.getFileSystem(conf), 
new LegacyPathIdentifier(rootDir), info, true);
+    return HRegion.newHRegion(rfs, htd, null, null);
+  }
 
   /**
    * Open a Region.
@@ -6371,7 +6416,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
 
   /**
    * Open a Region.
-   * @param info Info for region to be opened
+   * @param info Info for region to be opened, must not be null
    * @param htd the table descriptor
    * @param wal WAL for region to use. This method will call
    * WAL#setSequenceNumber(long) passing the result of the call to
@@ -6383,110 +6428,32 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * @return new HRegion
    *
    * @throws IOException
+   * @throws IllegalArgumentException if info is null
    */
   public static HRegion openHRegion(final HRegionInfo info,
     final HTableDescriptor htd, final WAL wal, final Configuration conf,
     final RegionServerServices rsServices,
     final CancelableProgressable reporter)
   throws IOException {
-    return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, 
rsServices, reporter);
-  }
-
-  /**
-   * Open a Region.
-   * @param rootDir Root directory for HBase instance
-   * @param info Info for region to be opened.
-   * @param htd the table descriptor
-   * @param wal WAL for region to use. This method will call
-   * WAL#setSequenceNumber(long) passing the result of the call to
-   * HRegion#getMinSequenceId() to ensure the wal id is properly kept
-   * up.  HRegionStore does this every time it opens a new region.
-   * @param conf The Configuration object to use.
-   * @return new HRegion
-   * @throws IOException
-   */
-  public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
-      final HTableDescriptor htd, final WAL wal, final Configuration conf)
-  throws IOException {
-    return openHRegion(rootDir, info, htd, wal, conf, null, null);
-  }
-
-  /**
-   * Open a Region.
-   * @param rootDir Root directory for HBase instance
-   * @param info Info for region to be opened.
-   * @param htd the table descriptor
-   * @param wal WAL for region to use. This method will call
-   * WAL#setSequenceNumber(long) passing the result of the call to
-   * HRegion#getMinSequenceId() to ensure the wal id is properly kept
-   * up.  HRegionStore does this every time it opens a new region.
-   * @param conf The Configuration object to use.
-   * @param rsServices An interface we can request flushes against.
-   * @param reporter An interface we can report progress against.
-   * @return new HRegion
-   * @throws IOException
-   */
-  public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
-      final HTableDescriptor htd, final WAL wal, final Configuration conf,
-      final RegionServerServices rsServices,
-      final CancelableProgressable reporter)
-  throws IOException {
-    FileSystem fs = null;
-    if (rsServices != null) {
-      fs = rsServices.getFileSystem();
-    }
-    if (fs == null) {
-      fs = FileSystem.get(conf);
+    if (info == null) throw new IllegalArgumentException("Passed region info 
is null");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Opening region: " + info);
     }
-    return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, 
reporter);
-  }
-
-  /**
-   * Open a Region.
-   * @param conf The Configuration object to use.
-   * @param fs Filesystem to use
-   * @param rootDir Root directory for HBase instance
-   * @param info Info for region to be opened.
-   * @param htd the table descriptor
-   * @param wal WAL for region to use. This method will call
-   * WAL#setSequenceNumber(long) passing the result of the call to
-   * HRegion#getMinSequenceId() to ensure the wal id is properly kept
-   * up.  HRegionStore does this every time it opens a new region.
-   * @return new HRegion
-   * @throws IOException
-   */
-  public static HRegion openHRegion(final Configuration conf, final FileSystem 
fs,
-      final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, 
final WAL wal)
-      throws IOException {
-    return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
+    HRegion r = HRegion.newHRegion(conf, htd, info, wal, rsServices);
+    return r.openHRegion(reporter);
   }
 
   /**
-   * Open a Region.
-   * @param conf The Configuration object to use.
-   * @param fs Filesystem to use
-   * @param rootDir Root directory for HBase instance
-   * @param info Info for region to be opened.
-   * @param htd the table descriptor
-   * @param wal WAL for region to use. This method will call
-   * WAL#setSequenceNumber(long) passing the result of the call to
-   * HRegion#getMinSequenceId() to ensure the wal id is properly kept
-   * up.  HRegionStore does this every time it opens a new region.
-   * @param rsServices An interface we can request flushes against.
-   * @param reporter An interface we can report progress against.
-   * @return new HRegion
-   * @throws IOException
+   * TODO remove after refactoring TableSnapshotScanner and 
TableSnapshotInputFormatImpl to use a RegionStorage impl instead of specifying 
a different root dir manually.
    */
-  public static HRegion openHRegion(final Configuration conf, final FileSystem 
fs,
-      final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, 
final WAL wal,
-      final RegionServerServices rsServices, final CancelableProgressable 
reporter)
-      throws IOException {
-    if (info == null) throw new NullPointerException("Passed region info is 
null");
+  public static HRegion openHRegion(final FileSystem fs, final Path rootDir, 
final HRegionInfo info, HTableDescriptor htd, Configuration conf) throws 
IOException {
+    if (info == null) throw new IllegalArgumentException("Passed region info 
is null");
     if (LOG.isDebugEnabled()) {
       LOG.debug("Opening region: " + info);
     }
-    HRegion r = HRegion.newHRegion(conf, fs, rootDir, htd, info, wal, 
rsServices);
-    return r.openHRegion(reporter);
+    RegionStorage rfs = RegionStorage.open(conf, fs, new 
LegacyPathIdentifier(rootDir), info, false);
+    HRegion r = newHRegion(rfs, htd, null, null);
+    return r.openHRegion(null);
   }
 
   @VisibleForTesting
@@ -6503,10 +6470,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    */
   public static HRegion openHRegion(final HRegion other, final 
CancelableProgressable reporter)
       throws IOException {
-    RegionFileSystem regionFs = other.getRegionFileSystem();
-    HRegion r = newHRegion(other.baseConf, regionFs.getFileSystem(),
-      regionFs.getRootDir(), other.getTableDesc(), other.getRegionInfo(),
-      other.getWAL(), null);
+    HRegion r = newHRegion(other.getRegionStorage(), other.htableDescriptor, 
other.getWAL(), null);
     return r.openHRegion(reporter);
   }
 
@@ -6554,17 +6518,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       LOG.debug("HRegion.Warming up region: " + info);
     }
 
-    Path rootDir = FSUtils.getRootDir(conf);
-
-    FileSystem fs = null;
-    if (rsServices != null) {
-      fs = rsServices.getFileSystem();
-    }
-    if (fs == null) {
-      fs = FileSystem.get(conf);
-    }
-
-    HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, null);
+    HRegion r = HRegion.newHRegion(conf, htd, info, wal, null);
     r.initializeWarmup(reporter);
   }
 
@@ -6597,7 +6551,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     fs.commitDaughterRegion(hri);
 
     // Create the daughter HRegion instance
-    HRegion r = HRegion.newHRegion(this.getBaseConf(), fs.getFileSystem(), 
this.fs.getRootDir(),
+    HRegion r = HRegion.newHRegion(this.getBaseConf(),
         this.getTableDesc(), hri, this.getWAL(), rsServices);
     r.readRequestsCount.add(this.getReadRequestsCount() / 2);
     r.filteredReadRequestsCount.add(this.getFilteredReadRequestsCount() / 2);
@@ -6613,7 +6567,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    */
   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
       final HRegion region_b) throws IOException {
-    HRegion r = HRegion.newHRegion(this.getBaseConf(), fs.getFileSystem(), 
this.fs.getRootDir(),
+    HRegion r = HRegion.newHRegion(this.getBaseConf(),
         this.getTableDesc(), mergedRegionInfo, this.getWAL(), this.rsServices);
     r.readRequestsCount.add(this.getReadRequestsCount()
         + region_b.getReadRequestsCount());
@@ -6750,7 +6704,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       throw new IOException("Regions do not belong to the same table");
     }
 
-    FileSystem fs = a.getRegionFileSystem().getFileSystem();
+    FileSystem fs = a.getRegionStorage().getFileSystem();
     // Make sure each region's cache is empty
     a.flush(true);
     b.flush(true);
@@ -6759,12 +6713,12 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     a.compact(true);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Files for region: " + a);
-      a.getRegionFileSystem().logFileSystemState(LOG);
+      a.getRegionStorage().logFileSystemState(LOG);
     }
     b.compact(true);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Files for region: " + b);
-      b.getRegionFileSystem().logFileSystemState(LOG);
+      b.getRegionStorage().logFileSystemState(LOG);
     }
 
     RegionMergeTransactionImpl rmt = new RegionMergeTransactionImpl(a, b, 
true);
@@ -6790,14 +6744,14 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Files for new region");
-      dstRegion.getRegionFileSystem().logFileSystemState(LOG);
+      dstRegion.getRegionStorage().logFileSystemState(LOG);
     }
 
     // clear the compacted files if any
     for (Store s : dstRegion.getStores()) {
       s.closeAndArchiveCompactedFiles();
     }
-    if 
(dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
+    if (dstRegion.getRegionStorage().hasReferences(dstRegion.getTableDesc())) {
       throw new IOException("Merged region " + dstRegion
           + " still has references after the compaction, is compaction 
canceled?");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2986c971/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index e3b1a85..ba66ded 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -68,7 +68,9 @@ import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
-import org.apache.hadoop.hbase.fs.RegionFileSystem;
+import org.apache.hadoop.hbase.fs.RegionStorage;
+import org.apache.hadoop.hbase.fs.StorageIdentifier;
+import org.apache.hadoop.hbase.fs.legacy.LegacyPathIdentifier;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -80,7 +82,7 @@ import 
org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ServerProtobufUtil;
 import 
org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
@@ -128,7 +130,7 @@ public class HStore implements Store {
   // This stores directory in the filesystem.
   protected final HRegion region;
   private final HColumnDescriptor family;
-  private final RegionFileSystem fs;
+  private final RegionStorage fs;
   protected Configuration conf;
   protected CacheConfig cacheConf;
   private long lastCompactSize = 0;
@@ -201,10 +203,10 @@ public class HStore implements Store {
   protected HStore(final HRegion region, final HColumnDescriptor family,
       final Configuration confParam) throws IOException {
 
-    this.fs = region.getRegionFileSystem();
+    this.fs = region.getRegionStorage();
 
     // Assemble the store's home directory and Ensure it exists.
-    fs.createStoreDir(family.getNameAsString());
+    fs.createStoreContainer(family.getNameAsString());
     this.region = region;
     this.family = family;
     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
@@ -336,7 +338,7 @@ public class HStore implements Store {
     return this.fs.getFileSystem();
   }
 
-  public RegionFileSystem getRegionFileSystem() {
+  public RegionStorage getRegionStorage() {
     return this.fs;
   }
 
@@ -734,21 +736,21 @@ public class HStore implements Store {
     }
   }
 
+  /* TODO rephrase bulkload in terms of transfer between RegionStorage impls. 
*/
   @Override
-  public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException 
{
-    Path srcPath = new Path(srcPathStr);
-    Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, 
seqNum);
+  public StoreFile bulkLoadHFile(String srcPathStr, long seqNum) throws 
IOException {
+    LegacyPathIdentifier srcPath = new LegacyPathIdentifier(new 
Path(srcPathStr));
+    StoreFile sf = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, 
seqNum, cacheConf, family.getBloomFilterType(), 
this.region.getCoprocessorHost());
 
     LOG.info("Loaded HFile " + srcPath + " into store '" + 
getColumnFamilyName() + "' as "
-        + dstPath + " - updating store file list.");
+        + sf + " - updating store file list.");
 
-    StoreFile sf = createStoreFileAndReader(dstPath);
     bulkLoadHFile(sf);
 
     LOG.info("Successfully loaded store file " + srcPath + " into store " + 
this
-        + " (new location: " + dstPath + ")");
+        + " (new location: " + sf + ")");
 
-    return dstPath;
+    return sf;
   }
 
   @Override
@@ -921,10 +923,11 @@ public class HStore implements Store {
   private StoreFile commitFile(final Path path, final long logCacheFlushId, 
MonitoredTask status)
       throws IOException {
     // Write-out finished successfully, move into the right spot
-    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
+    // TODO move from Path to StoreFileWriter or StorageIdentifier that goes 
end-to-end
+    final StorageIdentifier id = new LegacyPathIdentifier(path);
+    final StoreFile sf = fs.commitStoreFile(getColumnFamilyName(), id, 
this.cacheConf, this.family.getBloomFilterType(), 
this.region.getCoprocessorHost());
 
     status.setStatus("Flushing " + this + ": reopening flushed file");
-    StoreFile sf = createStoreFileAndReader(dstPath);
 
     StoreFileReader r = sf.getReader();
     this.storeSize += r.length();
@@ -992,9 +995,10 @@ public class HStore implements Store {
     }
     HFileContext hFileContext = createFileContext(compression, 
includeMVCCReadpoint, includesTag,
       cryptoContext);
+    // TODO move StoreFileWriter to use RegionStorage directly instead of Path
     StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, 
writerCacheConf,
         this.getFileSystem())
-            .withFilePath(fs.createTempName())
+            .withFilePath(((LegacyPathIdentifier)fs.getTempIdentifier()).path)
             .withComparator(comparator)
             .withBloomType(family.getBloomFilterType())
             .withMaxKeyCount(maxKeyCount)
@@ -1223,7 +1227,7 @@ public class HStore implements Store {
       // Ready to go. Have list of files to compact.
       LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) 
in "
           + this + " of " + this.getRegionInfo().getRegionNameAsString()
-          + " into tmpdir=" + fs.getTempDir() + ", totalSize="
+          + " into tmpdir=" + fs.getTempContainer() + ", totalSize="
           + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
 
       // Commence the compaction.
@@ -1298,8 +1302,10 @@ public class HStore implements Store {
   StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
     validateStoreFile(newFile);
     // Move the file into the right spot
-    Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
-    return createStoreFileAndReader(destPath);
+    // TODO work backwards to end up with a StorageIdentifier instead of Path 
as arg.
+    final StorageIdentifier id = new LegacyPathIdentifier(newFile);
+    final StoreFile sf = fs.commitStoreFile(getColumnFamilyName(), id, 
this.cacheConf, this.family.getBloomFilterType(), 
this.region.getCoprocessorHost());
+    return sf;
   }
 
   /**
@@ -1319,8 +1325,8 @@ public class HStore implements Store {
       outputPaths.add(f.getPath());
     }
     HRegionInfo info = this.region.getRegionInfo();
-    CompactionDescriptor compactionDescriptor = 
ProtobufUtil.toCompactionDescriptor(info,
-        family.getName(), inputPaths, outputPaths, 
fs.getStoreDir(getFamily().getNameAsString()));
+    CompactionDescriptor compactionDescriptor = 
ServerProtobufUtil.toCompactionDescriptor(info,
+        family.getName(), inputPaths, outputPaths, 
fs.getStoreContainer(getFamily().getNameAsString()));
     // Fix reaching into Region to get the maxWaitForSeqId.
     // Does this method belong in Region altogether given it is making so many 
references up there?
     // Could be Region#writeCompactionMarker(compactionDescriptor);
@@ -1382,6 +1388,7 @@ public class HStore implements Store {
     }
   }
 
+  // TODO move this into RegionStorage
   /**
    * Call to complete a compaction. Its for the case where we find in the WAL 
a compaction
    * that was not finished.  We could find one recovering a WAL after a 
regionserver crash.
@@ -1413,7 +1420,7 @@ public class HStore implements Store {
     String familyName = this.getColumnFamilyName();
     List<String> inputFiles = new ArrayList<String>(compactionInputs.size());
     for (String compactionInput : compactionInputs) {
-      Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
+      Path inputPath = 
((LegacyPathIdentifier)fs.getStoreFileStorageIdentifier(familyName, 
compactionInput)).path;
       inputFiles.add(inputPath.getName());
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/2986c971/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index efd68b8..d994130 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -220,6 +220,7 @@ public interface Region extends ConfigurationObserver {
   /** @return the MetricsRegion for this region */
   MetricsRegion getMetrics();
 
+  /* TODO make this something like host locality so it generalizes */
   /** @return the block distribution for all Stores managed by this region */
   HDFSBlocksDistribution getHDFSBlocksDistribution();
 

Reply via email to