This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1c9ca7b HIVE-26033: Repl Load fails with Wrong FS error. (#3100).
(Ayush Saxena, reviewed by Mahesh Kumar Behera)
1c9ca7b is described below
commit 1c9ca7be536842794fd1d6831f35492681cbecc8
Author: Ayush Saxena <[email protected]>
AuthorDate: Tue Mar 22 11:09:01 2022 +0530
HIVE-26033: Repl Load fails with Wrong FS error. (#3100). (Ayush Saxena,
reviewed by Mahesh Kumar Behera)
---
.../TestReplicationScenariosExclusiveReplica.java | 52 ++++++++++++++++++++++
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 5 +--
.../hive/ql/exec/repl/util/SnapshotUtils.java | 18 ++++----
3 files changed, 63 insertions(+), 12 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
index 4e6a77f..449ec7a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
@@ -39,6 +39,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -483,6 +484,57 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr
}
@Test
+ public void testReplicationWithSnapshotsWithSourceStaging() throws Throwable
{
+ List<String> withClauseOptions =
getStagingLocationConfig(primary.repldDir, false);
+ withClauseOptions.add("'" +
HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY.varname + "'='" +
true + "'");
+ withClauseOptions.add("'" +
HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK.varname + "'='" +
true + "'");
+ WarehouseInstance.Tuple tuple = primary
+ .run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (500)")
+ .run("create table t2 (id int)")
+ .run("insert into table t2 values (600)")
+ .dump(primaryDbName, withClauseOptions);
+
+ replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("select id from t1")
+ .verifyResult("500")
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .run("select id from t2")
+ .verifyResult("600");
+
+ tuple = primary.run("use " + primaryDbName)
+ .run("create external table t3 (id int)")
+ .run("insert into table t3 values (700)")
+ .run("create table t4 (id int)")
+ .run("insert into table t4 values (800)")
+ .dump(primaryDbName, withClauseOptions);
+
+ replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .run("show tables like 't3'")
+ .verifyResult("t3")
+ .run("show tables like 't4'")
+ .verifyResult("t4")
+ .run("select id from t1")
+ .verifyResult("500")
+ .run("select id from t2")
+ .verifyResult("600")
+ .run("select id from t3")
+ .verifyResult("700")
+ .run("select id from t4")
+ .verifyResult("800");
+ }
+
+ @Test
public void externalTableReplicationDropDatabase() throws Throwable {
String primaryDb = "primarydb1";
String replicaDb = "repldb1";
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index e8692ef..f3b0a0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -406,9 +406,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
if (conf.getBoolVar(REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY)) {
Path snapPath = SnapshotUtils.getSnapshotFileListPath(new
Path(work.dumpDirectory));
try {
- SnapshotUtils.getDFS(getExternalTableBaseDir(conf), conf)
- .rename(new Path(snapPath,
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT),
- new Path(snapPath, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD),
Options.Rename.OVERWRITE);
+ SnapshotUtils.getDFS(snapPath, conf).rename(new Path(snapPath,
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT),
+ new Path(snapPath, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD),
Options.Rename.OVERWRITE);
} catch (FileNotFoundException fnf) {
// Ignore if no file.
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java
index 4e539f9..5a0cf0c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java
@@ -275,15 +275,16 @@ public class SnapshotUtils {
/**
* Deletes the snapshots present in the list.
- * @param dfs DistributedFileSystem.
* @param diffList Elements to be deleted.
* @param prefix Prefix used in snapshot names,
* @param snapshotCount snapshot counter to track the number of snapshots
deleted.
* @param conf the Hive Configuration.
* @throws IOException in case of any error.
*/
- private static void cleanUpSnapshots(DistributedFileSystem dfs,
ArrayList<String> diffList, String prefix,
- ReplSnapshotCount snapshotCount, HiveConf conf) throws IOException {
+ private static void cleanUpSnapshots(ArrayList<String> diffList, String
prefix, ReplSnapshotCount snapshotCount,
+ HiveConf conf) throws IOException {
+ DistributedFileSystem dfs =
+ diffList.size() > 0 ? (DistributedFileSystem) new
Path(diffList.get(0)).getFileSystem(conf) : null;
for (String path : diffList) {
Path snapshotPath = new Path(path);
boolean isFirstDeleted = deleteSnapshotIfExists(dfs, snapshotPath,
firstSnapshot(prefix), conf);
@@ -332,25 +333,24 @@ public class SnapshotUtils {
*/
public static void cleanupSnapshots(Path dumpRoot, String snapshotPrefix,
HiveConf conf,
ReplSnapshotCount snapshotCount, boolean isLoad) throws IOException,
SemanticException {
- DistributedFileSystem dfs = (DistributedFileSystem)
dumpRoot.getFileSystem(conf);
- if (dfs.exists(new Path(dumpRoot,
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD))) {
+ DistributedFileSystem dumpDfs = (DistributedFileSystem)
dumpRoot.getFileSystem(conf);
+ if (dumpDfs.exists(new Path(dumpRoot,
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD))) {
FileList snapOld = createTableFileList(dumpRoot,
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD, conf);
FileList snapNew = createTableFileList(dumpRoot,
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT, conf);
ArrayList<String> oldPaths = SnapshotUtils.getListFromFileList(snapOld);
ArrayList<String> newPaths = SnapshotUtils.getListFromFileList(snapNew);
ArrayList<String> diffList = SnapshotUtils.getDiffList(newPaths,
oldPaths, conf, isLoad);
- dfs = isLoad ? (DistributedFileSystem)
getExternalTableBaseDir(conf).getFileSystem(conf) : dfs;
- SnapshotUtils.cleanUpSnapshots(dfs, diffList, snapshotPrefix,
snapshotCount, conf);
+ SnapshotUtils.cleanUpSnapshots(diffList, snapshotPrefix, snapshotCount,
conf);
}
if (isLoad) {
try {
- dfs.delete((new Path(dumpRoot,
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD)), true);
+ dumpDfs.delete((new Path(dumpRoot,
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD)), true);
} catch (FileNotFoundException fnf) {
// ignore
LOG.warn("Failed to clean up snapshot " +
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD, fnf);
}
try {
- dfs.rename(new Path(dumpRoot,
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT),
+ dumpDfs.rename(new Path(dumpRoot,
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT),
new Path(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD),
Options.Rename.OVERWRITE);
} catch (FileNotFoundException fnf) {
// ignore