This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 63bc962  HIVE-21403: Incorrect error code returned when retry bootstrap with different dump (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
63bc962 is described below
commit 63bc9623f28045a5d57232ce410d6ff55b529d41
Author: Sankar Hariappan <[email protected]>
AuthorDate: Fri Mar 8 21:34:40 2019 +0530
    HIVE-21403: Incorrect error code returned when retry bootstrap with different dump (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
Signed-off-by: Sankar Hariappan <[email protected]>
---
.../TestReplicationScenariosAcrossInstances.java | 5 +-
.../TestReplicationScenariosExternalTables.java | 30 +++++
.../hadoop/hive/ql/parse/WarehouseInstance.java | 29 ++++-
.../ql/exec/repl/bootstrap/load/LoadDatabase.java | 42 +++----
.../repl/bootstrap/load/table/LoadPartitions.java | 62 +++++-----
.../exec/repl/bootstrap/load/table/LoadTable.java | 130 ++++++++++-----------
6 files changed, 168 insertions(+), 130 deletions(-)
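
The gist of the change: the bootstrap-load classes used to catch every failure and rethrow it as new SemanticException(e), so the driver ended up reporting a generic response code instead of the specific code (such as REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID) carried by the original exception. The tasks() methods now declare throws Exception and let failures propagate unchanged, and WarehouseInstance gains test helpers that assert the exact error code. A minimal standalone sketch of why wrapping loses the code, using stand-in classes and made-up codes (31000, 40000) rather than the real Hive types:

// Stand-ins for illustration only; not Hive's real classes or error codes.
class CodedException extends Exception {
  final int errorCode;
  CodedException(String msg, int errorCode) { super(msg); this.errorCode = errorCode; }
}

public class WrapDemo {
  // Mirrors how a driver might derive a response code: the specific code
  // when the exception carries one, else a generic fallback.
  static int responseCode(Throwable t) {
    return (t instanceof CodedException) ? ((CodedException) t).errorCode : 40000;
  }

  public static void main(String[] args) {
    CodedException original = new CodedException("bootstrap dump path not valid", 31000);
    Exception wrapped = new Exception(original); // analogous to: throw new SemanticException(e)
    System.out.println(responseCode(original));  // 31000 - the intended, specific code
    System.out.println(responseCode(wrapped));   // 40000 - the specific code is lost
  }
}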
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 3d05db2..7528f27 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1237,9 +1237,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
assertEquals(0, replica.getForeignKeyList(replicatedDbName, "t2").size());
// Retry with different dump should fail.
- CommandProcessorResponse ret = replica.runCommand("REPL LOAD " + replicatedDbName +
- " FROM '" + tuple2.dumpLocation + "'");
- Assert.assertEquals(ret.getResponseCode(), ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
+ replica.loadFailure(replicatedDbName, tuple2.dumpLocation, null,
+ ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
// Verify if create table is not called on table t1 but called for t2 and t3.
// Also, allow constraint creation only on t1 and t3. Foreign key creation on t2 fails.
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 83f38fa..a5d1032 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -548,6 +549,35 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
);
}
+ @Test
+ public void retryIncBootstrapExternalTablesFromDifferentDumpWithoutCleanTablesConfig() throws Throwable {
+ List<String> dumpWithClause = Collections.singletonList(
+ "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
+ );
+ List<String> loadWithClause = externalTableBasePathWithClause();
+
+ WarehouseInstance.Tuple tupleBootstrapWithoutExternal = primary
+ .dump(primaryDbName, null, dumpWithClause);
+
+ replica.load(replicatedDbName, tupleBootstrapWithoutExternal.dumpLocation, loadWithClause);
+
+ dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+ "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+ WarehouseInstance.Tuple tupleIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("create table t2 as select * from t1")
+ .dump(primaryDbName, tupleBootstrapWithoutExternal.lastReplicationId, dumpWithClause);
+ WarehouseInstance.Tuple tupleNewIncWithExternalBootstrap
+ = primary.dump(primaryDbName, tupleBootstrapWithoutExternal.lastReplicationId, dumpWithClause);
+
+ replica.load(replicatedDbName, tupleIncWithExternalBootstrap.dumpLocation, loadWithClause);
+
+ // Re-bootstrapping from different bootstrap dump without clean tables config should fail.
+ replica.loadFailure(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause,
+ ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
+ }
+
private void assertExternalFileInfo(List<String> expected, Path externalTableInfoFile)
throws IOException {
DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem();
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 56eae91..c76d30c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -243,6 +243,18 @@ public class WarehouseInstance implements Closeable {
return this;
}
+ WarehouseInstance runFailure(String command, int errorCode) throws Throwable {
+ CommandProcessorResponse ret = driver.run(command);
+ if (ret.getException() == null) {
+ throw new RuntimeException("command execution passed for an invalid command: " + command);
+ }
+ if (ret.getResponseCode() != errorCode) {
+ throw new RuntimeException("Command: " + command + " returned incorrect error code: "
+ + ret.getResponseCode() + " instead of " + errorCode);
+ }
+ return this;
+ }
+
Tuple dump(String dbName, String lastReplicationId, List<String> withClauseOptions)
throws Throwable {
String dumpCommand =
@@ -288,7 +300,7 @@ public class WarehouseInstance implements Closeable {
WarehouseInstance load(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
throws Throwable {
String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
- if (!withClauseOptions.isEmpty()) {
+ if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
}
run("EXPLAIN " + replLoadCmd);
@@ -303,26 +315,35 @@ public class WarehouseInstance implements Closeable {
WarehouseInstance status(String replicatedDbName, List<String> withClauseOptions) throws Throwable {
String replStatusCmd = "REPL STATUS " + replicatedDbName;
- if (!withClauseOptions.isEmpty()) {
+ if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
replStatusCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
}
return run(replStatusCmd);
}
WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation)
throws Throwable {
- runFailure("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+ loadFailure(replicatedDbName, dumpLocation, null);
return this;
}
WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
throws Throwable {
String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
- if (!withClauseOptions.isEmpty()) {
+ if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
}
return runFailure(replLoadCmd);
}
+ WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, List<String> withClauseOptions,
+ int errorCode) throws Throwable {
+ String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
+ if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
+ replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
+ }
+ return runFailure(replLoadCmd, errorCode);
+ }
+
WarehouseInstance verifyResult(String data) throws IOException {
verifyResults(data == null ? new String[] {} : new String[] { data });
return this;
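
For reference, this is roughly the command string the load()/loadFailure() helpers above assemble; a standalone sketch using String.join in place of StringUtils.join, with example database, dump path, and option values assumed:

import java.util.Arrays;
import java.util.List;

public class ReplLoadCmdDemo {
  public static void main(String[] args) {
    String replicatedDbName = "replicated_db";    // example value
    String dumpLocation = "/tmp/repl/dump1";      // example value
    List<String> withClauseOptions = Arrays.asList(
        "'hive.repl.approx.max.load.tasks'='1'"); // example option

    String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
    // The null check added by this patch lets callers pass no with-clause at all.
    if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
      replLoadCmd += " WITH (" + String.join(",", withClauseOptions) + ")";
    }
    System.out.println(replLoadCmd);
    // REPL LOAD replicated_db FROM '/tmp/repl/dump1' WITH ('hive.repl.approx.max.load.tasks'='1')
  }
}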
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index c7828db..c892b40 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -60,30 +60,26 @@ public class LoadDatabase {
isTableLevelLoad = tblNameToLoadIn != null && !tblNameToLoadIn.isEmpty();
}
- public TaskTracker tasks() throws SemanticException {
- try {
- Database dbInMetadata = readDbMetadata();
- String dbName = dbInMetadata.getName();
- Task<? extends Serializable> dbRootTask = null;
- ReplLoadOpType loadDbType = getLoadDbType(dbName);
- switch (loadDbType) {
- case LOAD_NEW:
- dbRootTask = createDbTask(dbInMetadata);
- break;
- case LOAD_REPLACE:
- dbRootTask = alterDbTask(dbInMetadata);
- break;
- default:
- break;
- }
- if (dbRootTask != null) {
- dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata));
- tracker.addTask(dbRootTask);
- }
- return tracker;
- } catch (Exception e) {
- throw new SemanticException(e.getMessage(), e);
+ public TaskTracker tasks() throws Exception {
+ Database dbInMetadata = readDbMetadata();
+ String dbName = dbInMetadata.getName();
+ Task<? extends Serializable> dbRootTask = null;
+ ReplLoadOpType loadDbType = getLoadDbType(dbName);
+ switch (loadDbType) {
+ case LOAD_NEW:
+ dbRootTask = createDbTask(dbInMetadata);
+ break;
+ case LOAD_REPLACE:
+ dbRootTask = alterDbTask(dbInMetadata);
+ break;
+ default:
+ break;
+ }
+ if (dbRootTask != null) {
+ dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata));
+ tracker.addTask(dbRootTask);
}
+ return tracker;
}
Database readDbMetadata() throws SemanticException {
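
The same unwrapping is applied to LoadPartitions.tasks() and LoadTable.tasks() below. A compilable before/after sketch of the pattern, again with stand-in types rather than the actual Hive classes:

// Stand-ins for illustration; not the real Hive types.
class TaskTracker {}
class SemanticException extends Exception {
  SemanticException(Throwable cause) { super(cause); }
}

class BeforePatch {
  TaskTracker tasks() throws SemanticException {
    try {
      return buildTasks();
    } catch (Exception e) {
      throw new SemanticException(e); // the original exception's error code is lost here
    }
  }
  TaskTracker buildTasks() throws Exception { return new TaskTracker(); }
}

class AfterPatch {
  TaskTracker tasks() throws Exception {
    return buildTasks(); // failures, and their specific error codes, propagate unchanged
  }
  TaskTracker buildTasks() throws Exception { return new TaskTracker(); }
}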
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index fa72527..c1773c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -101,22 +101,35 @@ public class LoadPartitions {
this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
}
- public TaskTracker tasks() throws SemanticException {
- try {
- /*
- We are doing this both in load table and load partitions
- */
- Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
- LoadTable.TableLocationTuple tableLocationTuple =
- LoadTable.tableLocation(tableDesc, parentDb, tableContext, context);
- tableDesc.setLocation(tableLocationTuple.location);
-
- if (table == null) {
- //new table
- table = tableDesc.toTable(context.hiveConf);
- if (isPartitioned(tableDesc)) {
+ public TaskTracker tasks() throws Exception {
+ /*
+ We are doing this both in load table and load partitions
+ */
+ Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+ LoadTable.TableLocationTuple tableLocationTuple =
+ LoadTable.tableLocation(tableDesc, parentDb, tableContext, context);
+ tableDesc.setLocation(tableLocationTuple.location);
+
+ if (table == null) {
+ //new table
+ table = tableDesc.toTable(context.hiveConf);
+ if (isPartitioned(tableDesc)) {
+ updateReplicationState(initialReplicationState());
+ if (!forNewTable().hasReplicationState()) {
+ // Add ReplStateLogTask only if no pending table load tasks left for next cycle
+ Task<? extends Serializable> replLogTask
+ = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+ tracker.addDependentTask(replLogTask);
+ }
+ return tracker;
+ }
+ } else {
+ // existing
+ if (table.isPartitioned()) {
+ List<AddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
+ if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) {
updateReplicationState(initialReplicationState());
- if (!forNewTable().hasReplicationState()) {
+ if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) {
// Add ReplStateLogTask only if no pending table load tasks left for next cycle
Task<? extends Serializable> replLogTask
= ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
@@ -124,26 +137,9 @@ public class LoadPartitions {
}
return tracker;
}
- } else {
- // existing
- if (table.isPartitioned()) {
- List<AddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
- if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) {
- updateReplicationState(initialReplicationState());
- if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) {
- // Add ReplStateLogTask only if no pending table load tasks left for next cycle
- Task<? extends Serializable> replLogTask
- = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
- tracker.addDependentTask(replLogTask);
- }
- return tracker;
- }
- }
}
- return tracker;
- } catch (Exception e) {
- throw new SemanticException(e);
}
+ return tracker;
}
private void updateReplicationState(ReplicationState replicationState) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 0d1a88c..3b0b67a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -84,83 +84,79 @@ public class LoadTable {
this.tracker = new TaskTracker(limiter);
}
- public TaskTracker tasks() throws SemanticException {
+ public TaskTracker tasks() throws Exception {
// Path being passed to us is a table dump location. We go ahead and load it in as needed.
// If tblName is null, then we default to the table name specified in _metadata, which is good.
// or are both specified, in which case, that's what we are intended to create the new table as.
- try {
- if (event.shouldNotReplicate()) {
- return tracker;
- }
- String dbName = tableContext.dbNameToLoadIn; //this can never be null or empty;
- // Create table associated with the import
- // Executed if relevant, and used to contain all the other details about the table if not.
- ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName));
- Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
+ if (event.shouldNotReplicate()) {
+ return tracker;
+ }
+ String dbName = tableContext.dbNameToLoadIn; //this can never be null or empty;
+ // Create table associated with the import
+ // Executed if relevant, and used to contain all the other details about the table if not.
+ ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName));
+ Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
- // Normally, on import, trying to create a table or a partition in a db that does not yet exist
- // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying
- // to create tasks to create a table inside a db that as-of-now does not exist, but there is
- // a precursor Task waiting that will create it before this is encountered. Thus, we instantiate
- // defaults and do not error out in that case.
- // the above will change now since we are going to split replication load in multiple execution
- // tasks and hence we could have created the database earlier in which case the waitOnPrecursor will
- // be false and hence if db Not found we should error out.
- Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
- if (parentDb == null) {
- if (!tableContext.waitOnPrecursor()) {
- throw new SemanticException(
- ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tableDesc.getDatabaseName()));
- }
+ // Normally, on import, trying to create a table or a partition in a db that does not yet exist
+ // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying
+ // to create tasks to create a table inside a db that as-of-now does not exist, but there is
+ // a precursor Task waiting that will create it before this is encountered. Thus, we instantiate
+ // defaults and do not error out in that case.
+ // the above will change now since we are going to split replication load in multiple execution
+ // tasks and hence we could have created the database earlier in which case the waitOnPrecursor will
+ // be false and hence if db Not found we should error out.
+ Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+ if (parentDb == null) {
+ if (!tableContext.waitOnPrecursor()) {
+ throw new SemanticException(
+ ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tableDesc.getDatabaseName()));
}
+ }
- Task<?> tblRootTask = null;
- ReplLoadOpType loadTblType = getLoadTableType(table);
- switch (loadTblType) {
- case LOAD_NEW:
- break;
- case LOAD_REPLACE:
- tblRootTask = dropTableTask(table);
- break;
- case LOAD_SKIP:
- return tracker;
- default:
- break;
- }
+ Task<?> tblRootTask = null;
+ ReplLoadOpType loadTblType = getLoadTableType(table);
+ switch (loadTblType) {
+ case LOAD_NEW:
+ break;
+ case LOAD_REPLACE:
+ tblRootTask = dropTableTask(table);
+ break;
+ case LOAD_SKIP:
+ return tracker;
+ default:
+ break;
+ }
- TableLocationTuple
- tableLocationTuple = tableLocation(tableDesc, parentDb, tableContext, context);
- tableDesc.setLocation(tableLocationTuple.location);
+ TableLocationTuple
+ tableLocationTuple = tableLocation(tableDesc, parentDb, tableContext, context);
+ tableDesc.setLocation(tableLocationTuple.location);
- /* Note: In the following section, Metadata-only import handling logic is
- interleaved with regular repl-import logic. The rule of thumb being
- followed here is that MD-only imports are essentially ALTERs. They do
- not load data, and should not be "creating" any metadata - they should
- be replacing instead. The only place it makes sense for a MD-only import
- to create is in the case of a table that's been dropped and recreated,
- or in the case of an unpartitioned table. In all other cases, it should
- behave like a noop or a pure MD alter.
- */
- newTableTasks(tableDesc, tblRootTask, tableLocationTuple);
+ /* Note: In the following section, Metadata-only import handling logic is
+ interleaved with regular repl-import logic. The rule of thumb being
+ followed here is that MD-only imports are essentially ALTERs. They do
+ not load data, and should not be "creating" any metadata - they should
+ be replacing instead. The only place it makes sense for a MD-only import
+ to create is in the case of a table that's been dropped and recreated,
+ or in the case of an unpartitioned table. In all other cases, it should
+ behave like a noop or a pure MD alter.
+ */
+ newTableTasks(tableDesc, tblRootTask, tableLocationTuple);
- // Set Checkpoint task as dependant to create table task. So, if same dump is retried for
- // bootstrap, we skip current table update.
- Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
- tableDesc,
- null,
- context.dumpDirectory,
- context.hiveConf
- );
- if (!isPartitioned(tableDesc)) {
- Task<? extends Serializable> replLogTask
- = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
- ckptTask.addDependentTask(replLogTask);
- }
- tracker.addDependentTask(ckptTask);
- return tracker;
- } catch (Exception e) {
- throw new SemanticException(e);
+ // Set Checkpoint task as dependant to create table task. So, if same dump is retried for
+ // bootstrap, we skip current table update.
+ Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
+ tableDesc,
+ null,
+ context.dumpDirectory,
+ context.hiveConf
+ );
+ if (!isPartitioned(tableDesc)) {
+ Task<? extends Serializable> replLogTask
+ = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+ ckptTask.addDependentTask(replLogTask);
}
+ tracker.addDependentTask(ckptTask);
+ return tracker;
}
private ReplLoadOpType getLoadTableType(Table table) throws InvalidOperationException, HiveException {