This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c9ca7b  HIVE-26033: Repl Load fails with Wrong FS error. (#3100). (Ayush Saxena, reviewed by Mahesh Kumar Behera)
1c9ca7b is described below

commit 1c9ca7be536842794fd1d6831f35492681cbecc8
Author: Ayush Saxena <[email protected]>
AuthorDate: Tue Mar 22 11:09:01 2022 +0530

    HIVE-26033: Repl Load fails with Wrong FS error. (#3100). (Ayush Saxena, reviewed by Mahesh Kumar Behera)
---
 .../TestReplicationScenariosExclusiveReplica.java  | 52 ++++++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  5 +--
 .../hive/ql/exec/repl/util/SnapshotUtils.java      | 18 ++++----
 3 files changed, 63 insertions(+), 12 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
index 4e6a77f..449ec7a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
@@ -39,6 +39,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -483,6 +484,57 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr
   }
 
   @Test
+  public void testReplicationWithSnapshotsWithSourceStaging() throws Throwable 
{
+    List<String> withClauseOptions = 
getStagingLocationConfig(primary.repldDir, false);
+    withClauseOptions.add("'" + 
HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY.varname + "'='" + 
true + "'");
+    withClauseOptions.add("'" + 
HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK.varname + "'='" + 
true + "'");
+    WarehouseInstance.Tuple tuple = primary
+        .run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (500)")
+        .run("create table t2 (id int)")
+        .run("insert into table t2 values (600)")
+        .dump(primaryDbName, withClauseOptions);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("select id from t1")
+        .verifyResult("500")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("select id from t2")
+        .verifyResult("600");
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("create external table t3 (id int)")
+        .run("insert into table t3 values (700)")
+        .run("create table t4 (id int)")
+        .run("insert into table t4 values (800)")
+        .dump(primaryDbName, withClauseOptions);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .run("select id from t1")
+        .verifyResult("500")
+        .run("select id from t2")
+        .verifyResult("600")
+        .run("select id from t3")
+        .verifyResult("700")
+        .run("select id from t4")
+        .verifyResult("800");
+  }
+
+  @Test
   public void externalTableReplicationDropDatabase() throws Throwable {
     String primaryDb = "primarydb1";
     String replicaDb = "repldb1";
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index e8692ef..f3b0a0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -406,9 +406,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     if (conf.getBoolVar(REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY)) {
       Path snapPath = SnapshotUtils.getSnapshotFileListPath(new 
Path(work.dumpDirectory));
       try {
-        SnapshotUtils.getDFS(getExternalTableBaseDir(conf), conf)
-            .rename(new Path(snapPath, 
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT),
-                new Path(snapPath, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD), 
Options.Rename.OVERWRITE);
+        SnapshotUtils.getDFS(snapPath, conf).rename(new Path(snapPath, 
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT),
+            new Path(snapPath, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD), 
Options.Rename.OVERWRITE);
       } catch (FileNotFoundException fnf) {
         // Ignore if no file.
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java
index 4e539f9..5a0cf0c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java
@@ -275,15 +275,16 @@ public class SnapshotUtils {
 
   /**
    *  Deletes the snapshots present in the list.
-   * @param dfs DistributedFileSystem.
    * @param diffList Elements to be deleted.
    * @param prefix Prefix used in snapshot names,
    * @param snapshotCount snapshot counter to track the number of snapshots 
deleted.
    * @param conf the Hive Configuration.
    * @throws IOException in case of any error.
    */
-  private static void cleanUpSnapshots(DistributedFileSystem dfs, 
ArrayList<String> diffList, String prefix,
-      ReplSnapshotCount snapshotCount, HiveConf conf) throws IOException {
+  private static void cleanUpSnapshots(ArrayList<String> diffList, String 
prefix, ReplSnapshotCount snapshotCount,
+      HiveConf conf) throws IOException {
+    DistributedFileSystem dfs =
+        diffList.size() > 0 ? (DistributedFileSystem) new 
Path(diffList.get(0)).getFileSystem(conf) : null;
     for (String path : diffList) {
       Path snapshotPath = new Path(path);
       boolean isFirstDeleted = deleteSnapshotIfExists(dfs, snapshotPath, 
firstSnapshot(prefix), conf);
@@ -332,25 +333,24 @@ public class SnapshotUtils {
    */
   public static void cleanupSnapshots(Path dumpRoot, String snapshotPrefix, 
HiveConf conf,
       ReplSnapshotCount snapshotCount, boolean isLoad) throws IOException, 
SemanticException {
-    DistributedFileSystem dfs = (DistributedFileSystem) 
dumpRoot.getFileSystem(conf);
-    if (dfs.exists(new Path(dumpRoot, 
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD))) {
+    DistributedFileSystem dumpDfs = (DistributedFileSystem) 
dumpRoot.getFileSystem(conf);
+    if (dumpDfs.exists(new Path(dumpRoot, 
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD))) {
       FileList snapOld = createTableFileList(dumpRoot, 
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD, conf);
       FileList snapNew = createTableFileList(dumpRoot, 
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT, conf);
       ArrayList<String> oldPaths = SnapshotUtils.getListFromFileList(snapOld);
       ArrayList<String> newPaths = SnapshotUtils.getListFromFileList(snapNew);
       ArrayList<String> diffList = SnapshotUtils.getDiffList(newPaths, 
oldPaths, conf, isLoad);
-      dfs = isLoad ? (DistributedFileSystem) 
getExternalTableBaseDir(conf).getFileSystem(conf) : dfs;
-      SnapshotUtils.cleanUpSnapshots(dfs, diffList, snapshotPrefix, 
snapshotCount, conf);
+      SnapshotUtils.cleanUpSnapshots(diffList, snapshotPrefix, snapshotCount, 
conf);
     }
     if (isLoad) {
       try {
-        dfs.delete((new Path(dumpRoot, 
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD)), true);
+        dumpDfs.delete((new Path(dumpRoot, 
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD)), true);
       } catch (FileNotFoundException fnf) {
         // ignore
         LOG.warn("Failed to clean up snapshot " + 
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD, fnf);
       }
       try {
-        dfs.rename(new Path(dumpRoot, 
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT),
+        dumpDfs.rename(new Path(dumpRoot, 
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT),
             new Path(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD), 
Options.Rename.OVERWRITE);
       } catch (FileNotFoundException fnf) {
         // ignore

Reply via email to