Repository: hive
Updated Branches:
  refs/heads/master bd371246e -> 4142c98c1


HIVE-17606 : Improve security for DB notification related APIs (Tao Li via 
Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4142c98c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4142c98c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4142c98c

Branch: refs/heads/master
Commit: 4142c98c18c6ae2aee81abbac00591a27f55e425
Parents: bd37124
Author: Tao LI <[email protected]>
Authored: Thu Sep 28 14:16:18 2017 -0700
Committer: Thejas M Nair <[email protected]>
Committed: Thu Sep 28 14:16:29 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  3 ++
 .../hive/hcatalog/api/TestHCatClient.java       |  7 +++
 .../hadoop/hive/ql/parse/TestCopyUtils.java     |  8 +--
 .../hadoop/hive/ql/parse/TestExportImport.java  |  2 +
 .../hive/ql/parse/TestReplicationScenarios.java | 39 +++++++++++++++
 ...TestReplicationScenariosAcrossInstances.java |  2 +
 .../hadoop/hive/metastore/HiveMetaStore.java    | 41 +++++++++++++++
 .../hadoop/hive/metastore/MetaStoreUtils.java   | 26 ++++++++++
 .../hadoop/hive/ql/exec/ReplCopyTask.java       | 32 ++++++------
 .../hadoop/hive/ql/parse/repl/CopyUtils.java    | 34 ++++++++++---
 .../hive/metastore/ReplChangeManager.java       | 52 +++++++++++++++++---
 11 files changed, 214 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4142c98c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d94ff85..5bec15e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -794,6 +794,9 @@ public class HiveConf extends Configuration {
     
METASTORE_EVENT_DB_LISTENER_TTL("hive.metastore.event.db.listener.timetolive", 
"86400s",
         new TimeValidator(TimeUnit.SECONDS),
         "time after which events will be removed from the database listener 
queue"),
+    
METASTORE_EVENT_DB_NOTIFICATION_API_AUTH("hive.metastore.event.db.notification.api.auth",
 true,
+        "Should metastore do authorization against database notification 
related APIs such as get_next_notification.\n" +
+        "If set to true, then only the superusers in proxy settings have the 
permission"),
     
METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS("hive.metastore.authorization.storage.checks",
 false,
         "Should the metastore do authorization checks against the underlying 
storage (usually hdfs) \n" +
         "for operations like drop-partition (disallow the drop-partition if 
the user in\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/4142c98c/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
 
b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
index d2474cc..14c9a4b 100644
--- 
a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
+++ 
b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
@@ -49,7 +49,9 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hive.hcatalog.api.repl.Command;
 import org.apache.hive.hcatalog.api.repl.ReplicationTask;
 import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
@@ -109,6 +111,11 @@ public class TestHCatClient {
       return;
     }
 
+    // Set proxy user privilege and initialize the global state of ProxyUsers
+    Configuration conf = new Configuration();
+    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + 
".hosts", "*");
+    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
+
     System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname,
         DbNotificationListener.class.getName()); // turn on db notification 
listener on metastore
     msPort = MetaStoreTestUtils.startMetaStore();

http://git-wip-us.apache.org/repos/asf/hive/blob/4142c98c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
index 98b2a3c..f14b430 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
@@ -81,12 +81,12 @@ public class TestCopyUtils {
     Configuration conf = new Configuration();
     conf.set("dfs.client.use.datanode.hostname", "true");
 
-    MiniDFSCluster miniDFSCluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-
     UserGroupInformation ugi = Utils.getUGI();
-    String currentUser = ugi.getShortUserName();
+    final String currentUser = ugi.getShortUserName();
+    conf.set("hadoop.proxyuser." + currentUser + ".hosts", "*");
 
+    MiniDFSCluster miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
     HashMap<String, String> overridesForHiveConf = new HashMap<String, 
String>() {{
       put(ConfVars.HIVE_IN_TEST.varname, "false");
       put(ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, "1");

http://git-wip-us.apache.org/repos/asf/hive/blob/4142c98c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
index 70a57f8..e86ee5e 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.parse;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.Utils;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -48,6 +49,7 @@ public class TestExportImport {
   public static void classLevelSetup() throws Exception {
     Configuration conf = new Configuration();
     conf.set("dfs.client.use.datanode.hostname", "true");
+    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + 
".hosts", "*");
     MiniDFSCluster miniDFSCluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
     HashMap<String, String> overridesForHiveConf = new HashMap<String, 
String>() {{

http://git-wip-us.apache.org/repos/asf/hive/blob/4142c98c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index a8c3a0b..7cf1498 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
 import org.apache.thrift.TException;
 import org.junit.After;
@@ -98,6 +100,7 @@ public class TestReplicationScenarios {
   private static int msPort;
   private static Driver driver;
   private static HiveMetaStoreClient metaStoreClient;
+  private static String proxySettingName;
   static HiveConf hconfMirror;
   static int msPortMirror;
   static Driver driverMirror;
@@ -133,6 +136,8 @@ public class TestReplicationScenarios {
     hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
     hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
     hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/");
+    proxySettingName = "hadoop.proxyuser." + Utils.getUGI().getShortUserName() 
+ ".hosts";
+    hconf.set(proxySettingName, "*");
     msPort = MetaStoreTestUtils.startMetaStore(hconf);
     hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
     hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
@@ -3264,6 +3269,40 @@ public class TestReplicationScenarios {
     assertFalse(new AndFilter(no, no, no).accept(dummyEvent));
   }
 
+  @Test
+  public void testAuthForNotificationAPIs() throws Exception {
+    // Setup
+    long firstEventId = 
metaStoreClient.getCurrentNotificationEventId().getEventId();
+    String dbName = "testAuthForNotificationAPIs";
+    driver.run("create database " + dbName);
+    NotificationEventResponse rsp = 
metaStoreClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+    // Test various scenarios
+    // Remove the proxy privilege and the auth should fail (in reality the 
proxy setting should not be changed on the fly)
+    hconf.unset(proxySettingName);
+    // Need to explicitly update ProxyUsers
+    ProxyUsers.refreshSuperUserGroupsConfiguration(hconf);
+    // Verify if the auth should fail
+    Exception ex = null;
+    try {
+      rsp = metaStoreClient.getNextNotification(firstEventId, 0, null);
+    } catch (TException e) {
+      ex = e;
+    }
+    assertNotNull(ex);
+    // Disable auth so the call should succeed
+    
hconf.setBoolVar(HiveConf.ConfVars.METASTORE_EVENT_DB_NOTIFICATION_API_AUTH, 
false);
+    try {
+      rsp = metaStoreClient.getNextNotification(firstEventId, 0, null);
+      assertEquals(1, rsp.getEventsSize());
+    } finally {
+      // Restore the settings
+      
hconf.setBoolVar(HiveConf.ConfVars.METASTORE_EVENT_DB_NOTIFICATION_API_AUTH, 
true);
+      hconf.set(proxySettingName, "*");
+      ProxyUsers.refreshSuperUserGroupsConfiguration(hconf);
+    }
+  }
+
   private NotificationEvent createDummyEvent(String dbname, String tblname, 
long evid) {
     MessageFactory msgFactory = MessageFactory.getInstance();
     Table t = new Table();

http://git-wip-us.apache.org/repos/asf/hive/blob/4142c98c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 50c4a98..aa2c3bb 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.util.DependencyResolver;
+import org.apache.hadoop.hive.shims.Utils;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -63,6 +64,7 @@ public class TestReplicationScenariosAcrossInstances {
   public static void classLevelSetup() throws Exception {
     Configuration conf = new Configuration();
     conf.set("dfs.client.use.datanode.hostname", "true");
+    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + 
".hosts", "*");
     MiniDFSCluster miniDFSCluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
     primary = new WarehouseInstance(LOG, miniDFSCluster);

http://git-wip-us.apache.org/repos/asf/hive/blob/4142c98c/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index d5de4f2..20e7ec1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -7055,12 +7055,28 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     @Override
     public NotificationEventResponse 
get_next_notification(NotificationEventRequest rqst)
         throws TException {
+      try {
+        authorizeProxyPrivilege();
+      } catch (Exception ex) {
+        LOG.error("Not authorized to make the get_next_notification call. You 
can try to disable " +
+            
HiveConf.ConfVars.METASTORE_EVENT_DB_NOTIFICATION_API_AUTH.varname, ex);
+        throw new TException(ex);
+      }
+
       RawStore ms = getMS();
       return ms.getNextNotification(rqst);
     }
 
     @Override
     public CurrentNotificationEventId get_current_notificationEventId() throws 
TException {
+      try {
+        authorizeProxyPrivilege();
+      } catch (Exception ex) {
+        LOG.error("Not authorized to make the get_current_notificationEventId 
call. You can try to disable " +
+            
HiveConf.ConfVars.METASTORE_EVENT_DB_NOTIFICATION_API_AUTH.varname, ex);
+        throw new TException(ex);
+      }
+
       RawStore ms = getMS();
       return ms.getCurrentNotificationEventId();
     }
@@ -7068,10 +7084,35 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     @Override
     public NotificationEventsCountResponse 
get_notification_events_count(NotificationEventsCountRequest rqst)
             throws TException {
+      try {
+        authorizeProxyPrivilege();
+      } catch (Exception ex) {
+        LOG.error("Not authorized to make the get_notification_events_count 
call. You can try to disable " +
+            
HiveConf.ConfVars.METASTORE_EVENT_DB_NOTIFICATION_API_AUTH.varname, ex);
+        throw new TException(ex);
+      }
+
       RawStore ms = getMS();
       return ms.getNotificationEventsCount(rqst);
     }
 
+    private void authorizeProxyPrivilege() throws Exception {
+      // Skip the auth in embedded mode or if the auth is disabled
+      if (!isMetaStoreRemote() || 
!hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_EVENT_DB_NOTIFICATION_API_AUTH))
 {
+        return;
+      }
+      String user = null;
+      try {
+        user = Utils.getUGI().getShortUserName();
+      } catch (Exception ex) {
+        LOG.error("Cannot obtain username", ex);
+        throw ex;
+      }
+      if (!MetaStoreUtils.checkUserHasHostProxyPrivileges(user, hiveConf, 
getIPAddress())) {
+        throw new MetaException("User " + user + " is not allowed to perform 
this API call");
+      }
+    }
+
     @Override
     public FireEventResponse fire_listener_event(FireEventRequest rqst) throws 
TException {
       switch (rqst.getData().getSetField()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/4142c98c/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index b51446d..a147a25 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -106,6 +107,9 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.util.MachineList;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.ReflectionUtil;
 
@@ -1974,4 +1978,26 @@ public class MetaStoreUtils {
   public static double decimalToDouble(Decimal decimal) {
     return new BigDecimal(new BigInteger(decimal.getUnscaled()), 
decimal.getScale()).doubleValue();
   }
+
+  /**
+   * Verify if the user is allowed to make DB notification related calls.
+   * Only the superusers defined in the Hadoop proxy user settings have the 
permission.
+   *
+   * @param user the short user name
+   * @param conf the Configuration that contains the proxy user settings
+   * @param ipAddress the IP address of the calling user, possibly null
+   * @return true if the user has the proxy privilege for the given host
+   */
+  public static boolean checkUserHasHostProxyPrivileges(String user, 
Configuration conf, String ipAddress) {
+    DefaultImpersonationProvider sip = 
ProxyUsers.getDefaultImpersonationProvider();
+    // Just need to initialize the ProxyUsers for the first time, given that 
the conf will not change on the fly
+    if (sip == null) {
+      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
+      sip = ProxyUsers.getDefaultImpersonationProvider();
+    }
+    Map<String, Collection<String>> proxyHosts = sip.getProxyHosts();
+    Collection<String> hostEntries = 
proxyHosts.get(sip.getProxySuperuserIpConfKey(user));
+    MachineList machineList = new MachineList(hostEntries);
+    ipAddress = (ipAddress == null) ? StringUtils.EMPTY : ipAddress;
+    return machineList.includes(ipAddress);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/4142c98c/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 54746d3..04ab904 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -35,6 +35,7 @@ import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.calcite.util.Pair;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -76,11 +77,10 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
       // be a CM uri in the from path.
       if (ReplChangeManager.isCMFileUri(fromPath, srcFs)) {
         String[] result = 
ReplChangeManager.getFileWithChksumFromURI(fromPath.toString());
-        Path sourcePath = ReplChangeManager
-            .getFileStatus(new Path(result[0]), result[1], conf)
-            .getPath();
+        ReplChangeManager.FileInfo sourceInfo = ReplChangeManager
+            .getFileInfo(new Path(result[0]), result[1], conf);
         if (FileUtils.copy(
-            sourcePath.getFileSystem(conf), sourcePath,
+            sourceInfo.getFs(), sourceInfo.getSourcePath(),
             dstFs, toPath, false, false, conf)) {
           return 0;
         } else {
@@ -90,13 +90,13 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
         }
       }
 
-      List<Path> srcPaths = new ArrayList<>();
+      List<ReplChangeManager.FileInfo> srcFiles = new ArrayList<>();
       if (rwork.readSrcAsFilesList()) {
         // This flow is usually taken for REPL LOAD
         // Our input is the result of a _files listing, we should expand out 
_files.
-        srcPaths = filesInFileListing(srcFs, fromPath);
-        LOG.debug("ReplCopyTask _files contains:" + (srcPaths == null ? "null" 
: srcPaths.size()));
-        if ((srcPaths == null) || (srcPaths.isEmpty())) {
+        srcFiles = filesInFileListing(srcFs, fromPath);
+        LOG.debug("ReplCopyTask _files contains:" + (srcFiles == null ? "null" 
: srcFiles.size()));
+        if ((srcFiles == null) || (srcFiles.isEmpty())) {
           if (work.isErrorOnSrcEmpty()) {
             console.printError("No _files entry found on source: " + 
fromPath.toString());
             return 5;
@@ -120,17 +120,18 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
         for (FileStatus oneSrc : srcs) {
           console.printInfo("Copying file: " + oneSrc.getPath().toString());
           LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath);
-          srcPaths.add(oneSrc.getPath());
+          srcFiles.add(new 
ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf),
+              oneSrc.getPath(), null, null, true));
         }
       }
 
-      LOG.debug("ReplCopyTask numFiles: {}", srcPaths.size());
+      LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size());
       if (!FileUtils.mkdir(dstFs, toPath, conf)) {
         console.printError("Cannot make target directory: " + 
toPath.toString());
         return 2;
       }
       // Copy the files from different source file systems to one destination 
directory
-      new CopyUtils(rwork.distCpDoAsUser(), conf).doCopy(toPath, srcPaths);
+      new CopyUtils(rwork.distCpDoAsUser(), conf).copyAndVerify(toPath, 
srcFiles);
 
       return 0;
     } catch (Exception e) {
@@ -140,7 +141,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
     }
   }
 
-  private List<Path> filesInFileListing(FileSystem fs, Path dataPath)
+  private List<ReplChangeManager.FileInfo> filesInFileListing(FileSystem fs, 
Path dataPath)
       throws IOException {
     Path fileListing = new Path(dataPath, EximUtil.FILES_NAME);
     LOG.debug("ReplCopyTask filesInFileListing() reading " + 
fileListing.toUri());
@@ -150,7 +151,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
       // On success, but with nothing to return, we can return an empty list.
     }
 
-    List<Path> filePaths = new ArrayList<>();
+    List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
     BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(fileListing)));
     // TODO : verify if skipping charset here is okay
 
@@ -160,9 +161,8 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
 
       String[] fileWithChksum = 
ReplChangeManager.getFileWithChksumFromURI(line);
       try {
-        Path f = ReplChangeManager
-                .getFileStatus(new Path(fileWithChksum[0]), fileWithChksum[1], 
conf)
-                .getPath();
+        ReplChangeManager.FileInfo f = ReplChangeManager
+                .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], 
conf);
         filePaths.add(f);
       } catch (MetaException e) {
         // issue warning for missing file and throw exception

http://git-wip-us.apache.org/repos/asf/hive/blob/4142c98c/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index a022b5d..3d99499 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -46,6 +46,7 @@ public class CopyUtils {
   private final long maxNumberOfFiles;
   private final boolean hiveInTest;
   private final String copyAsUser;
+  private final int MAX_COPY_RETRY = 3;
 
   public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) {
     this.hiveConf = hiveConf;
@@ -88,12 +89,11 @@ public class CopyUtils {
       3. aggregate fileSize of all source Paths(can be directory /  file) is 
less than configured size.
       4. number of files of all source Paths(can be directory /  file) is less 
than configured size.
   */
-  private boolean regularCopy(FileSystem destinationFs, Map.Entry<FileSystem, 
List<Path>> entry)
+  private boolean regularCopy(FileSystem destinationFs, FileSystem sourceFs, 
List<ReplChangeManager.FileInfo> fileList)
       throws IOException {
     if (hiveInTest) {
       return true;
     }
-    FileSystem sourceFs = entry.getKey();
     if (isLocal(sourceFs) || isLocal(destinationFs)) {
       return true;
     }
@@ -104,8 +104,17 @@ public class CopyUtils {
     long size = 0;
     long numberOfFiles = 0;
 
-    for (Path path : entry.getValue()) {
-      ContentSummary contentSummary = sourceFs.getContentSummary(path);
+    for (ReplChangeManager.FileInfo fileInfo : fileList) {
+      ContentSummary contentSummary = null;
+      try {
+        contentSummary = 
sourceFs.getContentSummary(fileInfo.getEffectivePath());
+      } catch (FileNotFoundException e) {
+        // in replication, if source file does not exist, try cmroot
+        if (fileInfo.isUseSourcePath() && fileInfo.getCmPath() != null) {
+          contentSummary = sourceFs.getContentSummary(fileInfo.getCmPath());
+          fileInfo.setIsUseSourcePath(false);
+        }
+      }
       size += contentSummary.getLength();
       numberOfFiles += contentSummary.getFileCount();
       if (limitReachedForLocalCopy(size, numberOfFiles)) {
@@ -129,15 +138,28 @@ public class CopyUtils {
     return fs.getScheme().equals("file");
   }
 
-  private Map<FileSystem, List<Path>> fsToFileMap(List<Path> srcPaths) throws 
IOException {
+  private Map<FileSystem, List<Path>> fsToPathMap(List<Path> srcPaths) throws 
IOException {
     Map<FileSystem, List<Path>> result = new HashMap<>();
     for (Path path : srcPaths) {
       FileSystem fileSystem = path.getFileSystem(hiveConf);
       if (!result.containsKey(fileSystem)) {
-        result.put(fileSystem, new ArrayList<>());
+        result.put(fileSystem, new ArrayList<Path>());
       }
       result.get(fileSystem).add(path);
     }
     return result;
   }
+
+  private Map<FileSystem, List<ReplChangeManager.FileInfo>> fsToFileMap(
+      List<ReplChangeManager.FileInfo> srcFiles) throws IOException {
+    Map<FileSystem, List<ReplChangeManager.FileInfo>> result = new HashMap<>();
+    for (ReplChangeManager.FileInfo file : srcFiles) {
+      FileSystem fileSystem = file.getFs();
+      if (!result.containsKey(fileSystem)) {
+        result.put(fileSystem, new ArrayList<ReplChangeManager.FileInfo>());
+      }
+      result.get(fileSystem).add(file);
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/4142c98c/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index dd9296a..b4c8c08 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -63,6 +63,46 @@ public class ReplChangeManager {
     COPY
   }
 
+  public static class FileInfo {
+    FileSystem fs;
+    Path sourcePath;
+    Path cmPath;
+    String checkSum;
+    boolean useSourcePath;
+    public FileInfo(FileSystem fs, Path sourcePath, Path cmPath, String 
checkSum, boolean useSourcePath) {
+      this.fs = fs;
+      this.sourcePath = sourcePath;
+      this.cmPath = cmPath;
+      this.checkSum = checkSum;
+      this.useSourcePath = useSourcePath;
+    }
+    public FileSystem getFs() {
+      return fs;
+    }
+    public Path getSourcePath() {
+      return sourcePath;
+    }
+    public Path getCmPath() {
+      return cmPath;
+    }
+    public String getCheckSum() {
+      return checkSum;
+    }
+    public boolean isUseSourcePath() {
+      return useSourcePath;
+    }
+    public void setIsUseSourcePath(boolean useSourcePath) {
+      this.useSourcePath = useSourcePath;
+    }
+    public Path getEffectivePath() {
+      if (useSourcePath) {
+        return sourcePath;
+      } else {
+        return cmPath;
+      }
+    }
+  }
+
   public static ReplChangeManager getInstance(Configuration conf) throws 
MetaException {
     if (instance == null) {
       instance = new ReplChangeManager(conf);
@@ -259,25 +299,25 @@ public class ReplChangeManager {
    * @param src Original file location
    * @param checksumString Checksum of the original file
    * @param conf
-   * @return Corresponding FileStatus object
+   * @return Corresponding FileInfo object
    */
-  static public FileStatus getFileStatus(Path src, String checksumString,
+  static public FileInfo getFileInfo(Path src, String checksumString,
       Configuration conf) throws MetaException {
     try {
       FileSystem srcFs = src.getFileSystem(conf);
       if (checksumString == null) {
-        return srcFs.getFileStatus(src);
+        return new FileInfo(srcFs, src, null, null, true);
       }
 
       if (!srcFs.exists(src)) {
-        return srcFs.getFileStatus(getCMPath(conf, src.getName(), 
checksumString));
+        return new FileInfo(srcFs, src, getCMPath(conf, src.getName(), 
checksumString), checksumString, false);
       }
 
       String currentChecksumString = checksumFor(src, srcFs);
       if (currentChecksumString == null || 
checksumString.equals(currentChecksumString)) {
-        return srcFs.getFileStatus(src);
+        return new FileInfo(srcFs, src, getCMPath(conf, src.getName(), 
checksumString), checksumString, true);
       } else {
-        return srcFs.getFileStatus(getCMPath(conf, src.getName(), 
checksumString));
+        return new FileInfo(srcFs, src, getCMPath(conf, src.getName(), 
checksumString), checksumString, false);
       }
     } catch (IOException e) {
       throw new MetaException(StringUtils.stringifyException(e));

Reply via email to