pkumarsinha commented on a change in pull request #1954:
URL: https://github.com/apache/hive/pull/1954#discussion_r581304947



##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -600,6 +600,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "This is the fully qualified base directory on the target/replica 
warehouse under which data for "
             + "external tables is stored. This is relative base path and hence 
prefixed to the source "
             + "external table path on target cluster."),
+    REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK("hive.repl.external.warehouse.single.copy.task",
+        false, "Should create single copy task for all the tables "

Review comment:
       "Should create single copy task for all the external tables"
   Add the word "external" before "tables" to avoid confusion.
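
   For example, the revised entry could read (a sketch; only the description
   string changes, the rest of the declaration stays as in the diff above):

       REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK("hive.repl.external.warehouse.single.copy.task",
           false, "Should create single copy task for all the external tables "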
   
   

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
##########
@@ -1375,6 +1376,205 @@ public void differentCatalogIncrementalReplication() throws Throwable {
     primary.run("drop database if exists " + sparkDbName + " cascade");
   }
 
+  @Test
+  public void testDatabaseLevelCopyLazy() throws Throwable {
+    testDatabaseLevelCopy(true);
+  }
+
+  @Test
+  public void testDatabaseLevelCopyAtSource() throws Throwable {
+    testDatabaseLevelCopy(false);
+  }
+
+  public void testDatabaseLevelCopy(boolean isAtTarget) throws Throwable {
+    Path externalTableLocation =
+        new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    Path externalTablePartitionLocation =
+        new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "part/");
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    List<String> withClause = Arrays.asList(
+        "'distcp.options.update'='','hive.repl.external.warehouse.single.copy"
+            + ".task'='true'",
+        "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname
+            + "'='" + isAtTarget + "'");
+
+    // Create a table within the warehouse location, one outside and one with
+    // a partition outside the default location.
+    WarehouseInstance.Tuple tuple =
+        primary.run("use " + primaryDbName)
+            .run("create external table a (i int, j int) "
+                + "row format delimited fields terminated by ',' "
+                + "location '" + externalTableLocation.toUri() + "'")
+            .run("insert into a values(1,2)")
+            .run("create external table b (id int)")
+            .run("insert into b values(5)")
+            .run("create external table c (place string) partitioned by (country "
+                + "string)")
+            .run("insert into table c partition(country='india') values "
+                + "('bangalore')")
+            .run("ALTER TABLE c ADD PARTITION (country='france') LOCATION '"
+                + externalTablePartitionLocation.toString() + "'")
+            .run("insert into c partition(country='france') values('paris')")

Review comment:
       After this line, can you please add assertions on the table and partition locations?
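
   For example, something along these lines (a sketch only; it assumes
   WarehouseInstance exposes metastore-backed getTable/getPartition accessors,
   which this diff does not confirm):

       // Hypothetical assertions: check the locations recorded in the metastore.
       assertTrue(primary.getTable(primaryDbName, "a").getSd().getLocation()
           .endsWith(externalTableLocation.toUri().getPath()));
       assertTrue(primary.getPartition(primaryDbName, "c", Collections.singletonList("france"))
           .getSd().getLocation()
           .endsWith(externalTablePartitionLocation.toUri().getPath()));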

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
##########
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import net.bytebuddy.implementation.bytecode.Throw;

Review comment:
       Revert this unused import

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
##########
@@ -1375,6 +1376,205 @@ public void differentCatalogIncrementalReplication() throws Throwable {
     primary.run("drop database if exists " + sparkDbName + " cascade");
   }
 
+  @Test
+  public void testDatabaseLevelCopyLazy() throws Throwable {
+    testDatabaseLevelCopy(true);
+  }
+
+  @Test
+  public void testDatabaseLevelCopyAtSource() throws Throwable {
+    testDatabaseLevelCopy(false);
+  }
+
+  public void testDatabaseLevelCopy(boolean isAtTarget) throws Throwable {

Review comment:
       nit: isAtTarget -> runCopyTasksOnTarget ?
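
   i.e., a sketch of the renamed signature:

       public void testDatabaseLevelCopy(boolean runCopyTasksOnTarget) throws Throwable {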

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -149,10 +150,15 @@ void dataLocationDump(Table table, FileList fileList, HiveConf conf)
             "only External tables can be writen via this writer, provided 
table is " + table
                 .getTableType());
       }
-      Path fullyQualifiedDataLocation =
-          PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf));
-      write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf));
-      dirLocationToCopy(fileList, fullyQualifiedDataLocation, conf);
+      Path fullyQualifiedDataLocation = PathBuilder

Review comment:
       nit: Can accommodate in a single line
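
   e.g., mirroring the formatting of the removed lines (a sketch; the new code's
   actual arguments are cut off in this hunk):

       Path fullyQualifiedDataLocation =
           PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf));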

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
##########
@@ -180,6 +195,14 @@ void dataLocationDump(Table table, FileList fileList, HiveConf conf)
       }
     }
 
+    void dbLocationDump(String dbName, Path dbLocation, FileList fileList,
+        HiveConf conf) throws Exception {
+      Path fullyQualifiedDataLocation = PathBuilder
+          .fullyQualifiedHDFSUri(dbLocation, FileSystem.get(hiveConf));
+      write(lineFor(dbName, fullyQualifiedDataLocation, hiveConf));

Review comment:
       lineFor assumes the entry is for a table name, but we are passing dbName here. What are the side effects?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
##########
@@ -1375,6 +1376,205 @@ public void differentCatalogIncrementalReplication() throws Throwable {
     primary.run("drop database if exists " + sparkDbName + " cascade");
   }
 
+  @Test
+  public void testDatabaseLevelCopyLazy() throws Throwable {
+    testDatabaseLevelCopy(true);
+  }
+
+  @Test
+  public void testDatabaseLevelCopyAtSource() throws Throwable {
+    testDatabaseLevelCopy(false);
+  }
+
+  public void testDatabaseLevelCopy(boolean isAtTarget) throws Throwable {
+    Path externalTableLocation =
+        new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    Path externalTablePartitionLocation =
+        new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "part/");
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    List<String> withClause = Arrays.asList(
+        "'distcp.options.update'='','hive.repl.external.warehouse.single.copy"
+            + ".task'='true'",
+        "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname
+            + "'='" + isAtTarget + "'");
+
+    // Create a table within the warehouse location, one outside and one with
+    // a partition outside the default location.
+    WarehouseInstance.Tuple tuple =
+        primary.run("use " + primaryDbName)
+            .run("create external table a (i int, j int) "
+                + "row format delimited fields terminated by ',' "
+                + "location '" + externalTableLocation.toUri() + "'")
+            .run("insert into a values(1,2)")
+            .run("create external table b (id int)")
+            .run("insert into b values(5)")
+            .run("create external table c (place string) partitioned by (country "
+                + "string)")
+            .run("insert into table c partition(country='india') values "
+                + "('bangalore')")
+            .run("ALTER TABLE c ADD PARTITION (country='france') LOCATION '"
+                + externalTablePartitionLocation.toString() + "'")
+            .run("insert into c partition(country='france') values('paris')")
+        .dump(primaryDbName, withClause);
+
+    // Do a load and verify all the data is there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("select i from a where j=2")
+        .verifyResult("1")
+        .run("select id from b")
+        .verifyResult("5")
+        .run("select place from c where country='india'")
+        .verifyResult("bangalore")
+        .run("select place from c where country='france'")
+        .verifyResult("paris");
+
+    // Check the task copied post bootstrap, It should have the database loc,
+    // the table 'a' since that is outside of the default location, and the
+    // 'c', since its partition is out of the default location.
+    assertExternalFileInfo(Arrays.asList(primaryDbName.toLowerCase(), "a", "c"),
+        tuple.dumpLocation, primaryDbName, false);
+
+    // Add more data to tables and do a incremental run and create another
+    // tables one inside and other outside default location.
+
+    externalTableLocation = new Path(
+        "/" + testName.getMethodName() + "/" + primaryDbName + "/" + "newout/");
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+    tuple =
+        primary.run("use " + primaryDbName)
+            .run("insert into a values(3,4)")
+            .run("insert into b values(6)")
+            .run("insert into table c partition(country='india') values "
+                + "('delhi')")
+            .run("insert into c partition(country='france') values('lyon')")
+            .run("create external table newin (id int)")
+            .run("insert into newin values(1)")
+            .run("create external table newout(id int) row format delimited "
+                + "fields terminated by ',' location '" + externalTableLocation
+                .toUri() + "'")
+            .run("insert into newout values(2)")

Review comment:
       After this line, can you please add assertions on the table and partition locations?
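
   For instance (same caveats as the bootstrap sketch above; the accessor is
   hypothetical):

       // Hypothetical assertion: newout should point at the new external location.
       assertTrue(primary.getTable(primaryDbName, "newout").getSd().getLocation()
           .endsWith(externalTableLocation.toUri().getPath()));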

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
##########
@@ -1375,6 +1376,205 @@ public void differentCatalogIncrementalReplication() throws Throwable {
     primary.run("drop database if exists " + sparkDbName + " cascade");
   }
 
+  @Test
+  public void testDatabaseLevelCopyLazy() throws Throwable {
+    testDatabaseLevelCopy(true);
+  }
+
+  @Test
+  public void testDatabaseLevelCopyAtSource() throws Throwable {
+    testDatabaseLevelCopy(false);
+  }
+
+  public void testDatabaseLevelCopy(boolean isAtTarget) throws Throwable {
+    Path externalTableLocation =
+        new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    Path externalTablePartitionLocation =
+        new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "part/");
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    List<String> withClause = Arrays.asList(
+        "'distcp.options.update'='','hive.repl.external.warehouse.single.copy"
+            + ".task'='true'",
+        "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname
+            + "'='" + isAtTarget + "'");
+
+    // Create a table within the warehouse location, one outside and one with
+    // a partition outside the default location.
+    WarehouseInstance.Tuple tuple =
+        primary.run("use " + primaryDbName)
+            .run("create external table a (i int, j int) "
+                + "row format delimited fields terminated by ',' "
+                + "location '" + externalTableLocation.toUri() + "'")
+            .run("insert into a values(1,2)")
+            .run("create external table b (id int)")
+            .run("insert into b values(5)")
+            .run("create external table c (place string) partitioned by (country "
+                + "string)")
+            .run("insert into table c partition(country='india') values "
+                + "('bangalore')")
+            .run("ALTER TABLE c ADD PARTITION (country='france') LOCATION '"
+                + externalTablePartitionLocation.toString() + "'")
+            .run("insert into c partition(country='france') values('paris')")
+        .dump(primaryDbName, withClause);
+
+    // Do a load and verify all the data is there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("select i from a where j=2")
+        .verifyResult("1")
+        .run("select id from b")
+        .verifyResult("5")
+        .run("select place from c where country='india'")
+        .verifyResult("bangalore")
+        .run("select place from c where country='france'")
+        .verifyResult("paris");
+
+    // Check the task copied post bootstrap, It should have the database loc,
+    // the table 'a' since that is outside of the default location, and the
+    // 'c', since its partition is out of the default location.
+    assertExternalFileInfo(Arrays.asList(primaryDbName.toLowerCase(), "a", "c"),

Review comment:
       What source path is considered and printed as part of 'c' here? Is it the table
   location for 'c', or the partition location, i.e. externalTablePartitionLocation?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
##########
@@ -1375,6 +1376,205 @@ public void differentCatalogIncrementalReplication() throws Throwable {
     primary.run("drop database if exists " + sparkDbName + " cascade");
   }
 
+  @Test
+  public void testDatabaseLevelCopyLazy() throws Throwable {
+    testDatabaseLevelCopy(true);
+  }
+
+  @Test
+  public void testDatabaseLevelCopyAtSource() throws Throwable {
+    testDatabaseLevelCopy(false);
+  }
+
+  public void testDatabaseLevelCopy(boolean isAtTarget) throws Throwable {
+    Path externalTableLocation =
+        new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    Path externalTablePartitionLocation =
+        new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "part/");
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    List<String> withClause = Arrays.asList(
+        "'distcp.options.update'='','hive.repl.external.warehouse.single.copy"
+            + ".task'='true'",

Review comment:
       nit: accommodate the config in a single line itself; it will improve readability.
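
   For example (a readability sketch; splitting the two key/value pairs into
   separate list elements should be equivalent, since the WITH clause joins the
   elements with commas anyway):

       List<String> withClause = Arrays.asList("'distcp.options.update'=''",
           "'hive.repl.external.warehouse.single.copy.task'='true'",
           "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname
               + "'='" + isAtTarget + "'");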




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
