http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreFS.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreFS.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreFS.java
new file mode 100644
index 0000000..ddcda4c
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreFS.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+/**
+ * Define a set of APIs that may vary in different environments
+ */
+public interface MetaStoreFS {
+
+  /**
+   * delete a directory
+   *
+   * @param f
+   * @param ifPurge
+   * @param recursive
+   * @return true on success
+   * @throws MetaException
+   */
+  public boolean deleteDir(FileSystem fs, Path f, boolean recursive,
+      boolean ifPurge, Configuration conf) throws MetaException;
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetadataStore.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetadataStore.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetadataStore.java
new file mode 100644
index 0000000..26e2c49
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetadataStore.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
/**
 * Storage for serialized file metadata, addressed by file ID.
 */
public interface MetadataStore {

  /**
   * Retrieves serialized metadata for a batch of files.
   *
   * @param fileIds IDs of the files to look up.
   * @param result  Output ("ref") parameter through which the serialized file metadata is
   *                returned.
   * @throws IOException on I/O failure.
   */
  void getFileMetadata(List<Long> fileIds, ByteBuffer[] result) throws IOException;

  /**
   * Stores serialized metadata for a single file.
   *
   * @param fileId    ID of the file.
   * @param metadata  Serialized metadata of the file.
   * @param addedCols Names of the extra columns produced by the file-format-specific metadata
   *                  handler; these are kept in the cache as well.
   * @param addedVals Values for {@code addedCols}, one entry per added column.
   * @throws IOException          on I/O failure.
   * @throws InterruptedException if the calling thread is interrupted.
   */
  void storeFileMetadata(long fileId, ByteBuffer metadata, ByteBuffer[] addedCols,
      ByteBuffer[] addedVals) throws IOException, InterruptedException;

  /**
   * Stores serialized metadata for a batch of files.
   *
   * @param fileIds         IDs of the files.
   * @param metadataBuffers Serialized metadata, one buffer per entry of {@code fileIds}.
   * @param addedCols       Names of the extra columns produced by the file-format-specific
   *                        metadata handler; these are kept in the cache as well.
   * @param addedVals       Values for {@code addedCols}: one value per file ID per added column.
   * @throws IOException          on I/O failure.
   * @throws InterruptedException if the calling thread is interrupted.
   */
  void storeFileMetadata(List<Long> fileIds, List<ByteBuffer> metadataBuffers,
      ByteBuffer[] addedCols, ByteBuffer[][] addedVals)
      throws IOException, InterruptedException;
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
new file mode 100644
index 0000000..e5d21b0
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Any task that will run as a separate thread in the metastore should 
implement this
+ * interface.
+ */
+public interface MetastoreTaskThread extends Configurable, Runnable {
+
+  /**
+   * Get the frequency at which the thread should be scheduled in the thread 
pool.  You must call
+   * {@link #setConf(Configuration)} before calling this method.
+   * @param unit TimeUnit to express the frequency in.
+   * @return frequency
+   */
+  long runFrequency(TimeUnit unit);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionExpressionProxy.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionExpressionProxy.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionExpressionProxy.java
new file mode 100644
index 0000000..105511d
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionExpressionProxy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+
+/**
+ * The proxy interface that metastore uses for variety of QL operations 
(metastore can't depend
+ * on QL because QL depends on metastore; creating metastore-client module 
would be a proper way
+ * to solve this problem).
+ */
+public interface PartitionExpressionProxy {
+
+  /**
+   * Converts serialized Hive expression into filter in the format suitable 
for Filter.g.
+   * @param expr Serialized expression.
+   * @return The filter string.
+   */
+  public String convertExprToFilter(byte[] expr) throws MetaException;
+
+  /**
+   * Filters the partition names via serialized Hive expression.
+   * @param partColumns Partition columns in the underlying table.
+   * @param expr Serialized expression.
+   * @param defaultPartitionName Default partition name from job or server 
configuration.
+   * @param partitionNames Partition names; the list is modified in place.
+   * @return Whether there were any unknown partitions preserved in the name 
list.
+   */
+  boolean filterPartitionsByExpr(List<FieldSchema> partColumns,
+      byte[] expr, String defaultPartitionName, List<String> partitionNames) 
throws MetaException;
+
+  /**
+   * Determines the file metadata type from input format of the source table 
or partition.
+   * @param inputFormat Input format name.
+   * @return The file metadata type.
+   */
+  FileMetadataExprType getMetadataType(String inputFormat);
+
+  /**
+   * Gets a separate proxy that can be used to call file-format-specific 
methods.
+   * @param type The file metadata type.
+   * @return The proxy.
+   */
+  FileFormatProxy getFileFormatProxy(FileMetadataExprType type);
+
+  /**
+   * Creates SARG from serialized representation.
+   * @param expr SARG, serialized as Kryo.
+   * @return SARG.
+   */
+  SearchArgument createSarg(byte[] expr);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
new file mode 100644
index 0000000..893c9f4
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplChangeManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplChangeManager.class);
+  static private ReplChangeManager instance;
+
+  private static boolean inited = false;
+  private static boolean enabled = false;
+  private static Path cmroot;
+  private static Configuration conf;
+  private String msUser;
+  private String msGroup;
+
+  private static final String ORIG_LOC_TAG = "user.original-loc";
+  static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
+  private static final String URI_FRAGMENT_SEPARATOR = "#";
+  public static final String SOURCE_OF_REPLICATION = "repl.source.for";
+  private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]";
+
+  public enum RecycleType {
+    MOVE,
+    COPY
+  }
+
+  public static class FileInfo {
+    private FileSystem srcFs;
+    private Path sourcePath;
+    private Path cmPath;
+    private String checkSum;
+    private boolean useSourcePath;
+    private String subDir;
+    private boolean copyDone;
+
+    public FileInfo(FileSystem srcFs, Path sourcePath, String subDir) {
+      this(srcFs, sourcePath, null, null, true, subDir);
+    }
+    public FileInfo(FileSystem srcFs, Path sourcePath, Path cmPath,
+                    String checkSum, boolean useSourcePath, String subDir) {
+      this.srcFs = srcFs;
+      this.sourcePath = sourcePath;
+      this.cmPath = cmPath;
+      this.checkSum = checkSum;
+      this.useSourcePath = useSourcePath;
+      this.subDir = subDir;
+      this.copyDone = false;
+    }
+    public FileSystem getSrcFs() {
+      return srcFs;
+    }
+    public Path getSourcePath() {
+      return sourcePath;
+    }
+    public Path getCmPath() {
+      return cmPath;
+    }
+    public String getCheckSum() {
+      return checkSum;
+    }
+    public boolean isUseSourcePath() {
+      return useSourcePath;
+    }
+    public void setIsUseSourcePath(boolean useSourcePath) {
+      this.useSourcePath = useSourcePath;
+    }
+    public String getSubDir() {
+      return subDir;
+    }
+    public boolean isCopyDone() {
+      return copyDone;
+    }
+    public void setCopyDone(boolean copyDone) {
+      this.copyDone = copyDone;
+    }
+    public Path getEffectivePath() {
+      if (useSourcePath) {
+        return sourcePath;
+      } else {
+        return cmPath;
+      }
+    }
+  }
+
+  public static synchronized ReplChangeManager getInstance(Configuration conf)
+      throws MetaException {
+    if (instance == null) {
+      instance = new ReplChangeManager(conf);
+    }
+    return instance;
+  }
+
+  private ReplChangeManager(Configuration conf) throws MetaException {
+    try {
+      if (!inited) {
+        if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) {
+          ReplChangeManager.enabled = true;
+          ReplChangeManager.cmroot = new Path(MetastoreConf.getVar(conf, 
ConfVars.REPLCMDIR));
+          ReplChangeManager.conf = conf;
+
+          FileSystem cmFs = cmroot.getFileSystem(conf);
+          // Create cmroot with permission 700 if not exist
+          if (!cmFs.exists(cmroot)) {
+            cmFs.mkdirs(cmroot);
+            cmFs.setPermission(cmroot, new FsPermission("700"));
+          }
+          UserGroupInformation usergroupInfo = 
UserGroupInformation.getCurrentUser();
+          msUser = usergroupInfo.getShortUserName();
+          msGroup = usergroupInfo.getPrimaryGroupName();
+        }
+        inited = true;
+      }
+    } catch (IOException e) {
+      throw new MetaException(StringUtils.stringifyException(e));
+    }
+  }
+
+  // Filter files starts with ".". Note Hadoop consider files starts with
+  // "." or "_" as hidden file. However, we need to replicate files starts
+  // with "_". We find at least 2 use cases:
+  // 1. For har files, _index and _masterindex is required files
+  // 2. _success file is required for Oozie to indicate availability of data 
source
+  private static final PathFilter hiddenFileFilter = new PathFilter(){
+    public boolean accept(Path p){
+      return !p.getName().startsWith(".");
+    }
+  };
+
+  /***
+   * Move a path into cmroot. If the path is a directory (of a partition, or 
table if nonpartitioned),
+   *   recursively move files inside directory to cmroot. Note the table must 
be managed table
+   * @param path a single file or directory
+   * @param type if the files to be copied or moved to cmpath.
+   *             Copy is costly but preserve the source file
+   * @param ifPurge if the file should skip Trash when move/delete source file.
+   *                This is referred only if type is MOVE.
+   * @return int
+   * @throws IOException
+   */
+  public int recycle(Path path, RecycleType type, boolean ifPurge) throws 
IOException {
+    if (!enabled) {
+      return 0;
+    }
+
+    int count = 0;
+    FileSystem fs = path.getFileSystem(conf);
+    if (fs.isDirectory(path)) {
+      FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
+      for (FileStatus file : files) {
+        count += recycle(file.getPath(), type, ifPurge);
+      }
+    } else {
+      String fileCheckSum = checksumFor(path, fs);
+      Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, 
cmroot.toString());
+
+      // set timestamp before moving to cmroot, so we can
+      // avoid race condition CM remove the file before setting
+      // timestamp
+      long now = System.currentTimeMillis();
+      fs.setTimes(path, now, -1);
+
+      boolean success = false;
+      if (fs.exists(cmPath) && 
fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
+        // If already a file with same checksum exists in cmPath, just ignore 
the copy/move
+        // Also, mark the operation is unsuccessful to notify that file with 
same name already
+        // exist which will ensure the timestamp of cmPath is updated to avoid 
clean-up by
+        // CM cleaner.
+        success = false;
+      } else {
+        switch (type) {
+        case MOVE: {
+          LOG.info("Moving {} to {}", path.toString(), cmPath.toString());
+
+          // Rename fails if the file with same name already exist.
+          success = fs.rename(path, cmPath);
+          break;
+        }
+        case COPY: {
+          LOG.info("Copying {} to {}", path.toString(), cmPath.toString());
+
+          // It is possible to have a file with same checksum in cmPath but 
the content is
+          // partially copied or corrupted. In this case, just overwrite the 
existing file with
+          // new one.
+          success = FileUtils.copy(fs, path, fs, cmPath, false, true, conf);
+          break;
+        }
+        default:
+          // Operation fails as invalid input
+          break;
+        }
+      }
+
+      // Ignore if a file with same content already exist in cmroot
+      // We might want to setXAttr for the new location in the future
+      if (success) {
+        // set the file owner to hive (or the id metastore run as)
+        fs.setOwner(cmPath, msUser, msGroup);
+
+        // tag the original file name so we know where the file comes from
+        // Note we currently only track the last known trace as
+        // xattr has limited capacity. We shall revisit and store all original
+        // locations if orig-loc becomes important
+        try {
+          fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
+        } catch (UnsupportedOperationException e) {
+          LOG.warn("Error setting xattr for {}", path.toString());
+        }
+
+        count++;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("A file with the same content of {} already exists, 
ignore", path.toString());
+        }
+        // Need to extend the tenancy if we saw a newer file with the same 
content
+        fs.setTimes(cmPath, now, -1);
+      }
+
+      // Tag if we want to remain in trash after deletion.
+      // If multiple files share the same content, then
+      // any file claim remain in trash would be granted
+      if ((type == RecycleType.MOVE) && !ifPurge) {
+        try {
+          fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[] { 0 });
+        } catch (UnsupportedOperationException e) {
+          LOG.warn("Error setting xattr for {}", cmPath.toString());
+        }
+      }
+    }
+    return count;
+  }
+
+  // Get checksum of a file
+  static public String checksumFor(Path path, FileSystem fs) throws 
IOException {
+    // TODO: fs checksum only available on hdfs, need to
+    //       find a solution for other fs (eg, local fs, s3, etc)
+    String checksumString = null;
+    FileChecksum checksum = fs.getFileChecksum(path);
+    if (checksum != null) {
+      checksumString = StringUtils.byteToHexString(
+          checksum.getBytes(), 0, checksum.getLength());
+    }
+    return checksumString;
+  }
+
+  /***
+   * Convert a path of file inside a partition or table (if non-partitioned)
+   *   to a deterministic location of cmroot. So user can retrieve the file 
back
+   *   with the original location plus checksum.
+   * @param conf Hive configuration
+   * @param name original filename
+   * @param checkSum checksum of the file, can be retrieved by {@link 
#checksumFor(Path, FileSystem)}
+   * @param cmRootUri CM Root URI. (From remote source if REPL LOAD flow. From 
local config if recycle.)
+   * @return Path
+   */
+  static Path getCMPath(Configuration conf, String name, String checkSum, 
String cmRootUri) {
+    String newFileName = name + "_" + checkSum;
+    int maxLength = 
conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
+
+    if (newFileName.length() > maxLength) {
+      newFileName = newFileName.substring(0, maxLength-1);
+    }
+
+    return new Path(cmRootUri, newFileName);
+  }
+
+  /***
+   * Get original file specified by src and chksumString. If the file exists 
and checksum
+   * matches, return the file; otherwise, use chksumString to retrieve it from 
cmroot
+   * @param src Original file location
+   * @param checksumString Checksum of the original file
+   * @param srcCMRootURI CM root URI of the source cluster
+   * @param subDir Sub directory to which the source file belongs to
+   * @param conf Hive configuration
+   * @return Corresponding FileInfo object
+   */
+  public static FileInfo getFileInfo(Path src, String checksumString, String 
srcCMRootURI, String subDir,
+                                     Configuration conf) throws MetaException {
+    try {
+      FileSystem srcFs = src.getFileSystem(conf);
+      if (checksumString == null) {
+        return new FileInfo(srcFs, src, subDir);
+      }
+
+      Path cmPath = getCMPath(conf, src.getName(), checksumString, 
srcCMRootURI);
+      if (!srcFs.exists(src)) {
+        return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
+      }
+
+      String currentChecksumString;
+      try {
+        currentChecksumString = checksumFor(src, srcFs);
+      } catch (IOException ex) {
+        // If the file is missing or getting modified, then refer CM path
+        return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
+      }
+      if ((currentChecksumString == null) || 
checksumString.equals(currentChecksumString)) {
+        return new FileInfo(srcFs, src, cmPath, checksumString, true, subDir);
+      } else {
+        return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
+      }
+    } catch (IOException e) {
+      throw new MetaException(StringUtils.stringifyException(e));
+    }
+  }
+
+  /***
+   * Concatenate filename, checksum, source cmroot uri and subdirectory with 
"#"
+   * @param fileUriStr Filename string
+   * @param fileChecksum Checksum string
+   * @param encodedSubDir sub directory path into which this file belongs to. 
Here encoded means,
+   *                      the multiple levels of subdirectories are 
concatenated with path separator "/"
+   * @return Concatenated Uri string
+   */
+  // TODO: this needs to be enhanced once change management based filesystem 
is implemented
+  // Currently using fileuri#checksum#cmrooturi#subdirs as the format
+  public static String encodeFileUri(String fileUriStr, String fileChecksum, 
String encodedSubDir)
+          throws IOException {
+    String encodedUri = fileUriStr;
+    if ((fileChecksum != null) && (cmroot != null)) {
+      encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum
+              + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmroot, conf);
+    } else {
+      encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + 
URI_FRAGMENT_SEPARATOR;
+    }
+    encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + ((encodedSubDir != 
null) ? encodedSubDir : "");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Encoded URI: " + encodedUri);
+    }
+    return encodedUri;
+  }
+
+  /***
+   * Split uri with fragment into file uri, subdirs, checksum and source 
cmroot uri.
+   * Currently using fileuri#checksum#cmrooturi#subdirs as the format.
+   * @param fileURIStr uri with fragment
+   * @return array of file name, subdirs, checksum and source CM root URI
+   */
+  public static String[] decodeFileUri(String fileURIStr) {
+    String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR);
+    String[] result = new String[4];
+    result[0] = uriAndFragment[0];
+    if ((uriAndFragment.length > 1) && 
!StringUtils.isEmpty(uriAndFragment[1])) {
+      result[1] = uriAndFragment[1];
+    }
+    if ((uriAndFragment.length > 2)  && 
!StringUtils.isEmpty(uriAndFragment[2])) {
+      result[2] = uriAndFragment[2];
+    }
+    if ((uriAndFragment.length > 3)  && 
!StringUtils.isEmpty(uriAndFragment[3])) {
+      result[3] = uriAndFragment[3];
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reading Encoded URI: " + result[0] + ":: " + result[1] + ":: 
" + result[2] + ":: " + result[3]);
+    }
+    return result;
+  }
+
+  public static boolean isCMFileUri(Path fromPath) {
+    String[] result = decodeFileUri(fromPath.toString());
+    return result[1] != null;
+  }
+
+  /**
+   * Thread to clear old files of cmroot recursively
+   */
+  static class CMClearer implements Runnable {
+    private Path cmroot;
+    private long secRetain;
+    private Configuration conf;
+
+    CMClearer(String cmrootString, long secRetain, Configuration conf) {
+      this.cmroot = new Path(cmrootString);
+      this.secRetain = secRetain;
+      this.conf = conf;
+    }
+
+    @Override
+    public void run() {
+      try {
+        LOG.info("CMClearer started");
+
+        long now = System.currentTimeMillis();
+        FileSystem fs = cmroot.getFileSystem(conf);
+        FileStatus[] files = fs.listStatus(cmroot);
+
+        for (FileStatus file : files) {
+          long modifiedTime = file.getModificationTime();
+          if (now - modifiedTime > secRetain*1000) {
+            try {
+              if 
(fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
+                boolean succ = Trash.moveToAppropriateTrash(fs, 
file.getPath(), conf);
+                if (succ) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Move " + file.toString() + " to trash");
+                  }
+                } else {
+                  LOG.warn("Fail to move " + file.toString() + " to trash");
+                }
+              } else {
+                boolean succ = fs.delete(file.getPath(), false);
+                if (succ) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Remove " + file.toString());
+                  }
+                } else {
+                  LOG.warn("Fail to remove " + file.toString());
+                }
+              }
+            } catch (UnsupportedOperationException e) {
+              LOG.warn("Error getting xattr for " + file.getPath().toString());
+            }
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("Exception when clearing cmroot:" + 
StringUtils.stringifyException(e));
+      }
+    }
+  }
+
+  // Schedule CMClearer thread. Will be invoked by metastore
+  static void scheduleCMClearer(Configuration conf) {
+    if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) {
+      ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+          new BasicThreadFactory.Builder()
+          .namingPattern("cmclearer-%d")
+          .daemon(true)
+          .build());
+      executor.scheduleAtFixedRate(new CMClearer(MetastoreConf.getVar(conf, 
ConfVars.REPLCMDIR),
+          MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, 
TimeUnit.SECONDS), conf),
+          0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, 
TimeUnit.SECONDS), TimeUnit.SECONDS);
+    }
+  }
+
+  public static boolean isSourceOfReplication(Database db) {
+    assert (db != null);
+    String replPolicyIds = getReplPolicyIdString(db);
+    return  !StringUtils.isEmpty(replPolicyIds);
+  }
+
+  public static String getReplPolicyIdString(Database db) {
+    if (db != null) {
+      Map<String, String> m = db.getParameters();
+      if ((m != null) && (m.containsKey(SOURCE_OF_REPLICATION))) {
+        String replPolicyId = m.get(SOURCE_OF_REPLICATION);
+        LOG.debug("repl policy for database {} is {}", db.getName(), 
replPolicyId);
+        return replPolicyId;
+      }
+      LOG.debug("Repl policy is not set for database ", db.getName());
+    }
+    return null;
+  }
+
+  public static String joinWithSeparator(Iterable<?> strings) {
+    return 
org.apache.hadoop.util.StringUtils.join(TXN_WRITE_EVENT_FILE_SEPARATOR, 
strings);
+  }
+
+  public static String[] getListFromSeparatedString(String 
commaSeparatedString) {
+    return commaSeparatedString.split("\\s*" + TXN_WRITE_EVENT_FILE_SEPARATOR 
+ "\\s*");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
new file mode 100644
index 0000000..f97f638
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.classification.RetrySemantics;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.metastore.annotation.NoReconnect;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * RetryingMetaStoreClient. Creates a proxy for a IMetaStoreClient
+ * implementation and retries calls to it on failure.
+ * If the login user is authenticated using keytab, it relogins user before
+ * each call.
+ *
+ */
[email protected]
+public class RetryingMetaStoreClient implements InvocationHandler {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RetryingMetaStoreClient.class.getName());
+
+  private final IMetaStoreClient base;
+  private final UserGroupInformation ugi;
+  private final int retryLimit;
+  private final long retryDelaySeconds;
+  private final ConcurrentHashMap<String, Long> metaCallTimeMap;
+  private final long connectionLifeTimeInMillis;
+  private long lastConnectionTime;
+  private boolean localMetaStore;
+
+
+  protected RetryingMetaStoreClient(Configuration conf, Class<?>[] 
constructorArgTypes,
+                                    Object[] constructorArgs, 
ConcurrentHashMap<String, Long> metaCallTimeMap,
+                                    Class<? extends IMetaStoreClient> 
msClientClass) throws MetaException {
+
+    this.ugi = getUGI();
+
+    if (this.ugi == null) {
+      LOG.warn("RetryingMetaStoreClient unable to determine current user 
UGI.");
+    }
+
+    this.retryLimit = MetastoreConf.getIntVar(conf, 
ConfVars.THRIFT_FAILURE_RETRIES);
+    this.retryDelaySeconds = MetastoreConf.getTimeVar(conf,
+        ConfVars.CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
+    this.metaCallTimeMap = metaCallTimeMap;
+    this.connectionLifeTimeInMillis = MetastoreConf.getTimeVar(conf,
+        ConfVars.CLIENT_SOCKET_LIFETIME, TimeUnit.MILLISECONDS);
+    this.lastConnectionTime = System.currentTimeMillis();
+    String msUri = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS);
+    localMetaStore = (msUri == null) || msUri.trim().isEmpty();
+
+    reloginExpiringKeytabUser();
+
+    this.base = JavaUtils.newInstance(msClientClass, constructorArgTypes, 
constructorArgs);
+
+    LOG.info("RetryingMetaStoreClient proxy=" + msClientClass + " ugi=" + 
this.ugi
+        + " retries=" + this.retryLimit + " delay=" + this.retryDelaySeconds
+        + " lifetime=" + this.connectionLifeTimeInMillis);
+  }
+
+  public static IMetaStoreClient getProxy(
+      Configuration hiveConf, boolean allowEmbedded) throws MetaException {
+    return getProxy(hiveConf, new Class[]{Configuration.class, 
HiveMetaHookLoader.class, Boolean.class},
+        new Object[]{hiveConf, null, allowEmbedded}, null, 
HiveMetaStoreClient.class.getName()
+    );
+  }
+
+  @VisibleForTesting
+  public static IMetaStoreClient getProxy(Configuration hiveConf, 
HiveMetaHookLoader hookLoader,
+      String mscClassName) throws MetaException {
+    return getProxy(hiveConf, hookLoader, null, mscClassName, true);
+  }
+
+  public static IMetaStoreClient getProxy(Configuration hiveConf, 
HiveMetaHookLoader hookLoader,
+      ConcurrentHashMap<String, Long> metaCallTimeMap, String mscClassName, 
boolean allowEmbedded)
+          throws MetaException {
+
+    return getProxy(hiveConf,
+        new Class[] {Configuration.class, HiveMetaHookLoader.class, 
Boolean.class},
+        new Object[] {hiveConf, hookLoader, allowEmbedded},
+        metaCallTimeMap,
+        mscClassName
+    );
+  }
+
+  /**
+   * This constructor is meant for Hive internal use only.
+   * Please use getProxy(HiveConf conf, HiveMetaHookLoader hookLoader) for 
external purpose.
+   */
+  public static IMetaStoreClient getProxy(Configuration hiveConf, Class<?>[] 
constructorArgTypes,
+      Object[] constructorArgs, String mscClassName) throws MetaException {
+    return getProxy(hiveConf, constructorArgTypes, constructorArgs, null, 
mscClassName);
+  }
+
+  /**
+   * This constructor is meant for Hive internal use only.
+   * Please use getProxy(HiveConf conf, HiveMetaHookLoader hookLoader) for 
external purpose.
+   */
+  public static IMetaStoreClient getProxy(Configuration hiveConf, Class<?>[] 
constructorArgTypes,
+      Object[] constructorArgs, ConcurrentHashMap<String, Long> 
metaCallTimeMap,
+      String mscClassName) throws MetaException {
+
+    @SuppressWarnings("unchecked")
+    Class<? extends IMetaStoreClient> baseClass =
+        JavaUtils.getClass(mscClassName, IMetaStoreClient.class);
+
+    RetryingMetaStoreClient handler =
+        new RetryingMetaStoreClient(hiveConf, constructorArgTypes, 
constructorArgs,
+            metaCallTimeMap, baseClass);
+    return (IMetaStoreClient) Proxy.newProxyInstance(
+        RetryingMetaStoreClient.class.getClassLoader(), 
baseClass.getInterfaces(), handler);
+  }
+
+  @Override
+  public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+    Object ret;
+    int retriesMade = 0;
+    TException caughtException;
+
+    boolean allowReconnect = ! method.isAnnotationPresent(NoReconnect.class);
+    boolean allowRetry = true;
+    Annotation[] directives = method.getDeclaredAnnotations();
+    if(directives != null) {
+      for(Annotation a : directives) {
+        if(a instanceof RetrySemantics.CannotRetry) {
+          allowRetry = false;
+        }
+      }
+    }
+
+    while (true) {
+      try {
+        reloginExpiringKeytabUser();
+
+        if (allowReconnect) {
+          if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) {
+            if (this.ugi != null) {
+              // Perform reconnect with the proper user context
+              try {
+                LOG.info("RetryingMetaStoreClient trying reconnect as " + 
this.ugi);
+
+                this.ugi.doAs(
+                  new PrivilegedExceptionAction<Object> () {
+                    @Override
+                    public Object run() throws MetaException {
+                      base.reconnect();
+                      return null;
+                    }
+                  });
+              } catch (UndeclaredThrowableException e) {
+                Throwable te = e.getCause();
+                if (te instanceof PrivilegedActionException) {
+                  throw te.getCause();
+                } else {
+                  throw te;
+                }
+              }
+              lastConnectionTime = System.currentTimeMillis();
+            } else {
+              LOG.warn("RetryingMetaStoreClient unable to reconnect. No UGI 
information.");
+              throw new MetaException("UGI information unavailable. Will not 
attempt a reconnect.");
+            }
+          }
+        }
+
+        if (metaCallTimeMap == null) {
+          ret = method.invoke(base, args);
+        } else {
+          // need to capture the timing
+          long startTime = System.currentTimeMillis();
+          ret = method.invoke(base, args);
+          long timeTaken = System.currentTimeMillis() - startTime;
+          addMethodTime(method, timeTaken);
+        }
+        break;
+      } catch (UndeclaredThrowableException e) {
+        throw e.getCause();
+      } catch (InvocationTargetException e) {
+        Throwable t = e.getCause();
+        if (t instanceof TApplicationException) {
+          TApplicationException tae = (TApplicationException)t;
+          switch (tae.getType()) {
+          case TApplicationException.UNSUPPORTED_CLIENT_TYPE:
+          case TApplicationException.UNKNOWN_METHOD:
+          case TApplicationException.WRONG_METHOD_NAME:
+          case TApplicationException.INVALID_PROTOCOL:
+            throw t;
+          default:
+            // TODO: most other options are probably unrecoverable... throw?
+            caughtException = tae;
+          }
+        } else if ((t instanceof TProtocolException) || (t instanceof 
TTransportException)) {
+          // TODO: most protocol exceptions are probably unrecoverable... 
throw?
+          caughtException = (TException)t;
+        } else if ((t instanceof MetaException) && t.getMessage().matches(
+            "(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*") &&
+            
!t.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) {
+          caughtException = (MetaException)t;
+        } else {
+          throw t;
+        }
+      } catch (MetaException e) {
+        if (e.getMessage().matches("(?s).*(IO|TTransport)Exception.*") &&
+            
!e.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) {
+          caughtException = e;
+        } else {
+          throw e;
+        }
+      }
+
+
+      if (retriesMade >= retryLimit || base.isLocalMetaStore() || !allowRetry) 
{
+        throw caughtException;
+      }
+      retriesMade++;
+      LOG.warn("MetaStoreClient lost connection. Attempting to reconnect (" + 
retriesMade + " of " +
+          retryLimit + ") after " + retryDelaySeconds + "s. " + 
method.getName(), caughtException);
+      Thread.sleep(retryDelaySeconds * 1000);
+    }
+    return ret;
+  }
+
+  /**
+   * Returns the UGI for the current user.
+   * @return the UGI for the current user.
+   */
+  private UserGroupInformation getUGI() {
+    UserGroupInformation ugi = null;
+
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      // Swallow the exception and let the call determine what to do.
+    }
+
+    return ugi;
+  }
+
+  private void addMethodTime(Method method, long timeTaken) {
+    String methodStr = getMethodString(method);
+    while (true) {
+      Long curTime = metaCallTimeMap.get(methodStr), newTime = timeTaken;
+      if (curTime != null && metaCallTimeMap.replace(methodStr, curTime, 
newTime + curTime)) break;
+      if (curTime == null && (null == metaCallTimeMap.putIfAbsent(methodStr, 
newTime))) break;
+    }
+  }
+
+  /**
+   * @param method
+   * @return String representation with arg types. eg getDatabase_(String, )
+   */
+  private String getMethodString(Method method) {
+    StringBuilder methodSb = new StringBuilder(method.getName());
+    methodSb.append("_(");
+    for (Class<?> paramClass : method.getParameterTypes()) {
+      methodSb.append(paramClass.getSimpleName());
+      methodSb.append(", ");
+    }
+    methodSb.append(")");
+    return methodSb.toString();
+  }
+
+  private boolean hasConnectionLifeTimeReached(Method method) {
+    if (connectionLifeTimeInMillis <= 0 || localMetaStore) {
+      return false;
+    }
+
+    boolean shouldReconnect =
+        (System.currentTimeMillis() - lastConnectionTime) >= 
connectionLifeTimeInMillis;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reconnection status for Method: " + method.getName() + " is " 
+ shouldReconnect);
+    }
+    return shouldReconnect;
+  }
+
+  /**
+   * Relogin if login user is logged in using keytab
+   * Relogin is actually done by ugi code only if sufficient time has passed
+   * A no-op if kerberos security is not enabled
+   * @throws MetaException
+   */
+  private void reloginExpiringKeytabUser() throws MetaException {
+    if(!UserGroupInformation.isSecurityEnabled()){
+      return;
+    }
+    try {
+      UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+      //checkTGT calls ugi.relogin only after checking if it is close to tgt 
expiry
+      //hadoop relogin is actually done only every x minutes (x=10 in hadoop 
1.x)
+      if(ugi.isFromKeytab()){
+        ugi.checkTGTAndReloginFromKeytab();
+      }
+    } catch (IOException e) {
+      String msg = "Error doing relogin using keytab " + e.getMessage();
+      LOG.error(msg, e);
+      throw new MetaException(msg);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/TableIterable.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/TableIterable.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/TableIterable.java
new file mode 100644
index 0000000..1a17fe3
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/TableIterable.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+/**
+ * Use this to get Table objects for a table list. It provides an iterator to
+ * on the resulting Table objects. It batches the calls to
+ * IMetaStoreClient.getTableObjectsByName to avoid OOM issues in HS2 (with
+ * embedded metastore) or MetaStore server (if HS2 is using remote metastore).
+ *
+ */
+public class TableIterable implements Iterable<Table> {
+
+  @Override
+  public Iterator<Table> iterator() {
+    return new Iterator<Table>() {
+
+      private final Iterator<String> tableNamesIter = tableNames.iterator();
+      private Iterator<org.apache.hadoop.hive.metastore.api.Table> batchIter = 
null;
+
+      @Override
+      public boolean hasNext() {
+        return ((batchIter != null) && batchIter.hasNext()) || 
tableNamesIter.hasNext();
+      }
+
+      @Override
+      public Table next() {
+        if ((batchIter == null) || !batchIter.hasNext()) {
+          getNextBatch();
+        }
+        return batchIter.next();
+      }
+
+      private void getNextBatch() {
+        // get next batch of table names in this list
+        List<String> nameBatch = new ArrayList<String>();
+        int batchCounter = 0;
+        while (batchCounter < batchSize && tableNamesIter.hasNext()) {
+          nameBatch.add(tableNamesIter.next());
+          batchCounter++;
+        }
+        // get the Table objects for this batch of table names and get iterator
+        // on it
+
+        try {
+          if (catName != null) {
+            batchIter = msc.getTableObjectsByName(catName, dbname, 
nameBatch).iterator();
+          } else {
+            batchIter = msc.getTableObjectsByName(dbname, 
nameBatch).iterator();
+          }
+        } catch (TException e) {
+          throw new RuntimeException(e);
+        }
+
+      }
+
+      @Override
+      public void remove() {
+        throw new IllegalStateException(
+            "TableIterable is a read-only iterable and remove() is 
unsupported");
+      }
+    };
+  }
+
+  private final IMetaStoreClient msc;
+  private final String dbname;
+  private final List<String> tableNames;
+  private final int batchSize;
+  private final String catName;
+
+  /**
+   * Primary constructor that fetches all tables in a given msc, given a Hive
+   * object,a db name and a table name list.
+   */
+  public TableIterable(IMetaStoreClient msc, String dbname, List<String> 
tableNames, int batchSize)
+      throws TException {
+    this.msc = msc;
+    this.catName = null;
+    this.dbname = dbname;
+    this.tableNames = tableNames;
+    this.batchSize = batchSize;
+  }
+
+  public TableIterable(IMetaStoreClient msc, String catName, String dbname, 
List<String>
+          tableNames, int batchSize) throws TException {
+    this.msc = msc;
+    this.catName = catName;
+    this.dbname = dbname;
+    this.tableNames = tableNames;
+    this.batchSize = batchSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
new file mode 100755
index 0000000..294dfb7
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -0,0 +1,759 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.api.Catalog;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.HdfsUtils;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This class represents a warehouse where data of Hive tables is stored
+ */
+public class Warehouse {
+  public static final String DEFAULT_CATALOG_NAME = "hive";
+  public static final String DEFAULT_CATALOG_COMMENT = "Default catalog, for 
Hive";
+  public static final String DEFAULT_DATABASE_NAME = "default";
+  public static final String DEFAULT_DATABASE_COMMENT = "Default Hive 
database";
+  public static final String DEFAULT_SERIALIZATION_FORMAT = "1";
+  public static final String DATABASE_WAREHOUSE_SUFFIX = ".db";
+  private static final String CAT_DB_TABLE_SEPARATOR = ".";
+
+  private Path whRoot;
+  private Path whRootExternal;
+  private final Configuration conf;
+  private final String whRootString;
+  private final String whRootExternalString;
+
+  public static final Logger LOG = 
LoggerFactory.getLogger("hive.metastore.warehouse");
+
+  private MetaStoreFS fsHandler = null;
+  private boolean storageAuthCheck = false;
+  private ReplChangeManager cm = null;
+
+  public Warehouse(Configuration conf) throws MetaException {
+    this.conf = conf;
+    whRootString = MetastoreConf.getVar(conf, ConfVars.WAREHOUSE);
+    if (StringUtils.isBlank(whRootString)) {
+      throw new MetaException(ConfVars.WAREHOUSE.getVarname()
+          + " is not set in the config or blank");
+    }
+    whRootExternalString = MetastoreConf.getVar(conf, 
ConfVars.WAREHOUSE_EXTERNAL);
+    fsHandler = getMetaStoreFsHandler(conf);
+    cm = ReplChangeManager.getInstance(conf);
+    storageAuthCheck = MetastoreConf.getBoolVar(conf, 
ConfVars.AUTHORIZATION_STORAGE_AUTH_CHECKS);
+  }
+
+  private MetaStoreFS getMetaStoreFsHandler(Configuration conf)
+      throws MetaException {
+    String handlerClassStr = MetastoreConf.getVar(conf, 
ConfVars.FS_HANDLER_CLS);
+    try {
+      Class<? extends MetaStoreFS> handlerClass = (Class<? extends 
MetaStoreFS>) Class
+          .forName(handlerClassStr, true, JavaUtils.getClassLoader());
+      MetaStoreFS handler = ReflectionUtils.newInstance(handlerClass, conf);
+      return handler;
+    } catch (ClassNotFoundException e) {
+      throw new MetaException("Error in loading MetaStoreFS handler."
+          + e.getMessage());
+    }
+  }
+
+
+  /**
+   * Helper functions to convert IOException to MetaException
+   */
+  public static FileSystem getFs(Path f, Configuration conf) throws 
MetaException {
+    try {
+      return f.getFileSystem(conf);
+    } catch (IOException e) {
+      MetaStoreUtils.logAndThrowMetaException(e);
+    }
+    return null;
+  }
+
+  public FileSystem getFs(Path f) throws MetaException {
+    return getFs(f, conf);
+  }
+
+
+  /**
+   * Hadoop File System reverse lookups paths with raw ip addresses The File
+   * System URI always contains the canonical DNS name of the Namenode.
+   * Subsequently, operations on paths with raw ip addresses cause an exception
+   * since they don't match the file system URI.
+   *
+   * This routine solves this problem by replacing the scheme and authority of 
a
+   * path with the scheme and authority of the FileSystem that it maps to.
+   *
+   * @param path
+   *          Path to be canonicalized
+   * @return Path with canonical scheme and authority
+   */
+  public static Path getDnsPath(Path path, Configuration conf) throws 
MetaException {
+    FileSystem fs = getFs(path, conf);
+    String uriPath = path.toUri().getPath();
+    if (StringUtils.isEmpty(uriPath)) {
+      uriPath = "/";
+    }
+    return (new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), 
uriPath));
+  }
+
+  public Path getDnsPath(Path path) throws MetaException {
+    return getDnsPath(path, conf);
+  }
+
+  /**
+   * Resolve the configured warehouse root dir with respect to the 
configuration
+   * This involves opening the FileSystem corresponding to the warehouse root
+   * dir (but that should be ok given that this is only called during DDL
+   * statements for non-external tables).
+   */
+  public Path getWhRoot() throws MetaException {
+    if (whRoot != null) {
+      return whRoot;
+    }
+    whRoot = getDnsPath(new Path(whRootString));
+    return whRoot;
+  }
+
+  public Path getWhRootExternal() throws MetaException {
+    if (whRootExternal != null) {
+      return whRootExternal;
+    }
+    if (!hasExternalWarehouseRoot()) {
+      whRootExternal = getWhRoot();
+    } else {
+      whRootExternal = getDnsPath(new Path(whRootExternalString));
+    }
+    return whRootExternal;
+  }
+
+  /**
+   * Build the database path based on catalog name and database name.  This 
should only be used
+   * when a database is being created or altered.  If you just want to find 
out the path a
+   * database is already using call {@link #getDatabasePath(Database)}.  If 
the passed in
+   * database already has a path set that will be used.  If not the location 
will be built using
+   * catalog's path and the database name.
+   * @param cat catalog the database is in
+   * @param db database object
+   * @return Path representing the directory for the database
+   * @throws MetaException when the file path cannot be properly determined 
from the configured
+   * file system.
+   */
+  public Path determineDatabasePath(Catalog cat, Database db) throws 
MetaException {
+    if (db.isSetLocationUri()) {
+      return getDnsPath(new Path(db.getLocationUri()));
+    }
+    if (cat == null || cat.getName().equalsIgnoreCase(DEFAULT_CATALOG_NAME)) {
+      if (db.getName().equalsIgnoreCase(DEFAULT_DATABASE_NAME)) {
+        return getWhRoot();
+      } else {
+        return new Path(getWhRoot(), dbDirFromDbName(db));
+      }
+    } else {
+      return new Path(getDnsPath(new Path(cat.getLocationUri())), 
dbDirFromDbName(db));
+    }
+  }
+
+  private String dbDirFromDbName(Database db) throws MetaException {
+    return db.getName().toLowerCase() + DATABASE_WAREHOUSE_SUFFIX;
+  }
+
+  /**
+   * Get the path specified by the database.  In the case of the default 
database the root of the
+   * warehouse is returned.
+   * @param db database to get the path of
+   * @return path to the database directory
+   * @throws MetaException when the file path cannot be properly determined 
from the configured
+   * file system.
+   */
+  public Path getDatabasePath(Database db) throws MetaException {
+    if (db.getCatalogName().equalsIgnoreCase(DEFAULT_CATALOG_NAME) &&
+        db.getName().equalsIgnoreCase(DEFAULT_DATABASE_NAME)) {
+      return getWhRoot();
+    }
+    return new Path(db.getLocationUri());
+  }
+
+  public Path getDefaultDatabasePath(String dbName) throws MetaException {
+    // TODO CAT - I am fairly certain that most calls to this are in error.  
This should only be
+    // used when the database location is unset, which should never happen 
except when a
+    // new database is being created.  Once I have confirmation of this change 
calls of this to
+    // getDatabasePath(), since it does the right thing.  Also, merge this with
+    // determineDatabasePath() as it duplicates much of the logic.
+    if (dbName.equalsIgnoreCase(DEFAULT_DATABASE_NAME)) {
+      return getWhRoot();
+    }
+    return new Path(getWhRoot(), dbName.toLowerCase() + 
DATABASE_WAREHOUSE_SUFFIX);
+  }
+
+  public Path getDefaultExternalDatabasePath(String dbName) throws 
MetaException {
+    if (dbName.equalsIgnoreCase(DEFAULT_DATABASE_NAME)) {
+      return getWhRootExternal();
+    }
+    return new Path(getWhRootExternal(), dbName.toLowerCase() + 
DATABASE_WAREHOUSE_SUFFIX);
+  }
+
+  private boolean hasExternalWarehouseRoot() {
+    return !StringUtils.isBlank(whRootExternalString);
+  }
+
+  /**
+   * Returns the default location of the table path using the parent 
database's location
+   * @param db Database where the table is created
+   * @param tableName table name
+   * @return
+   * @throws MetaException
+   */
+  @Deprecated
+  public Path getDefaultTablePath(Database db, String tableName)
+      throws MetaException {
+    return getDefaultTablePath(db, tableName, false);
+  }
+
+  public Path getDefaultTablePath(Database db, String tableName, boolean 
isExternal) throws MetaException {
+    Path dbPath = null;
+    if (isExternal && hasExternalWarehouseRoot()) {
+      dbPath = getDefaultExternalDatabasePath(db.getName());
+    } else {
+      dbPath = getDatabasePath(db);
+    }
+    return getDnsPath(
+        new Path(dbPath, 
MetaStoreUtils.encodeTableName(tableName.toLowerCase())));
+  }
+
+  // A few situations where we need the default table path, without a DB object
+  public Path getDefaultTablePath(String dbName, String tableName, boolean 
isExternal) throws MetaException {
+    Path dbPath = null;
+    if (isExternal && hasExternalWarehouseRoot()) {
+      dbPath = getDefaultExternalDatabasePath(dbName);
+    } else {
+      dbPath = getDefaultDatabasePath(dbName);
+    }
+    return getDnsPath(
+        new Path(dbPath, 
MetaStoreUtils.encodeTableName(tableName.toLowerCase())));
+  }
+
+  public Path getDefaultTablePath(Database db, Table table) throws 
MetaException {
+    return getDefaultTablePath(db, table.getTableName(), 
MetaStoreUtils.isExternalTable(table));
+  }
+
+  @Deprecated // Use TableName
+  public static String getQualifiedName(Table table) {
+    return TableName.getDbTable(table.getDbName(), table.getTableName());
+  }
+
+  @Deprecated // Use TableName
+  public static String getQualifiedName(String dbName, String tableName) {
+    return TableName.getDbTable(dbName, tableName);
+  }
+
+  public static String getQualifiedName(Partition partition) {
+    return partition.getDbName() + "." + partition.getTableName() + 
partition.getValues();
+  }
+
+  /**
+   * Get table name in cat.db.table format.
+   * @param table table object
+   * @return fully qualified name.
+   */
+  public static String getCatalogQualifiedTableName(Table table) {
+    return TableName.getQualified(table.getCatName(), table.getDbName(), 
table.getTableName());
+  }
+
+  public boolean mkdirs(Path f) throws MetaException {
+    FileSystem fs;
+    try {
+      fs = getFs(f);
+      return FileUtils.mkdir(fs, f);
+    } catch (IOException e) {
+      MetaStoreUtils.logAndThrowMetaException(e);
+    }
+    return false;
+  }
+
+  public boolean renameDir(Path sourcePath, Path destPath, boolean 
needCmRecycle) throws MetaException {
+    try {
+      if (needCmRecycle) {
+        // Copy the source files to cmroot. As the client will move the source 
files to another
+        // location, we should make a copy of the files to cmroot instead of 
moving it.
+        cm.recycle(sourcePath, RecycleType.COPY, true);
+      }
+      FileSystem srcFs = getFs(sourcePath);
+      FileSystem destFs = getFs(destPath);
+      return FileUtils.rename(srcFs, destFs, sourcePath, destPath);
+    } catch (Exception ex) {
+      MetaStoreUtils.logAndThrowMetaException(ex);
+    }
+    return false;
+  }
+
+  void addToChangeManagement(Path file) throws MetaException {
+    try {
+      cm.recycle(file, RecycleType.COPY, true);
+    } catch (IOException e) {
+      throw new 
MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+  }
+
+  public boolean deleteDir(Path f, boolean recursive, Database db) throws 
MetaException {
+    return deleteDir(f, recursive, false, db);
+  }
+
+  public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, 
Database db) throws MetaException {
+    return deleteDir(f, recursive, ifPurge, 
ReplChangeManager.isSourceOfReplication(db));
+  }
+
+  public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, boolean 
needCmRecycle) throws MetaException {
+    if (needCmRecycle) {
+      try {
+        cm.recycle(f, RecycleType.MOVE, ifPurge);
+      } catch (IOException e) {
+        throw new 
MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
+    }
+    FileSystem fs = getFs(f);
+    return fsHandler.deleteDir(fs, f, recursive, ifPurge, conf);
+  }
+
+  public void recycleDirToCmPath(Path f, boolean ifPurge) throws MetaException 
{
+    try {
+      cm.recycle(f, RecycleType.MOVE, ifPurge);
+    } catch (IOException e) {
+      throw new 
MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+  }
+
+  public boolean isEmpty(Path path) throws IOException, MetaException {
+    ContentSummary contents = getFs(path).getContentSummary(path);
+    if (contents != null && contents.getFileCount() == 0 && 
contents.getDirectoryCount() == 1) {
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isWritable(Path path) throws IOException {
+    if (!storageAuthCheck) {
+      // no checks for non-secure hadoop installations
+      return true;
+    }
+    if (path == null) { //what??!!
+      return false;
+    }
+    final FileStatus stat;
+    final FileSystem fs;
+    try {
+      fs = getFs(path);
+      stat = fs.getFileStatus(path);
+      HdfsUtils.checkFileAccess(fs, stat, FsAction.WRITE);
+      return true;
+    } catch (FileNotFoundException fnfe){
+      // File named by path doesn't exist; nothing to validate.
+      return true;
+    } catch (Exception e) {
+      // all other exceptions are considered as emanating from
+      // unauthorized accesses
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exception when checking if path (" + path + ")", e);
+      }
+      return false;
+    }
+  }
+
+  private static String escapePathName(String path) {
+    return FileUtils.escapePathName(path);
+  }
+
+  private static String unescapePathName(String path) {
+    return FileUtils.unescapePathName(path);
+  }
+
+  /**
+   * Given a partition specification, return the path corresponding to the
+   * partition spec. By default, the specification does not include dynamic 
partitions.
+   * @param spec
+   * @return string representation of the partition specification.
+   * @throws MetaException
+   */
+  public static String makePartPath(Map<String, String> spec)
+      throws MetaException {
+    return makePartName(spec, true);
+  }
+
+  /**
+   * Makes a partition name from a specification
+   * @param spec
+   * @param addTrailingSeperator if true, adds a trailing separator e.g. 
'ds=1/'
+   * @return partition name
+   * @throws MetaException
+   */
+  public static String makePartName(Map<String, String> spec,
+      boolean addTrailingSeperator)
+      throws MetaException {
+    StringBuilder suffixBuf = new StringBuilder();
+    int i = 0;
+    for (Entry<String, String> e : spec.entrySet()) {
+      if (e.getValue() == null || e.getValue().length() == 0) {
+        throw new MetaException("Partition spec is incorrect. " + spec);
+      }
+      if (i>0) {
+        suffixBuf.append(Path.SEPARATOR);
+      }
+      suffixBuf.append(escapePathName(e.getKey()));
+      suffixBuf.append('=');
+      suffixBuf.append(escapePathName(e.getValue()));
+      i++;
+    }
+    if (addTrailingSeperator) {
+      suffixBuf.append(Path.SEPARATOR);
+    }
+    return suffixBuf.toString();
+  }
+  /**
+   * Given a dynamic partition specification, return the path corresponding to 
the
+   * static part of partition specification. This is basically a copy of 
makePartName
+   * but we get rid of MetaException since it is not serializable.
+   * @param spec
+   * @return string representation of the static part of the partition 
specification.
+   */
+  public static String makeDynamicPartName(Map<String, String> spec) {
+    StringBuilder suffixBuf = new StringBuilder();
+    for (Entry<String, String> e : spec.entrySet()) {
+      if (e.getValue() != null && e.getValue().length() > 0) {
+        suffixBuf.append(escapePathName(e.getKey()));
+        suffixBuf.append('=');
+        suffixBuf.append(escapePathName(e.getValue()));
+        suffixBuf.append(Path.SEPARATOR);
+      } else { // stop once we see a dynamic partition
+        break;
+      }
+    }
+    return suffixBuf.toString();
+  }
+
+  static final Pattern pat = Pattern.compile("([^/]+)=([^/]+)");
+
+  private static final Pattern slash = Pattern.compile("/");
+
+  /**
+   * Extracts values from partition name without the column names.
+   * @param name Partition name.
+   * @param result The result. Must be pre-sized to the expected number of 
columns.
+   */
+  public static AbstractList<String> makeValsFromName(
+      String name, AbstractList<String> result) throws MetaException {
+    assert name != null;
+    String[] parts = slash.split(name, 0);
+    if (result == null) {
+      result = new ArrayList<>(parts.length);
+      for (int i = 0; i < parts.length; ++i) {
+        result.add(null);
+      }
+    } else if (parts.length != result.size()) {
+      throw new MetaException(
+          "Expected " + result.size() + " components, got " + parts.length + " 
(" + name + ")");
+    }
+    for (int i = 0; i < parts.length; ++i) {
+      int eq = parts[i].indexOf('=');
+      if (eq <= 0) {
+        throw new MetaException("Unexpected component " + parts[i]);
+      }
+      result.set(i, unescapePathName(parts[i].substring(eq + 1)));
+    }
+    return result;
+  }
+
+  public static LinkedHashMap<String, String> makeSpecFromName(String name)
+      throws MetaException {
+    if (name == null || name.isEmpty()) {
+      throw new MetaException("Partition name is invalid. " + name);
+    }
+    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
+    makeSpecFromName(partSpec, new Path(name), null);
+    return partSpec;
+  }
+
+  public static boolean makeSpecFromName(Map<String, String> partSpec, Path 
currPath,
+      Set<String> requiredKeys) {
+    List<String[]> kvs = new ArrayList<>();
+    do {
+      String component = currPath.getName();
+      Matcher m = pat.matcher(component);
+      if (m.matches()) {
+        String k = unescapePathName(m.group(1));
+        String v = unescapePathName(m.group(2));
+        String[] kv = new String[2];
+        kv[0] = k;
+        kv[1] = v;
+        kvs.add(kv);
+      }
+      currPath = currPath.getParent();
+    } while (currPath != null && !currPath.getName().isEmpty());
+
+    // reverse the list since we checked the part from leaf dir to table's 
base dir
+    for (int i = kvs.size(); i > 0; i--) {
+      String key = kvs.get(i - 1)[0];
+      if (requiredKeys != null) {
+        requiredKeys.remove(key);
+      }
+      partSpec.put(key, kvs.get(i - 1)[1]);
+    }
+    if (requiredKeys == null || requiredKeys.isEmpty()) return true;
+    LOG.warn("Cannot create partition spec from " + currPath + "; missing keys 
" + requiredKeys);
+    return false;
+  }
+
+  public static Map<String, String> makeEscSpecFromName(String name) throws 
MetaException {
+
+    if (name == null || name.isEmpty()) {
+      throw new MetaException("Partition name is invalid. " + name);
+    }
+    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
+
+    Path currPath = new Path(name);
+
+    List<String[]> kvs = new ArrayList<>();
+    do {
+      String component = currPath.getName();
+      Matcher m = pat.matcher(component);
+      if (m.matches()) {
+        String k = m.group(1);
+        String v = m.group(2);
+        String[] kv = new String[2];
+        kv[0] = k;
+        kv[1] = v;
+        kvs.add(kv);
+      }
+      currPath = currPath.getParent();
+    } while (currPath != null && !currPath.getName().isEmpty());
+
+    // reverse the list since we checked the part from leaf dir to table's 
base dir
+    for (int i = kvs.size(); i > 0; i--) {
+      partSpec.put(kvs.get(i - 1)[0], kvs.get(i - 1)[1]);
+    }
+
+    return partSpec;
+  }
+
+  /**
+   * Returns the default partition path of a table within a given database and 
partition key value
+   * pairs. It uses the database location and appends it the table name and 
the partition key,value
+   * pairs to create the Path for the partition directory
+   *
+   * @param db - parent database which is used to get the base location of the 
partition directory
+   * @param tableName - table name for the partitions
+   * @param pm - Partition key value pairs
+   * @return
+   * @throws MetaException
+   */
+  public Path getDefaultPartitionPath(Database db, Table table,
+      Map<String, String> pm) throws MetaException {
+    return getPartitionPath(getDefaultTablePath(db, table), pm);
+  }
+
+  /**
+   * Returns the path object for the given partition key-value pairs and the 
base location
+   *
+   * @param tblPath - the base location for the partitions. Typically the 
table location
+   * @param pm - Partition key value pairs
+   * @return
+   * @throws MetaException
+   */
+  public Path getPartitionPath(Path tblPath, Map<String, String> pm)
+      throws MetaException {
+    return new Path(tblPath, makePartPath(pm));
+  }
+
+  /**
+   * Given a database, a table and the partition key value pairs this method 
returns the Path object
+   * corresponding to the partition key value pairs. It uses the table 
location if available else
+   * uses the database location for constructing the path corresponding to the 
partition key-value
+   * pairs
+   *
+   * @param db - Parent database of the given table
+   * @param table - Table for which the partition key-values are given
+   * @param vals - List of values for the partition keys
+   * @return Path corresponding to the partition key-value pairs
+   * @throws MetaException
+   */
+  public Path getPartitionPath(Database db, Table table, List<String> vals)
+      throws MetaException {
+    List<FieldSchema> partKeys = table.getPartitionKeys();
+    if (partKeys == null || (partKeys.size() != vals.size())) {
+      throw new MetaException("Invalid number of partition keys found for " + 
table.getTableName());
+    }
+    Map<String, String> pm = new LinkedHashMap<>(vals.size());
+    int i = 0;
+    for (FieldSchema key : partKeys) {
+      pm.put(key.getName(), vals.get(i));
+      i++;
+    }
+
+    if (table.getSd().getLocation() != null) {
+      return getPartitionPath(getDnsPath(new 
Path(table.getSd().getLocation())), pm);
+    } else {
+      return getDefaultPartitionPath(db, table, pm);
+    }
+  }
+
+  public boolean isDir(Path f) throws MetaException {
+    FileSystem fs;
+    try {
+      fs = getFs(f);
+      FileStatus fstatus = fs.getFileStatus(f);
+      if (!fstatus.isDir()) {
+        return false;
+      }
+    } catch (FileNotFoundException e) {
+      return false;
+    } catch (IOException e) {
+      MetaStoreUtils.logAndThrowMetaException(e);
+    }
+    return true;
+  }
+
+  public static String makePartName(List<FieldSchema> partCols,
+      List<String> vals) throws MetaException {
+    return makePartName(partCols, vals, null);
+  }
+
+  /**
+   * @param desc
+   * @return array of FileStatus objects corresponding to the files
+   * making up the passed storage description
+   */
+  public List<FileStatus> getFileStatusesForSD(StorageDescriptor desc)
+      throws MetaException {
+    return getFileStatusesForLocation(desc.getLocation());
+  }
+
+  /**
+   * @param location
+   * @return array of FileStatus objects corresponding to the files
+   * making up the passed storage description
+   */
+  public List<FileStatus> getFileStatusesForLocation(String location)
+      throws MetaException {
+    try {
+      Path path = new Path(location);
+      FileSystem fileSys = path.getFileSystem(conf);
+      return FileUtils.getFileStatusRecurse(path, -1, fileSys);
+    } catch (IOException ioe) {
+      MetaStoreUtils.logAndThrowMetaException(ioe);
+    }
+    return null;
+  }
+
+  /**
+   * @param db database
+   * @param table table
+   * @return array of FileStatus objects corresponding to the files making up 
the passed
+   * unpartitioned table
+   */
+  public List<FileStatus> getFileStatusesForUnpartitionedTable(Database db, 
Table table)
+      throws MetaException {
+    Path tablePath = getDnsPath(new Path(table.getSd().getLocation()));
+    try {
+      FileSystem fileSys = tablePath.getFileSystem(conf);
+      return FileUtils.getFileStatusRecurse(tablePath, -1, fileSys);
+    } catch (IOException ioe) {
+      MetaStoreUtils.logAndThrowMetaException(ioe);
+    }
+    return null;
+  }
+
+  /**
+   * Makes a valid partition name.
+   * @param partCols The partition columns
+   * @param vals The partition values
+   * @param defaultStr
+   *    The default name given to a partition value if the respective value is 
empty or null.
+   * @return An escaped, valid partition name.
+   * @throws MetaException
+   */
+  public static String makePartName(List<FieldSchema> partCols,
+      List<String> vals, String defaultStr) throws MetaException {
+    if ((partCols.size() != vals.size()) || (partCols.size() == 0)) {
+      String errorStr = "Invalid partition key & values; keys [";
+      for (FieldSchema fs : partCols) {
+        errorStr += (fs.getName() + ", ");
+      }
+      errorStr += "], values [";
+      for (String val : vals) {
+        errorStr += (val + ", ");
+      }
+      throw new MetaException(errorStr + "]");
+    }
+    List<String> colNames = new ArrayList<>();
+    for (FieldSchema col: partCols) {
+      colNames.add(col.getName());
+    }
+    return FileUtils.makePartName(colNames, vals, defaultStr);
+  }
+
+  public static List<String> getPartValuesFromPartName(String partName)
+      throws MetaException {
+    LinkedHashMap<String, String> partSpec = 
Warehouse.makeSpecFromName(partName);
+    List<String> values = new ArrayList<>();
+    values.addAll(partSpec.values());
+    return values;
+  }
+
+  public static Map<String, String> makeSpecFromValues(List<FieldSchema> 
partCols,
+      List<String> values) {
+    Map<String, String> spec = new LinkedHashMap<>();
+    for (int i = 0; i < values.size(); i++) {
+      spec.put(partCols.get(i).getName(), values.get(i));
+    }
+    return spec;
+  }
+}

Reply via email to