This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2093037cb49 HIVE-24933: Replication fails for transactional tables
having same name as dropped external tables. (Janardhan Hungund, reviewed by
Teddy Choi)
2093037cb49 is described below
commit 2093037cb49e0e824223042043a9b6ffc3aaebcd
Author: jhungund <[email protected]>
AuthorDate: Tue Sep 13 09:15:27 2022 +0530
HIVE-24933: Replication fails for transactional tables having same name as
dropped external tables. (Janardhan Hungund, reviewed by Teddy Choi)
Summary of the change:
While setting up the tasks during the repl-load phase of the replication,
delay the access to
table metadata until the task execution. This will avoid inconsistent
metadata access during
task creation.
Root Cause Analysis:
Background:
During the incremental load phase of replication, all event logs are
processed sequentially.
Multiple tasks are spawned/created during the processing of each event.
All the spawned tasks are subsequently executed sequentially.
Scenario of the issue:
The issue is seen in the following scenario:
1. An external table (e.g. T1) is created and replicated to the target cluster
during earlier replication cycles.
2. This external table is dropped.
3. A new managed table with the same name (T1) is recreated.
4. The subsequent repl-load phase fails.
Root cause:
1. The above mentioned operations (table drop and recreation) are
propagated to the target cluster
via event logs during the subsequent incremental phase of replication.
2. We create DDL tasks to drop the external table for drop-table event.
3. We also create new tasks to create new managed tables.
4. Some additional events are logged which create tasks to load the newly
created table.
5. During the creation of these load-table tasks, we try to access metadata
corresponding to the new table.
During normal scenario of a fresh table creation, the metadata store
will not have data corresponding to the new table (yet to be created).
However, in this scenario, the old table still exists and hence, we end
up using the metadata corresponding to the old table (external).
We try to use this metadata to create the load tasks for the new table.
During the execution of these load tasks, which execute after the drop
and recreate tasks, we find that the metadata set in the
task context is stale and is inconsistent with the newly created table.
Hence, the error.
Fix:
Do not access the table metadata during the task creation phase for
table-load.
Instead, access the metadata during the task execution. By that time,
the metadata is updated to the latest state with the previously executed
tasks.
Change-Id: I79ed804617dcdadb51f961a933f4023ac0b6f509
---
.../parse/BaseReplicationScenariosAcidTables.java | 2 +-
.../parse/TestReplicationScenariosAcidTables.java | 82 ++++++++++++
.../org/apache/hadoop/hive/ql/exec/CopyTask.java | 21 +++-
.../org/apache/hadoop/hive/ql/exec/MoveTask.java | 49 +++++---
.../apache/hadoop/hive/ql/exec/ReplCopyTask.java | 9 ++
.../java/org/apache/hadoop/hive/ql/exec/Task.java | 10 ++
.../hive/ql/parse/ImportSemanticAnalyzer.java | 138 ++++++++++-----------
.../apache/hadoop/hive/ql/plan/BaseCopyWork.java | 29 +++++
.../org/apache/hadoop/hive/ql/plan/CopyWork.java | 16 ++-
.../hadoop/hive/ql/plan/DeferredWorkContext.java | 68 ++++++++++
.../org/apache/hadoop/hive/ql/plan/MoveWork.java | 31 ++++-
.../apache/hadoop/hive/ql/plan/ReplCopyWork.java | 18 +++
12 files changed, 380 insertions(+), 93 deletions(-)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
index e1f3238486b..90aa944fe4d 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
@@ -117,7 +117,7 @@ public class BaseReplicationScenariosAcidTables {
replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster,
overridesForHiveConf1);
}
- private static void setReplicaExternalBase(FileSystem fs, Map<String,
String> confMap) throws IOException {
+ protected static void setReplicaExternalBase(FileSystem fs, Map<String,
String> confMap) throws IOException {
fs.mkdirs(REPLICA_EXTERNAL_BASE);
fullyQualifiedReplicaExternalBase =
fs.getFileStatus(REPLICA_EXTERNAL_BASE).getPath().toString();
confMap.put(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname,
fullyQualifiedReplicaExternalBase);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 7fc55cf9805..7315f3565b3 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
@@ -130,6 +132,7 @@ public class TestReplicationScenariosAcidTables extends
BaseReplicationScenarios
acidEnableConf.putAll(overrides);
+ setReplicaExternalBase(miniDFSCluster.getFileSystem(), acidEnableConf);
primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
acidEnableConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(),
primary.repldDir);
replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
@@ -3408,4 +3411,83 @@ public class TestReplicationScenariosAcidTables extends
BaseReplicationScenarios
.verifyResults(new String[] { "bangalore", "paris", "sydney" })
.verifyReplTargetProperty(replicatedDbName);
}
+
+
+ @Test
+ public void testTxnTblReplWithSameNameAsDroppedNonTxnTbl() throws Throwable {
+ List<String> withClauseOptions = new LinkedList<>();
+ withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname
+ + "'='" + UserGroupInformation.getCurrentUser().getUserName() + "'");
+
+ String tbl = "t1";
+ primary
+ .run("use " + primaryDbName)
+ .run("create table " + tbl + " (id int)")
+ .run("insert into table " + tbl + " values (1)")
+ .dump(primaryDbName, withClauseOptions);
+
+ replica
+ .load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("select id from " + tbl)
+ .verifyResults(new String[] {"1"});
+
+
assertFalse(AcidUtils.isTransactionalTable(replica.getTable(replicatedDbName,
tbl)));
+
+ primary
+ .run("use " + primaryDbName)
+ .run("drop table " + tbl)
+ .run("create table " + tbl + " (id int) clustered by(id) into 3
buckets stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into table " + tbl + " values (2)")
+ .dump(primaryDbName, withClauseOptions);
+
+ replica
+ .load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("select id from " + tbl)
+ .verifyResults(new String[] {"2"});
+
+
assertTrue(AcidUtils.isTransactionalTable(replica.getTable(replicatedDbName,
tbl)));
+ }
+
+ @Test
+ public void testTxnTblReplWithSameNameAsDroppedExtTbl() throws Throwable {
+ List<String> withClauseOptions = new LinkedList<>();
+ withClauseOptions.add("'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname
+ + "'='" + UserGroupInformation.getCurrentUser().getUserName() + "'");
+
+ String tbl = "t1";
+ primary
+ .run("use " + primaryDbName)
+ .run("create external table " + tbl + " (id int)")
+ .run("insert into table " + tbl + " values (1)")
+ .dump(primaryDbName, withClauseOptions);
+
+ replica
+ .load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("select id from " + tbl)
+ .verifyResults(new String[] {"1"});
+
+
assertFalse(AcidUtils.isTransactionalTable(replica.getTable(replicatedDbName,
tbl)));
+ assertTrue(replica.getTable(replicatedDbName,
tbl).getTableType().equals(TableType.EXTERNAL_TABLE.toString()));
+
+ primary
+ .run("use " + primaryDbName)
+ .run("drop table " + tbl)
+ .run("create table " + tbl + " (id int) clustered by(id) into 3
buckets stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into table " + tbl + " values (2)")
+ .dump(primaryDbName, withClauseOptions);
+
+ replica
+ .load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("select id from " + tbl)
+ .verifyResults(new String[] {"2"});
+
+
assertTrue(AcidUtils.isTransactionalTable(replica.getTable(replicatedDbName,
tbl)));
+ assertFalse(replica.getTable(replicatedDbName,
tbl).getTableType().equals(TableType.EXTERNAL_TABLE.toString()));
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index 5b101df4ef2..1e56c46a8aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.CopyWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.util.StringUtils;
@@ -42,14 +43,32 @@ public class CopyTask extends Task<CopyWork> implements
Serializable {
@Override
public int execute() {
+ try {
+ initializeWorkFromDeferredContext();
+ } catch (Exception e) {
+ console.printError("Failed with exception " + e.getMessage(), "\n"
+ + StringUtils.stringifyException(e));
+ LOG.error("CopyTask failed", e);
+ setException(e);
+ return ReplUtils.handleException(work.isReplication(), e,
work.getDumpDirectory(), work.getMetricCollector(),
+ getName(), conf);
+ }
+
Path[] from = work.getFromPaths(), to = work.getToPaths();
for (int i = 0; i < from.length; ++i) {
int result = copyOnePath(from[i], to[i]);
- if (result != 0) return result;
+ if (result != 0)
+ return result;
}
return 0;
}
+ private void initializeWorkFromDeferredContext() throws HiveException {
+ if (null != getDeferredWorkContext()) {
+ work.initializeFromDeferredContext(getDeferredWorkContext());
+ }
+ }
+
protected int copyOnePath(Path fromPath, Path toPath) {
FileSystem dstFs = null;
try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 250debfd9b7..eab5b4803ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -347,6 +347,12 @@ public class MoveTask extends Task<MoveWork> implements
Serializable {
@Override
public int execute() {
+ try {
+ initializeFromDeferredContext();
+ } catch (HiveException he) {
+ return processHiveException(he);
+ }
+
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("Executing MoveWork " +
System.identityHashCode(work)
+ " with " + work.getLoadFileWork() + "; " + work.getLoadTableWork() +
"; "
@@ -514,23 +520,7 @@ public class MoveTask extends Task<MoveWork> implements
Serializable {
return 0;
} catch (HiveException he) {
- int errorCode = 1;
-
- if (he.getCanonicalErrorMsg() != ErrorMsg.GENERIC_ERROR) {
- errorCode = he.getCanonicalErrorMsg().getErrorCode();
- if (he.getCanonicalErrorMsg() == ErrorMsg.UNRESOLVED_RT_EXCEPTION) {
- console.printError("Failed with exception " + he.getMessage(), "\n"
- + StringUtils.stringifyException(he));
- } else {
- console.printError("Failed with exception " + he.getMessage()
- + "\nRemote Exception: " + he.getRemoteErrorMsg());
- console.printInfo("\n", StringUtils.stringifyException(he),false);
- }
- }
- setException(he);
- errorCode = ReplUtils.handleException(work.isReplication(), he,
work.getDumpDirectory(),
- work.getMetricCollector(),
getName(), conf);
- return errorCode;
+ return processHiveException(he);
} catch (Exception e) {
console.printError("Failed with exception " + e.getMessage(), "\n"
+ StringUtils.stringifyException(e));
@@ -541,6 +531,31 @@ public class MoveTask extends Task<MoveWork> implements
Serializable {
}
}
+ private int processHiveException(HiveException he) {
+ int errorCode = 1;
+
+ if (he.getCanonicalErrorMsg() != ErrorMsg.GENERIC_ERROR) {
+ errorCode = he.getCanonicalErrorMsg().getErrorCode();
+ if (he.getCanonicalErrorMsg() == ErrorMsg.UNRESOLVED_RT_EXCEPTION) {
+ console.printError("Failed with exception " + he.getMessage(), "\n"
+ + StringUtils.stringifyException(he));
+ } else {
+ console.printError("Failed with exception " + he.getMessage()
+ + "\nRemote Exception: " + he.getRemoteErrorMsg());
+ console.printInfo("\n", StringUtils.stringifyException(he),false);
+ }
+ }
+ setException(he);
+ errorCode = ReplUtils.handleException(work.isReplication(), he,
work.getDumpDirectory(),
+ work.getMetricCollector(), getName(), conf);
+ return errorCode;
+ }
+
+ private void initializeFromDeferredContext() throws HiveException {
+ if (null != getDeferredWorkContext()) {
+ work.initializeFromDeferredContext(getDeferredWorkContext());
+ }
+ }
public void logMessage(LoadTableDesc tbd) {
StringBuilder mesg = new StringBuilder("Loading data to table ")
.append( tbd.getTable().getTableName());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 231f57455b6..c12a9e43190 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
@@ -63,6 +64,8 @@ public class ReplCopyTask extends Task<ReplCopyWork>
implements Serializable {
Path toPath = null;
try {
+ initializeFromDeferredContext();
+
// Note: CopyWork supports copying multiple files, but ReplCopyWork
doesn't.
// Not clear of ReplCopyWork should inherit from CopyWork.
if (work.getFromPaths().length > 1 || work.getToPaths().length > 1) {
@@ -165,6 +168,12 @@ public class ReplCopyTask extends Task<ReplCopyWork>
implements Serializable {
}
}
+ private void initializeFromDeferredContext() throws HiveException {
+ if (null != getDeferredWorkContext()) {
+ work.initializeFromDeferredContext(getDeferredWorkContext());
+ }
+ }
+
private List<ReplChangeManager.FileInfo> filesInFileListing(FileSystem fs,
Path dataPath)
throws IOException {
Path fileListing = new Path(dataPath, EximUtil.FILES_NAME);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 19bb543c21f..35b6996637d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.DeferredWorkContext;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -68,6 +69,7 @@ public abstract class Task<T extends Serializable> implements
Serializable, Node
protected List<Task<?>> backupChildrenTasks = new ArrayList<Task<?>>();
protected static transient Logger LOG = LoggerFactory.getLogger(Task.class);
protected int taskTag;
+ protected DeferredWorkContext deferredWorkContext;
private boolean isLocalMode =false;
public static final int NO_TAG = 0;
@@ -637,6 +639,14 @@ public abstract class Task<T extends Serializable>
implements Serializable, Node
this.fetchSource = fetchSource;
}
+ public DeferredWorkContext getDeferredWorkContext() {
+ return deferredWorkContext;
+ }
+
+ public void setDeferredWorkContext(DeferredWorkContext deferredWorkContext) {
+ this.deferredWorkContext = deferredWorkContext;
+ }
+
@Override
public String toString() {
return getId() + ":" + getType();
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 816153e2d80..d12a25b1990 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.QueryState;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.DeferredWorkContext;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.mapred.OutputFormat;
@@ -419,94 +421,86 @@ public class ImportSemanticAnalyzer extends
BaseSemanticAnalyzer {
return new ImportTableDesc(dbName, table);
}
- private static Task<?> loadTable(URI fromURI, Table table, boolean replace,
Path tgtPath,
+ private static Task<?> loadTable(URI fromURI, ImportTableDesc tblDesc,
boolean replace, Path tgtPath,
ReplicationSpec replicationSpec,
EximUtil.SemanticAnalyzerWrapperContext x,
Long writeId, int stmtId) throws
HiveException {
- return loadTable(fromURI, table, replace, tgtPath, replicationSpec, x,
writeId,stmtId, null, null);
+ return loadTable(fromURI, tblDesc, replace, tgtPath, replicationSpec, x,
writeId,stmtId, null, null);
}
- private static Task<?> loadTable(URI fromURI, Table table, boolean replace,
Path tgtPath,
- ReplicationSpec replicationSpec,
EximUtil.SemanticAnalyzerWrapperContext x,
- Long writeId, int stmtId,
- String dumpRoot, ReplicationMetricCollector
metricCollector) throws HiveException {
- assert table != null;
- assert table.getParameters() != null;
- Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
- Path destPath = null, loadPath = null;
- LoadFileType lft;
- boolean isSkipTrash = false;
- boolean needRecycle = false;
- if (replicationSpec.isInReplicationScope()) {
- isSkipTrash = MetaStoreUtils.isSkipTrash(table.getParameters());
- if (table.isTemporary()) {
- needRecycle = false;
+ /*
+ * This API reads the table metadata and updates the deferred work context
object.
+ */
+ public static void setupDeferredContextFromMetadata(DeferredWorkContext
deferredContext) throws HiveException {
+
+ deferredContext.table =
ImportSemanticAnalyzer.tableIfExists(deferredContext.tblDesc,
deferredContext.hive);
+ if (deferredContext.table == null) {
+ deferredContext.table =
ImportSemanticAnalyzer.createNewTableMetadataObject(deferredContext.tblDesc,
true);
+ }
+
+ if (deferredContext.inReplScope) {
+ deferredContext.isSkipTrash =
MetaStoreUtils.isSkipTrash(deferredContext.table.getParameters());
+ if (deferredContext.table.isTemporary()) {
+ deferredContext.needRecycle = false;
} else {
- org.apache.hadoop.hive.metastore.api.Database db =
x.getHive().getDatabase(table.getDbName());
- needRecycle = db != null && ReplChangeManager.shouldEnableCm(db,
table.getTTable());
+ org.apache.hadoop.hive.metastore.api.Database db =
deferredContext.hive.getDatabase(deferredContext.table.getDbName());
+ deferredContext.needRecycle = db != null &&
ReplChangeManager.shouldEnableCm(db, deferredContext.table.getTTable());
}
}
- if (AcidUtils.isTransactionalTable(table)) {
- String mmSubdir = replace ? AcidUtils.baseDir(writeId)
- : AcidUtils.deltaSubdir(writeId, writeId, stmtId);
- destPath = new Path(tgtPath, mmSubdir);
- /**
- * CopyTask below will copy files from the 'archive' to a delta_x_x in
the table/partition
- * directory, i.e. the final destination for these files. This has to
be a copy to preserve
- * the archive. MoveTask is optimized to do a 'rename' if files are on
the same FileSystem.
- * So setting 'loadPath' this way will make
- * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean,
boolean,
- * boolean, Long, int)}
- * skip the unnecessary file (rename) operation but it will perform
other things.
- */
- loadPath = tgtPath;
- lft = LoadFileType.KEEP_EXISTING;
+
+ if (AcidUtils.isTransactionalTable(deferredContext.table)) {
+ String mmSubdir = deferredContext.replace ?
AcidUtils.baseDir(deferredContext.writeId) :
+ AcidUtils.deltaSubdir(deferredContext.writeId,
deferredContext.writeId, deferredContext.stmtId);
+ deferredContext.destPath = new Path(deferredContext.tgtPath, mmSubdir);
+ /*
+ CopyTask will copy files from the 'archive' to a delta_x_x in the
table/partition
+ directory, i.e. the final destination for these files. This has to
be a copy to preserve
+ the archive. MoveTask is optimized to do a 'rename' if files are on
the same FileSystem.
+ So setting 'loadPath' this way will make
+ {@link Hive#loadTable(Path, String, LoadTableDesc.LoadFileType,
boolean, boolean, boolean,
+ boolean, Long, int, boolean, boolean)}
+ skip the unnecessary file (rename) operation but it will perform
other things.
+ */
+ deferredContext.loadPath = deferredContext.tgtPath;
+ deferredContext.loadFileType = LoadTableDesc.LoadFileType.KEEP_EXISTING;
} else {
- destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath);
- lft = replace ? LoadFileType.REPLACE_ALL :
- LoadFileType.OVERWRITE_EXISTING;
+ deferredContext.destPath = deferredContext.loadPath =
deferredContext.ctx.getExternalTmpPath(deferredContext.tgtPath);
+ deferredContext.loadFileType = deferredContext.replace ?
LoadTableDesc.LoadFileType.REPLACE_ALL :
LoadTableDesc.LoadFileType.OVERWRITE_EXISTING;
}
+ deferredContext.isCalculated = true;
+ }
- if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
- Utilities.FILE_OP_LOGGER.trace("adding import work for table with source
location: " +
- dataPath + "; table: " + tgtPath + "; copy destination " +
destPath + "; mm " +
- writeId +
- " for " + table.getTableName() + ": " +
- (AcidUtils.isFullAcidTable(table) ? "acid" :
- (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat")
- )
- );
- }
+ private static Task<?> loadTable(URI fromURI, ImportTableDesc tblDesc,
boolean replace, Path tgtPath,
+ ReplicationSpec replicationSpec,
EximUtil.SemanticAnalyzerWrapperContext x,
+ Long writeId, int stmtId,
+ String dumpRoot, ReplicationMetricCollector
metricCollector) throws HiveException {
+
+
+ Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
+
+ DeferredWorkContext
+ resolver = new DeferredWorkContext(replace, tgtPath, writeId, stmtId,
x.getHive(), x.getCtx(), tblDesc,
+ replicationSpec.isInReplicationScope());
- Task<?> copyTask = null;
+ Task<?> copyTask;
+ // Corresponding work instances are not complete yet. Some of the values
will be calculated and assigned when task
+ // is being executed.
if (replicationSpec.isInReplicationScope()) {
boolean copyAtLoad =
x.getConf().getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
- copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath,
destPath, x.getConf(),
- isSkipTrash, needRecycle, copyAtLoad, dumpRoot, metricCollector);
+ copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, null,
x.getConf(),
+ false, false, copyAtLoad, dumpRoot, metricCollector);
} else {
- copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false,
dumpRoot, metricCollector, true));
+ copyTask = TaskFactory.get(new CopyWork(dataPath, null, false, dumpRoot,
metricCollector, true));
}
- MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), null,
null, false,
- dumpRoot, metricCollector, true);
-
+ copyTask.setDeferredWorkContext(resolver);
- if (replicationSpec.isInReplicationScope() &&
AcidUtils.isTransactionalTable(table)) {
- LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
- Collections.singletonList(destPath),
- Collections.singletonList(tgtPath),
- true, null, null);
- moveWork.setMultiFilesDesc(loadFilesWork);
- moveWork.setNeedCleanTarget(replace);
- } else {
- LoadTableDesc loadTableWork = new LoadTableDesc(
- loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft,
writeId);
- loadTableWork.setStmtId(stmtId);
- moveWork.setLoadTableWork(loadTableWork);
- }
+ MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), null,
null, false,
+ dumpRoot, metricCollector, true);
//if Importing into existing table, FileFormat is checked by
// ImportSemanticAnalyzer.checked checkTable()
Task<?> loadTableTask = TaskFactory.get(moveWork, x.getConf());
+ loadTableTask.setDeferredWorkContext(resolver);
copyTask.addDependentTask(loadTableTask);
x.getTasks().add(copyTask);
return loadTableTask;
@@ -1079,7 +1073,7 @@ public class ImportSemanticAnalyzer extends
BaseSemanticAnalyzer {
Path tgtPath = new Path(table.getDataLocation().toString());
FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG());
- loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId,
stmtId);
+ loadTable(fromURI, tblDesc, false, tgtPath, replicationSpec, x,
writeId, stmtId);
}
// Set this to read because we can't overwrite any existing partitions
x.getOutputs().add(new WriteEntity(table,
WriteEntity.WriteType.DDL_NO_LOCK));
@@ -1115,7 +1109,7 @@ public class ImportSemanticAnalyzer extends
BaseSemanticAnalyzer {
}
FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf());
checkTargetLocationEmpty(tgtFs, tablePath,
replicationSpec,x.getLOG());
- t.addDependentTask(loadTable(fromURI, table, false, tablePath,
replicationSpec, x,
+ t.addDependentTask(loadTable(fromURI, tblDesc, false, tablePath,
replicationSpec, x,
writeId, stmtId));
}
}
@@ -1123,7 +1117,7 @@ public class ImportSemanticAnalyzer extends
BaseSemanticAnalyzer {
}
}
- private static Table createNewTableMetadataObject(ImportTableDesc tblDesc,
boolean isRepl)
+ public static Table createNewTableMetadataObject(ImportTableDesc tblDesc,
boolean isRepl)
throws SemanticException {
Table newTable = new Table(tblDesc.getDatabaseName(),
tblDesc.getTableName());
//so that we know the type of table we are creating: acid/MM to match what
was exported
@@ -1267,7 +1261,7 @@ public class ImportSemanticAnalyzer extends
BaseSemanticAnalyzer {
} else if (!replicationSpec.isMetadataOnly()
&& !shouldSkipDataCopyInReplScope(tblDesc, replicationSpec)) {
x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
- dependentTasks = Collections.singletonList(loadTable(fromURI, table,
replicationSpec.isReplace(),
+ dependentTasks = Collections.singletonList(loadTable(fromURI, tblDesc,
replicationSpec.isReplace(),
new Path(tblDesc.getLocation()), replicationSpec, x, writeId,
stmtId, dumpRoot, metricCollector));
}
@@ -1377,7 +1371,7 @@ public class ImportSemanticAnalyzer extends
BaseSemanticAnalyzer {
x.getLOG().debug("table non-partitioned");
if (!replicationSpec.isMetadataOnly()) {
// repl-imports are replace-into unless the event is insert-into
- loadTable(fromURI, table, replicationSpec.isReplace(), new
Path(tblDesc.getLocation()),
+ loadTable(fromURI, tblDesc, replicationSpec.isReplace(), new
Path(tblDesc.getLocation()),
replicationSpec, x, writeId, stmtId, dumpRoot, metricCollector);
} else {
x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec, true,
dumpRoot, metricCollector));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseCopyWork.java
b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseCopyWork.java
new file mode 100644
index 00000000000..85b9b56709d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseCopyWork.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.DeferredWorkContext;
+
+/*
+ * Interface for CopyWork, ReplCopyWork and MoveWork
+ */
+public interface BaseCopyWork {
+ public void initializeFromDeferredContext(DeferredWorkContext
deferredWorkContext) throws HiveException;
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index 2439cde4465..c48baf9af8c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -21,7 +21,10 @@ package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.plan.DeferredWorkContext;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
/**
@@ -29,7 +32,7 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
*
*/
@Explain(displayName = "Copy", explainLevels = { Level.USER, Level.DEFAULT,
Level.EXTENDED })
-public class CopyWork implements Serializable {
+public class CopyWork implements Serializable, BaseCopyWork {
private static final long serialVersionUID = 1L;
private Path[] fromPath;
private Path[] toPath;
@@ -136,4 +139,15 @@ public class CopyWork implements Serializable {
public void setOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}
+
+ public void setToPath(Path[] toPath) {
+ this.toPath = toPath;
+ }
+ public void initializeFromDeferredContext(DeferredWorkContext
deferredContext) throws HiveException {
+ if (!deferredContext.isCalculated()) {
+ // Read metadata from metastore and populate the members of the context
+ ImportSemanticAnalyzer.setupDeferredContextFromMetadata(deferredContext);
+ }
+ setToPath(new Path[] { deferredContext.destPath });
+ }
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/plan/DeferredWorkContext.java
b/ql/src/java/org/apache/hadoop/hive/ql/plan/DeferredWorkContext.java
new file mode 100644
index 00000000000..0b9d8c711dc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DeferredWorkContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
+
+import java.util.Collections;
+import java.util.TreeMap;
+
+/*
+ * Context object that holds the information about the table and its replication context.
+ */
+public class DeferredWorkContext {
+ public final boolean inReplScope;
+ public final boolean replace;
+ public final Long writeId;
+ public final int stmtId;
+ public final Hive hive;
+ public final Context ctx;
+ public final ImportTableDesc tblDesc;
+ public final Path tgtPath;
+ public Path destPath = null, loadPath = null;
+ public LoadTableDesc.LoadFileType loadFileType;
+ public boolean isSkipTrash;
+ public boolean needRecycle;
+ public boolean isCalculated = false;
+ public Table table;
+
+ public DeferredWorkContext(boolean replace, Path tgtPath, Long writeId, int stmtId, Hive hive, Context ctx,
+ ImportTableDesc tblDesc, boolean inReplScope) {
+ this.replace = replace;
+ this.writeId = writeId;
+ this.stmtId = stmtId;
+ this.hive = hive;
+ this.ctx = ctx;
+ this.tblDesc = tblDesc;
+ this.inReplScope = inReplScope;
+ this.tgtPath = tgtPath;
+ }
+ public boolean isCalculated() {
+ return isCalculated;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index be0e2fb8be9..c589f2d7c7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -26,15 +26,23 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.plan.BaseCopyWork;
+import org.apache.hadoop.hive.ql.plan.DeferredWorkContext;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import java.util.Collections;
+import java.util.TreeMap;
+
/**
* MoveWork.
*
*/
@Explain(displayName = "Move Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class MoveWork implements Serializable {
+public class MoveWork implements Serializable, BaseCopyWork {
private static final long serialVersionUID = 1L;
private LoadTableDesc loadTableWork;
private LoadFileDesc loadFileWork;
@@ -208,4 +216,25 @@ public class MoveWork implements Serializable {
public boolean getIsInReplicationScope() {
return this.isInReplicationScope;
}
+
+ public void initializeFromDeferredContext(DeferredWorkContext deferredContext) throws HiveException {
+ if (!deferredContext.isCalculated()) {
+ // Read metadata from metastore and populate the members of the context
+ ImportSemanticAnalyzer.setupDeferredContextFromMetadata(deferredContext);
+ }
+
+ if (deferredContext.inReplScope && AcidUtils.isTransactionalTable(deferredContext.table)) {
+ LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+ Collections.singletonList(deferredContext.destPath),
+ Collections.singletonList(deferredContext.tgtPath),
+ true, null, null);
+ setMultiFilesDesc(loadFilesWork);
+ setNeedCleanTarget(deferredContext.replace);
+ } else {
+ LoadTableDesc loadTableWork = new LoadTableDesc(deferredContext.loadPath,
+ Utilities.getTableDesc(deferredContext.table), new TreeMap<>(),
+ deferredContext.loadFileType, deferredContext.writeId);
+ loadTableWork.setStmtId(deferredContext.stmtId);
+ setLoadTableWork(loadTableWork);
+ }
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
index 3faed74bc64..83674dc0652 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
@@ -19,7 +19,10 @@
package org.apache.hadoop.hive.ql.plan;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.DeferredWorkContext;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
/**
@@ -137,4 +140,19 @@ public class ReplCopyWork extends CopyWork {
public boolean isOverWrite() {
return overWrite;
}
+
+ public void initializeFromDeferredContext(DeferredWorkContext deferredContext) throws HiveException {
+ if (!deferredContext.isCalculated()) {
+ // Read metadata from metastore and populate the members of the context
+ ImportSemanticAnalyzer.setupDeferredContextFromMetadata(deferredContext);
+ }
+
+ setToPath(new Path[] { deferredContext.destPath });
+ if (deferredContext.replace) {
+ setDeleteDestIfExist(true);
+ setAutoPurge(deferredContext.isSkipTrash);
+ setNeedRecycle(deferredContext.needRecycle);
+ }
+ }
+
}