This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 7416fac HIVE-21763: Incremental replication to allow changing
include/exclude tables list in replication policy (Sankar Hariappan, reviewed
by Mahesh Kumar Behera)
7416fac is described below
commit 7416fac24d5327db6e8de91317d9ad8a4ce69017
Author: Sankar Hariappan <[email protected]>
AuthorDate: Wed Jun 19 08:46:25 2019 +0530
HIVE-21763: Incremental replication to allow changing include/exclude
tables list in replication policy (Sankar Hariappan, reviewed by Mahesh Kumar
Behera)
Signed-off-by: Sankar Hariappan <[email protected]>
---
.../hive/ql/parse/TestReplicationScenarios.java | 2 +-
.../parse/TestTableLevelReplicationScenarios.java | 240 ++++++++++++++++++++-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 14 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 101 ++++++---
.../hadoop/hive/ql/exec/repl/ReplDumpWork.java | 4 +-
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 36 ++++
.../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 15 +-
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 5 +
.../org/apache/hadoop/hive/ql/parse/HiveParser.g | 22 +-
.../hive/ql/parse/ReplicationSemanticAnalyzer.java | 49 ++++-
.../hive/ql/parse/repl/dump/TableExport.java | 2 +-
.../hadoop/hive/ql/parse/repl/dump/Utils.java | 41 ++--
.../events/AbstractConstraintEventHandler.java | 1 +
.../repl/dump/events/AddPartitionHandler.java | 3 +-
.../repl/dump/events/AllocWriteIdHandler.java | 9 +
.../repl/dump/events/AlterPartitionHandler.java | 3 +-
.../parse/repl/dump/events/AlterTableHandler.java | 3 +-
.../parse/repl/dump/events/CommitTxnHandler.java | 8 +-
.../parse/repl/dump/events/CreateTableHandler.java | 3 +-
.../ql/parse/repl/dump/events/EventHandler.java | 5 +-
.../ql/parse/repl/dump/events/InsertHandler.java | 3 +-
.../repl/dump/events/UpdatePartColStatHandler.java | 4 +-
.../dump/events/UpdateTableColStatHandler.java | 3 +-
.../ql/parse/repl/dump/io/TableSerializer.java | 2 +-
.../hive/ql/parse/repl/load/DumpMetaData.java | 116 ++++++++--
.../hadoop/hive/ql/exec/repl/TestReplDumpTask.java | 2 +-
.../apache/hadoop/hive/common/repl/ReplScope.java | 26 ++-
27 files changed, 624 insertions(+), 98 deletions(-)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 1f41d46..0bc7bb3 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -395,7 +395,7 @@ public class TestReplicationScenarios {
HiveConf confTemp = new HiveConf();
confTemp.set("hive.repl.enable.move.optimization", "true");
ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation,
replicadb,
- null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
+ null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
Collections.emptyList());
Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
replLoadTask.initialize(null, null, new
DriverContext(driver.getContext()), null);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 7c1d010..f67fc81 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.parse;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -37,6 +38,7 @@ import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
+import static
org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
/**
* Tests Table level replication scenarios.
@@ -115,6 +117,14 @@ public class TestTableLevelReplicationScenarios extends
BaseReplicationScenarios
List<String> dumpWithClause,
List<String> loadWithClause,
String[] expectedTables) throws Throwable {
+ return replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause,
loadWithClause, null, expectedTables);
+ }
+
+ private String replicateAndVerify(String replPolicy, String oldReplPolicy,
String lastReplId,
+ List<String> dumpWithClause,
+ List<String> loadWithClause,
+ String[] bootstrappedTables,
+ String[] expectedTables) throws Throwable {
if (dumpWithClause == null) {
dumpWithClause = new ArrayList<>();
}
@@ -126,8 +136,11 @@ public class TestTableLevelReplicationScenarios extends
BaseReplicationScenarios
if (lastReplId == null) {
replica.run("drop database if exists " + replicatedDbName + " cascade");
}
- WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
- .dump(replPolicy, lastReplId, dumpWithClause);
+ WarehouseInstance.Tuple tuple = primary.dump(replPolicy, oldReplPolicy,
lastReplId, dumpWithClause);
+
+ if (oldReplPolicy != null) {
+ verifyBootstrapDirInIncrementalDump(tuple.dumpLocation,
bootstrappedTables);
+ }
replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
.run("use " + replicatedDbName)
@@ -136,6 +149,31 @@ public class TestTableLevelReplicationScenarios extends
BaseReplicationScenarios
return tuple.lastReplicationId;
}
+ private void verifyBootstrapDirInIncrementalDump(String dumpLocation,
String[] bootstrappedTables)
+ throws Throwable {
+ // _bootstrap directory should be created as bootstrap enabled on external
tables.
+ Path dumpPath = new Path(dumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME);
+
+ // If nothing to be bootstrapped.
+ if (bootstrappedTables.length == 0) {
+
Assert.assertFalse(primary.miniDFSCluster.getFileSystem().exists(dumpPath));
+ return;
+ }
+
+ Assert.assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath));
+
+ // Check if the DB dump path have any tables other than the ones listed in
bootstrappedTables.
+ Path dbPath = new Path(dumpPath, primaryDbName);
+ FileStatus[] fileStatuses =
primary.miniDFSCluster.getFileSystem().listStatus(dbPath);
+ Assert.assertEquals(fileStatuses.length, bootstrappedTables.length);
+
+ // Eg: _bootstrap/<db_name>/t2, _bootstrap/<db_name>/t3 etc
+ for (String tableName : bootstrappedTables) {
+ Path tblPath = new Path(dbPath, tableName);
+
Assert.assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath));
+ }
+ }
+
@Test
public void testBasicBootstrapWithIncludeList() throws Throwable {
String[] originalNonAcidTables = new String[] {"t1", "t2" };
@@ -197,7 +235,7 @@ public class TestTableLevelReplicationScenarios extends
BaseReplicationScenarios
}
@Test
- public void testReplDumpWithIncorrectTablePolicy() throws Throwable {
+ public void testIncorrectTablePolicyInReplDump() throws Throwable {
String[] originalTables = new String[] {"t1", "t11", "t2", "t3", "t111" };
createTables(originalTables, CreateTableType.NON_ACID);
@@ -224,8 +262,39 @@ public class TestTableLevelReplicationScenarios extends
BaseReplicationScenarios
Assert.assertTrue(failed);
}
+ // Test incremental replication with invalid replication policies in
REPLACE clause.
+ String replPolicy = primaryDbName;
+ WarehouseInstance.Tuple tupleBootstrap = primary.run("use " +
primaryDbName)
+ .dump(primaryDbName, null);
+ replica.load(replicatedDbName, tupleBootstrap.dumpLocation);
+ String lastReplId = tupleBootstrap.lastReplicationId;
+ for (String oldReplPolicy : invalidReplPolicies) {
+ failed = false;
+ try {
+ replicateAndVerify(replPolicy, oldReplPolicy, lastReplId, null, null,
null, replicatedTables);
+ } catch (Exception ex) {
+ LOG.info("Got exception: {}", ex.getMessage());
+ Assert.assertTrue(ex instanceof ParseException);
+ failed = true;
+ }
+ Assert.assertTrue(failed);
+ }
+
+ // Replace with replication policy having different DB name.
+ String oldReplPolicy = replPolicy;
+ replPolicy = primaryDbName + "_dupe.['t1+'].['t1']";
+ failed = false;
+ try {
+ replicateAndVerify(replPolicy, oldReplPolicy, lastReplId, null, null,
null, replicatedTables);
+ } catch (Exception ex) {
+ LOG.info("Got exception: {}", ex.getMessage());
+ Assert.assertTrue(ex instanceof SemanticException);
+ failed = true;
+ }
+ Assert.assertTrue(failed);
+
// Invalid pattern where we didn't enclose table pattern within single or
double quotes.
- String replPolicy = primaryDbName + ".[t1].[t2]";
+ replPolicy = primaryDbName + ".[t1].[t2]";
failed = false;
try {
replicateAndVerify(replPolicy, null, null, null, replicatedTables);
@@ -263,7 +332,14 @@ public class TestTableLevelReplicationScenarios extends
BaseReplicationScenarios
// Replicate and verify if 2 tables are replicated as per policy.
String replPolicy = primaryDbName.toUpperCase() + ".['.*a1+', 'cc3',
'B2'].['AA1+', 'b2']";
String[] replicatedTables = new String[] {"a1", "cc3" };
- replicateAndVerify(replPolicy, null, null, null, replicatedTables);
+ String lastReplId = replicateAndVerify(replPolicy, null, null, null,
replicatedTables);
+
+ // Test case insensitive nature in REPLACE clause as well.
+ String oldReplPolicy = replPolicy;
+ replPolicy = primaryDbName + ".['.*a1+', 'cc3', 'B2'].['AA1+']";
+ replicatedTables = new String[] {"a1", "b2", "cc3" };
+ String[] bootstrappedTables = new String[] {"b2" };
+ replicateAndVerify(replPolicy, oldReplPolicy, lastReplId, null, null,
bootstrappedTables, replicatedTables);
}
@Test
@@ -335,7 +411,8 @@ public class TestTableLevelReplicationScenarios extends
BaseReplicationScenarios
);
String replPolicy = primaryDbName + ".['a[0-9]+', 'b2'].['a1']";
String[] bootstrapReplicatedTables = new String[] {"b2" };
- String lastReplId = replicateAndVerify(replPolicy, null, dumpWithClause,
loadWithClause, bootstrapReplicatedTables);
+ String lastReplId = replicateAndVerify(replPolicy, null,
+ dumpWithClause, loadWithClause, bootstrapReplicatedTables);
// Enable external tables replication and bootstrap in incremental phase.
String[] incrementalReplicatedTables = new String[] {"a2", "b2" };
@@ -357,4 +434,155 @@ public class TestTableLevelReplicationScenarios extends
BaseReplicationScenarios
.run("show tables")
.verifyResults(incrementalReplicatedTables);
}
+
+ @Test
+ public void testBasicReplaceReplPolicy() throws Throwable {
+ String[] originalNonAcidTables = new String[] {"t1", "t2" };
+ String[] originalFullAcidTables = new String[] {"t3", "t4" };
+ String[] originalMMAcidTables = new String[] {"t5" };
+ createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+ createTables(originalFullAcidTables, CreateTableType.FULL_ACID);
+ createTables(originalMMAcidTables, CreateTableType.MM_ACID);
+
+ // Replicate and verify if only 2 tables are replicated to target.
+ String replPolicy = primaryDbName + ".['t1', 't4']";
+ String oldReplPolicy = null;
+ String[] replicatedTables = new String[] {"t1", "t4" };
+ String lastReplId = replicateAndVerify(replPolicy, null, null, null,
replicatedTables);
+
+ // Exclude t4 and include t3, t6
+ createTables(new String[] {"t6" }, CreateTableType.MM_ACID);
+ oldReplPolicy = replPolicy;
+ replPolicy = primaryDbName + ".['t1', 't3', 't6']";
+ replicatedTables = new String[] {"t1", "t3", "t6" };
+ String[] bootstrappedTables = new String[] {"t3", "t6" };
+ lastReplId = replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+ null, null, bootstrappedTables, replicatedTables);
+
+ // Convert to Full Db repl policy. All tables should be included.
+ oldReplPolicy = replPolicy;
+ replPolicy = primaryDbName;
+ replicatedTables = new String[] {"t1", "t2", "t3", "t4", "t5", "t6" };
+ bootstrappedTables = new String[] {"t2", "t4", "t5" };
+ replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+ null, null, bootstrappedTables, replicatedTables);
+
+ // Convert to regex that excludes t3, t4 and t5.
+ oldReplPolicy = replPolicy;
+ replPolicy = primaryDbName + ".['.*?'].['t[3-5]+']";
+ replicatedTables = new String[] {"t1", "t2", "t6" };
+ bootstrappedTables = new String[]{};
+ replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+ null, null, bootstrappedTables, replicatedTables);
+ }
+
+ @Test
+ public void testReplacePolicyOnBootstrapAcidTablesIncrementalPhase() throws
Throwable {
+ String[] originalNonAcidTables = new String[] {"a1", "b1", "c1" };
+ String[] originalFullAcidTables = new String[] {"a2", "b2" };
+ String[] originalMMAcidTables = new String[] {"a3", "a4" };
+ createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+ createTables(originalFullAcidTables, CreateTableType.FULL_ACID);
+ createTables(originalMMAcidTables, CreateTableType.MM_ACID);
+
+ // Replicate and verify if only non-acid tables are replicated to target.
+ List<String> dumpWithoutAcidClause = Collections.singletonList(
+ "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'");
+ String replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['b1']";
+ String[] bootstrapReplicatedTables = new String[] {"a1" };
+ String lastReplId = replicateAndVerify(replPolicy, null,
+ dumpWithoutAcidClause, null, bootstrapReplicatedTables);
+
+ // Enable acid tables for replication. Also, replace replication policy
to exclude "b1" and "a3"
+ // instead of "a1" alone.
+ String oldReplPolicy = replPolicy;
+ replPolicy = primaryDbName + ".['[a-z]+[0-9]+'].['a3', 'b1']";
+ List<String> dumpWithAcidBootstrapClause = Arrays.asList(
+ "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
+ "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'");
+ String[] incrementalReplicatedTables = new String[] {"a1", "a2", "a4",
"b2", "c1" };
+ String[] bootstrappedTables = new String[] {"a2", "a4", "b2", "c1" };
+ replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+ dumpWithAcidBootstrapClause, null, bootstrappedTables,
incrementalReplicatedTables);
+ }
+
+ @Test
+ public void testReplacePolicyWhenAcidTablesDisabledForRepl() throws
Throwable {
+ String[] originalNonAcidTables = new String[] {"a1", "b1", "c1" };
+ String[] originalFullAcidTables = new String[] {"a2" };
+ createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+ createTables(originalFullAcidTables, CreateTableType.FULL_ACID);
+
+ // Replicate and verify if only non-acid tables are replicated to target.
+ List<String> dumpWithoutAcidClause = Collections.singletonList(
+ "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'");
+ String replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['b1']";
+ String[] bootstrapReplicatedTables = new String[] {"a1" };
+ String lastReplId = replicateAndVerify(replPolicy, null,
+ dumpWithoutAcidClause, null, bootstrapReplicatedTables);
+
+ // Continue to disable ACID tables for replication. Also, replace
replication policy to include
+ // "a2" but exclude "a1" and "b1". Still ACID tables shouldn't be
bootstrapped. Only non-ACID
+ // table "b1" should be bootstrapped.
+ String oldReplPolicy = replPolicy;
+ replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['a2']";
+ String[] incrementalReplicatedTables = new String[] {"a1", "b1" };
+ String[] bootstrappedTables = new String[] {"b1" };
+ lastReplId = replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+ dumpWithoutAcidClause, null, bootstrappedTables,
incrementalReplicatedTables);
+ }
+
+ @Test
+ public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase()
throws Throwable {
+ String[] originalAcidTables = new String[] {"a1", "b1" };
+ String[] originalExternalTables = new String[] {"a2", "b2", "c2" };
+ createTables(originalAcidTables, CreateTableType.FULL_ACID);
+ createTables(originalExternalTables, CreateTableType.EXTERNAL);
+
+ // Bootstrap should exclude external tables.
+ List<String> loadWithClause =
ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE,
replica);
+ List<String> dumpWithClause = Collections.singletonList(
+ "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname +
"'='false'"
+ );
+ String replPolicy = primaryDbName + ".['a[0-9]+', 'b1'].['a1']";
+ String[] bootstrapReplicatedTables = new String[] {"b1" };
+ String lastReplId = replicateAndVerify(replPolicy, null,
+ dumpWithClause, loadWithClause, bootstrapReplicatedTables);
+
+ // Continue to disable external tables for replication. Also, replace
replication policy to exclude
+ // "b1" and include "a1".
+ String oldReplPolicy = replPolicy;
+ replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['a2', 'b1']";
+ String[] incrementalReplicatedTables = new String[] {"a1" };
+ String[] bootstrappedTables = new String[] {"a1" };
+ lastReplId = replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+ dumpWithClause, loadWithClause, bootstrappedTables,
incrementalReplicatedTables);
+
+ // Enable external tables replication and bootstrap in incremental phase.
Also, replace
+ // replication policy to exclude tables with prefix "b".
+ oldReplPolicy = replPolicy;
+ replPolicy = primaryDbName + ".['[a-z]+[0-9]+'].['b[0-9]+']";
+ incrementalReplicatedTables = new String[] {"a1", "a2", "c2" };
+ bootstrappedTables = new String[] {"a2", "c2" };
+ dumpWithClause = Arrays.asList("'" +
HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+ "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname +
"'='true'");
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .dump(replPolicy, oldReplPolicy, lastReplId, dumpWithClause);
+
+ // the _external_tables_file info should be created as external tables are
to be replicated.
+ Assert.assertTrue(primary.miniDFSCluster.getFileSystem()
+ .exists(new Path(tuple.dumpLocation, FILE_NAME)));
+
+ // Verify that the external table info contains table "a2" and "c2".
+ ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2",
"c2"),
+ new Path(tuple.dumpLocation, FILE_NAME));
+
+ // Verify if the expected tables are bootstrapped.
+ verifyBootstrapDirInIncrementalDump(tuple.dumpLocation,
bootstrappedTables);
+
+ replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(incrementalReplicatedTables);
+ }
}
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index cdf9071..6326bc3 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -265,6 +265,18 @@ public class WarehouseInstance implements Closeable {
return dump(dumpCommand);
}
+ Tuple dump(String replPolicy, String oldReplPolicy, String
lastReplicationId, List<String> withClauseOptions)
+ throws Throwable {
+ String dumpCommand =
+ "REPL DUMP " + replPolicy
+ + (oldReplPolicy == null ? "" : " REPLACE " +
oldReplPolicy)
+ + (lastReplicationId == null ? "" : " FROM " +
lastReplicationId);
+ if (!withClauseOptions.isEmpty()) {
+ dumpCommand += " with (" + StringUtils.join(withClauseOptions, ",") +
")";
+ }
+ return dump(dumpCommand);
+ }
+
Tuple dump(String dumpCommand) throws Throwable {
advanceDumpDir();
run(dumpCommand);
@@ -303,8 +315,6 @@ public class WarehouseInstance implements Closeable {
if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") +
")";
}
- run("EXPLAIN " + replLoadCmd);
- printOutput();
return run(replLoadCmd);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 453929a..000d663 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -134,7 +134,7 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
for (String s : values) {
LOG.debug(" > " + s);
}
- Utils.writeOutput(values, new Path(work.resultTempPath), conf);
+ Utils.writeOutput(Collections.singletonList(values), new
Path(work.resultTempPath), conf);
}
/**
@@ -143,31 +143,69 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
* locations.
* 2. External or ACID tables are being bootstrapped for the first time : so
that we can dump
* those tables as a whole.
- * @return
+ * 3. If replication policy is changed/replaced, then need to examine all
the tables to see if
+ * any of them need to be bootstrapped as old policy doesn't include it but
new one does.
+ * @return true if need to examine tables for dump and false if not.
+ */
+ private boolean shouldExamineTablesToDump() {
+ return (work.oldReplScope != null)
+ || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)
+ || conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES);
+ }
+
+ /**
+ * Decide whether to dump external tables data. If external tables are
enabled for replication,
+ * then need to dump its data in all the incremental dumps.
+ * @return true if need to dump external table data and false if not.
*/
private boolean shouldDumpExternalTableLocation() {
return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
- && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)
- ||
conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES));
+ && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
}
- private boolean shouldExamineTablesToDump() {
- return shouldDumpExternalTableLocation() ||
- conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
+ /**
+ * Decide whether to dump external tables.
+ * @param tableName - Name of external table to be replicated
+ * @return true if need to bootstrap dump external table and false if not.
+ */
+ private boolean shouldBootstrapDumpExternalTable(String tableName) {
+ // Note: If repl policy is replaced, then need to dump external tables if
table is getting replicated
+ // for the first time in current dump. So, need to check if table is
included in old policy.
+ return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
+ &&
(conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES)
+ || !ReplUtils.tableIncludedInReplScope(work.oldReplScope,
tableName));
+ }
+
+ /**
+ * Decide whether to dump ACID tables.
+ * @param tableName - Name of ACID table to be replicated
+ * @return true if need to bootstrap dump ACID table and false if not.
+ */
+ private boolean shouldBootstrapDumpAcidTable(String tableName) {
+ // Note: If repl policy is replaced, then need to dump ACID tables if
table is getting replicated
+ // for the first time in current dump. So, need to check if table is
included in old policy.
+ return ReplUtils.includeAcidTableInDump(conf)
+ && (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)
+ || !ReplUtils.tableIncludedInReplScope(work.oldReplScope,
tableName));
}
private boolean shouldBootstrapDumpTable(Table table) {
- if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES) &&
- TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+ // Note: If control reaches here, it means, table is already included in
new replication policy.
+ if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
+ && shouldBootstrapDumpExternalTable(table.getTableName())) {
return true;
}
- if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) &&
- AcidUtils.isTransactionalTable(table)) {
+ if (AcidUtils.isTransactionalTable(table)
+ && shouldBootstrapDumpAcidTable(table.getTableName())) {
return true;
}
- return false;
+ // If replication policy is changed with new included/excluded tables
list, then tables which
+ // are not included in old policy but included in new policy should be
bootstrapped along with
+ // the current incremental replication dump.
+ // Control reaches for Non-ACID tables.
+ return !ReplUtils.tableIncludedInReplScope(work.oldReplScope,
table.getTableName());
}
private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot,
Hive hiveDb) throws Exception {
@@ -181,7 +219,7 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
// bootstrap (See bootstrapDump() for more details. Only difference here
is instead of
// waiting for the concurrent transactions to finish, we start dumping the
incremental events
// and wait only for the remaining time if any.
- if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
+ if (needBootstrapAcidTablesDuringIncrementalDump()) {
bootDumpBeginReplId =
queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
assert (bootDumpBeginReplId >= 0);
LOG.info("Dump for bootstrapping ACID tables during an incremental dump
for db {}",
@@ -233,24 +271,23 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
replLogger.endLog(lastReplId.toString());
LOG.info("Done dumping events, preparing to return {},{}",
dumpRoot.toUri(), lastReplId);
- Utils.writeOutput(
- Arrays.asList(
- "incremental",
- String.valueOf(work.eventFrom),
- String.valueOf(lastReplId)
- ),
- dmd.getDumpFilePath(), conf);
dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot);
- dmd.write();
- // If required wait more for any transactions open at the time of starting
the ACID bootstrap.
- if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
- assert (waitUntilTime > 0);
- validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
+ // If repl policy is changed (oldReplScope is set), then pass the current
replication policy,
+ // so that REPL LOAD would drop the tables which are not included in
current policy.
+ if (work.oldReplScope != null) {
+ dmd.setReplScope(work.replScope);
}
+ dmd.write();
// Examine all the tables if required.
if (shouldExamineTablesToDump()) {
+ // If required wait more for any transactions open at the time of
starting the ACID bootstrap.
+ if (needBootstrapAcidTablesDuringIncrementalDump()) {
+ assert (waitUntilTime > 0);
+ validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
+ }
+
Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true);
try (Writer writer = new Writer(dumpRoot, conf)) {
@@ -259,8 +296,8 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
Table table = hiveDb.getTable(dbName, tableName);
// Dump external table locations if required.
- if (shouldDumpExternalTableLocation() &&
- TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+ if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
+ && shouldDumpExternalTableLocation()) {
writer.dataLocationDump(table);
}
@@ -282,6 +319,13 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
return lastReplId;
}
+ private boolean needBootstrapAcidTablesDuringIncrementalDump() {
+ // If old replication policy is available, then it is possible some of the
ACID tables might be
+ // included for bootstrap during incremental dump.
+ return (ReplUtils.includeAcidTableInDump(conf)
+ && ((work.oldReplScope != null) ||
conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)));
+ }
+
private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean
isIncrementalPhase) {
if (isIncrementalPhase) {
dumpRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
@@ -296,7 +340,8 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
db,
conf,
getNewEventOnlyReplicationSpec(ev.getEventId()),
- work.replScope
+ work.replScope,
+ work.oldReplScope
);
EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
eventHandler.handle(context);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 247066c..7bae9ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
Explain.Level.EXTENDED })
public class ReplDumpWork implements Serializable {
final ReplScope replScope;
+ final ReplScope oldReplScope;
final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath;
final Long eventFrom;
Long eventTo;
@@ -40,10 +41,11 @@ public class ReplDumpWork implements Serializable {
testInjectDumpDir = dumpDir;
}
- public ReplDumpWork(ReplScope replScope,
+ public ReplDumpWork(ReplScope replScope, ReplScope oldReplScope,
Long eventFrom, Long eventTo, String
astRepresentationForErrorMsg, Integer maxEventLimit,
String resultTempPath) {
this.replScope = replScope;
+ this.oldReplScope = oldReplScope;
this.dbNameOrPattern = replScope.getDbName();
this.eventFrom = eventFrom;
this.eventTo = eventTo;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 7d2c7c9..e95fe1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hive.ql.exec.repl;
+import com.google.common.collect.Collections2;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidInputException;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
@@ -364,6 +367,35 @@ public class ReplLoadTask extends Task<ReplLoadWork>
implements Serializable {
}
}
+ /**
+ * If replication policy is changed between previous and current load, then
the excluded tables in
+ * the new replication policy will be dropped.
+ * @throws HiveException Failed to get/drop the tables.
+ */
+ private void dropTablesExcludedInReplScope(ReplScope replScope) throws
HiveException {
+ // If all tables are included in replication scope, then nothing to be
dropped.
+ if ((replScope == null) || replScope.includeAllTables()) {
+ return;
+ }
+
+ Hive db = getHive();
+ String dbName = replScope.getDbName();
+
+ // List all the tables that are excluded in the current repl scope.
+ Iterable<String> tableNames = Collections2.filter(db.getAllTables(dbName),
+ tableName -> {
+ assert(tableName != null);
+ return !tableName.toLowerCase().startsWith(
+ SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())
+ && !replScope.tableIncludedInReplScope(tableName);
+ });
+ for (String table : tableNames) {
+ db.dropTable(dbName + "." + table, true);
+ }
+ LOG.info("Tables in the Database: {} that are excluded in the replication
scope are dropped.",
+ dbName);
+ }
+
private void createEndReplLogTask(Context context, Scope scope,
ReplLogger replLogger) throws
SemanticException {
Map<String, String> dbProps;
@@ -457,6 +489,10 @@ public class ReplLoadTask extends Task<ReplLoadWork>
implements Serializable {
work.needCleanTablesFromBootstrap = false;
}
+ // If replication policy is changed between previous and current repl
load, then drop the tables
+ // that are excluded in the new replication policy.
+ dropTablesExcludedInReplScope(work.currentReplScope);
+
IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
// If incremental events are already applied, then check and perform if
need to bootstrap any tables.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index f1f764e..1d63cd8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.repl;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
import
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
@@ -42,6 +43,7 @@ import static
org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.D
Explain.Level.EXTENDED })
public class ReplLoadWork implements Serializable {
final String dbNameToLoadIn;
+ final ReplScope currentReplScope;
final String dumpDirectory;
final String bootstrapDumpToCleanTables;
boolean needCleanTablesFromBootstrap;
@@ -61,12 +63,19 @@ public class ReplLoadWork implements Serializable {
*/
final LineageState sessionStateLineageState;
- public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String
dbNameToLoadIn,
- LineageState lineageState, boolean isIncrementalDump, Long eventTo,
- List<DirCopyWork> pathsToCopyIterator) throws IOException {
+ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
+ String dbNameToLoadIn, ReplScope currentReplScope,
+ LineageState lineageState, boolean isIncrementalDump,
Long eventTo,
+ List<DirCopyWork> pathsToCopyIterator) throws
IOException {
sessionStateLineageState = lineageState;
this.dumpDirectory = dumpDirectory;
this.dbNameToLoadIn = dbNameToLoadIn;
+ this.currentReplScope = currentReplScope;
+
+ // If DB name is changed during REPL LOAD, then set it instead of
referring to source DB name.
+ if ((currentReplScope != null) && StringUtils.isNotBlank(dbNameToLoadIn)) {
+ currentReplScope.setDbName(dbNameToLoadIn);
+ }
this.bootstrapDumpToCleanTables =
hiveConf.get(ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG);
this.needCleanTablesFromBootstrap =
StringUtils.isNotBlank(this.bootstrapDumpToCleanTables);
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 27127e0..2db6073 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.repl.util;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.repl.ReplConst;
+import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -272,4 +273,8 @@ public class ReplUtils {
return true;
}
+
+ public static boolean tableIncludedInReplScope(ReplScope replScope, String
tableName) {
+ return ((replScope == null) ||
replScope.tableIncludedInReplScope(tableName));
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index cfdf180..e023005 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -890,20 +890,28 @@ importStatement
;
replDumpStatement
-@init { pushMsg("replication dump statement", state); }
+@init { pushMsg("Replication dump statement", state); }
@after { popMsg(state); }
: KW_REPL KW_DUMP
- (dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)?
+ (dbPolicy=replDbPolicy)
+ (KW_REPLACE oldDbPolicy=replDbPolicy)?
(KW_FROM (eventId=Number)
(KW_TO (rangeEnd=Number))?
(KW_LIMIT (batchSize=Number))?
)?
(KW_WITH replConf=replConfigs)?
- -> ^(TOK_REPL_DUMP $dbName $tablePolicy? ^(TOK_FROM $eventId (TOK_TO
$rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
+ -> ^(TOK_REPL_DUMP $dbPolicy ^(TOK_REPLACE $oldDbPolicy)? ^(TOK_FROM
$eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
+ ;
+
+replDbPolicy
+@init { pushMsg("Repl dump DB replication policy", state); }
+@after { popMsg(state); }
+ :
+ (dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)? -> $dbName
$tablePolicy?
;
replLoadStatement
-@init { pushMsg("replication load statement", state); }
+@init { pushMsg("Replication load statement", state); }
@after { popMsg(state); }
: KW_REPL KW_LOAD
(dbName=identifier)?
@@ -913,21 +921,21 @@ replLoadStatement
;
replConfigs
-@init { pushMsg("repl configurations", state); }
+@init { pushMsg("Repl configurations", state); }
@after { popMsg(state); }
:
LPAREN replConfigsList RPAREN -> ^(TOK_REPL_CONFIG replConfigsList)
;
replConfigsList
-@init { pushMsg("repl configurations list", state); }
+@init { pushMsg("Repl configurations list", state); }
@after { popMsg(state); }
:
keyValueProperty (COMMA keyValueProperty)* -> ^(TOK_REPL_CONFIG_LIST
keyValueProperty+)
;
replTableLevelPolicy
-@init { pushMsg("replication table level policy definition", state); }
+@init { pushMsg("Replication table level policy definition", state); }
@after { popMsg(state); }
:
((replTablesIncludeList=replTablesList) (DOT
replTablesExcludeList=replTablesList)?)
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 4f4a02f..df41a2e 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -59,6 +59,7 @@ import static
org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_NULL;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD;
@@ -70,6 +71,7 @@ import static
org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO;
public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// Replication Scope
private ReplScope replScope = new ReplScope();
+ private ReplScope oldReplScope = null;
private Long eventFrom;
private Long eventTo;
@@ -130,12 +132,13 @@ public class ReplicationSemanticAnalyzer extends
BaseSemanticAnalyzer {
}
}
- private void setReplDumpTablesList(Tree replTablesNode) throws HiveException
{
+ private void setReplDumpTablesList(Tree replTablesNode, ReplScope replScope)
throws HiveException {
int childCount = replTablesNode.getChildCount();
assert(childCount <= 2);
// Traverse the children which can be either just include tables list or
both include
// and exclude tables lists.
+ String replScopeType = (replScope == this.replScope) ? "Current" : "Old";
for (int listIdx = 0; listIdx < childCount; listIdx++) {
Tree tablesListNode = replTablesNode.getChild(listIdx);
assert(tablesListNode.getType() == TOK_REPL_TABLES_LIST);
@@ -151,21 +154,49 @@ public class ReplicationSemanticAnalyzer extends
BaseSemanticAnalyzer {
}
if (listIdx == 0) {
- LOG.info("ReplScope: Set Included Tables List: {}", tablesList);
+ LOG.info("{} ReplScope: Set Included Tables List: {}", replScopeType,
tablesList);
replScope.setIncludedTablePatterns(tablesList);
} else {
- LOG.info("ReplScope: Set Excluded Tables List: {}", tablesList);
+ LOG.info("{} ReplScope: Set Excluded Tables List: {}", replScopeType,
tablesList);
replScope.setExcludedTablePatterns(tablesList);
}
}
}
+ private void setOldReplPolicy(Tree oldReplPolicyTree) throws HiveException {
+ oldReplScope = new ReplScope();
+ int childCount = oldReplPolicyTree.getChildCount();
+
+ // First child is DB name and optional second child is tables list.
+ assert(childCount <= 2);
+
+ // First child is always the DB name. So set it.
+ oldReplScope.setDbName(oldReplPolicyTree.getChild(0).getText());
+ LOG.info("Old ReplScope: Set DB Name: {}", oldReplScope.getDbName());
+ if (!oldReplScope.getDbName().equalsIgnoreCase(replScope.getDbName())) {
+ LOG.error("DB name {} cannot be replaced to {} in the replication
policy.",
+ oldReplScope.getDbName(), replScope.getDbName());
+ throw new SemanticException("DB name cannot be replaced in the
replication policy.");
+ }
+
+ // If the old policy is just <db_name>, then tables list won't be there.
+ if (childCount <= 1) {
+ return;
+ }
+
+ // Traverse the children which can be either just include tables list or
both include
+ // and exclude tables lists.
+ Tree oldPolicyTablesListNode = oldReplPolicyTree.getChild(1);
+ assert(oldPolicyTablesListNode.getType() == TOK_REPL_TABLES);
+ setReplDumpTablesList(oldPolicyTablesListNode, oldReplScope);
+ }
+
private void initReplDump(ASTNode ast) throws HiveException {
int numChildren = ast.getChildCount();
boolean isMetaDataOnly = false;
String dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
- LOG.info("ReplScope: Set DB Name: {}", dbNameOrPattern);
+ LOG.info("Current ReplScope: Set DB Name: {}", dbNameOrPattern);
replScope.setDbName(dbNameOrPattern);
// Skip the first node, which is always required
@@ -185,7 +216,11 @@ public class ReplicationSemanticAnalyzer extends
BaseSemanticAnalyzer {
break;
}
case TOK_REPL_TABLES: {
- setReplDumpTablesList(currNode);
+ setReplDumpTablesList(currNode, replScope);
+ break;
+ }
+ case TOK_REPLACE: {
+ setOldReplPolicy(currNode);
break;
}
case TOK_FROM: {
@@ -247,6 +282,7 @@ public class ReplicationSemanticAnalyzer extends
BaseSemanticAnalyzer {
Task<ReplDumpWork> replDumpWorkTask = TaskFactory
.get(new ReplDumpWork(
replScope,
+ oldReplScope,
eventFrom,
eventTo,
ErrorMsg.INVALID_PATH.getMsg(ast),
@@ -413,6 +449,7 @@ public class ReplicationSemanticAnalyzer extends
BaseSemanticAnalyzer {
LOG.debug("{} contains an bootstrap dump", loadPath);
}
ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(),
replScope.getDbName(),
+ dmd.getReplScope(),
queryState.getLineageState(), evDump, dmd.getEventTo(),
dirLocationsToCopy(loadPath, evDump));
rootTasks.add(TaskFactory.get(replLoadWork, conf));
@@ -511,6 +548,6 @@ public class ReplicationSemanticAnalyzer extends
BaseSemanticAnalyzer {
LOG.debug(" > " + s);
}
ctx.setResFile(ctx.getLocalTmpPath());
- Utils.writeOutput(values, ctx.getResFile(), conf);
+ Utils.writeOutput(Collections.singletonList(values), ctx.getResFile(),
conf);
}
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index c2e26f0..4500fb6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -169,7 +169,7 @@ public class TableExport {
}
private boolean shouldExport() {
- return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle,
false, conf);
+ return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle,
false, null, conf);
}
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index ce5ef06..11a4d62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -59,18 +59,20 @@ public class Utils {
IDLE, ACTIVE
}
- public static void writeOutput(List<String> values, Path outputFile,
HiveConf hiveConf)
+ public static void writeOutput(List<List<String>> listValues, Path
outputFile, HiveConf hiveConf)
throws SemanticException {
DataOutputStream outStream = null;
try {
FileSystem fs = outputFile.getFileSystem(hiveConf);
outStream = fs.create(outputFile);
- outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput
: values.get(0)));
- for (int i = 1; i < values.size(); i++) {
- outStream.write(Utilities.tabCode);
- outStream.writeBytes((values.get(i) == null ?
Utilities.nullStringOutput : values.get(i)));
+ for (List<String> values : listValues) {
+ outStream.writeBytes((values.get(0) == null ?
Utilities.nullStringOutput : values.get(0)));
+ for (int i = 1; i < values.size(); i++) {
+ outStream.write(Utilities.tabCode);
+ outStream.writeBytes((values.get(i) == null ?
Utilities.nullStringOutput : values.get(i)));
+ }
+ outStream.write(Utilities.newLineCode);
}
- outStream.write(Utilities.newLineCode);
} catch (IOException e) {
throw new SemanticException(e);
} finally {
@@ -175,7 +177,7 @@ public class Utils {
* specific checks.
*/
public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table
tableHandle,
- boolean isEventDump, HiveConf
hiveConf) {
+ boolean isEventDump, ReplScope
oldReplScope, HiveConf hiveConf) {
if (replicationSpec == null) {
replicationSpec = new ReplicationSpec();
}
@@ -199,8 +201,10 @@ public class Utils {
|| replicationSpec.isMetadataOnly();
if (isEventDump) {
// Skip dumping of events related to external tables if bootstrap is
enabled on it.
+ // Also, skip if current table is included only in new policy but
not in old policy.
shouldReplicateExternalTables = shouldReplicateExternalTables
- &&
!hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES);
+ &&
!hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES)
+ && ReplUtils.tableIncludedInReplScope(oldReplScope,
tableHandle.getTableName());
}
return shouldReplicateExternalTables;
}
@@ -210,17 +214,30 @@ public class Utils {
return false;
}
- // Skip dumping events related to ACID tables if bootstrap is enabled
on it
+ // Skip dumping events related to ACID tables if bootstrap is enabled
on it.
+ // Also, skip if current table is included only in new policy but not
in old policy.
if (isEventDump) {
- return
!hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
+ return
!hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)
+ && ReplUtils.tableIncludedInReplScope(oldReplScope,
tableHandle.getTableName());
}
}
+
+ // If replication policy is replaced with new included/excluded tables
list, then events
+ // corresponding to tables which are not included in old policy but
included in new policy
+ // should be skipped. Those tables would be bootstrapped along with the
current incremental
+ // replication dump.
+ // Note: If any event dump reaches here, it means, table is included in
new replication policy.
+ if (isEventDump && !ReplUtils.tableIncludedInReplScope(oldReplScope,
tableHandle.getTableName())) {
+ return false;
+ }
}
return true;
}
public static boolean shouldReplicate(NotificationEvent tableForEvent,
- ReplicationSpec replicationSpec, Hive db, boolean isEventDump, HiveConf
hiveConf) {
+ ReplicationSpec replicationSpec, Hive
db,
+ boolean isEventDump, ReplScope
oldReplScope,
+ HiveConf hiveConf) {
Table table;
try {
table = db.getTable(tableForEvent.getDbName(),
tableForEvent.getTableName());
@@ -230,7 +247,7 @@ public class Utils {
.getTableName(), e);
return false;
}
- return shouldReplicate(replicationSpec, table, isEventDump, hiveConf);
+ return shouldReplicate(replicationSpec, table, isEventDump, oldReplScope,
hiveConf);
}
static List<Path> getDataPathList(Path fromPath, ReplicationSpec
replicationSpec, HiveConf conf)
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
index d938cc1..be6574d 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
@@ -32,6 +32,7 @@ abstract class AbstractConstraintEventHandler<T extends
EventMessage> extends Ab
withinContext.replicationSpec,
withinContext.db,
true,
+ withinContext.oldReplScope,
withinContext.hiveConf
);
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index 0756f59..b6d2d61 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -67,7 +67,8 @@ class AddPartitionHandler extends AbstractEventHandler {
}
final Table qlMdTable = new Table(tobj);
- if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true,
withinContext.hiveConf)) {
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+ true, withinContext.oldReplScope, withinContext.hiveConf)) {
return;
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
index bd25a6c..a940102 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
@@ -49,6 +49,15 @@ class AllocWriteIdHandler extends
AbstractEventHandler<AllocWriteIdMessage> {
return;
}
+ // If replication policy is replaced with new included/excluded tables
list, then events
+ // corresponding to tables which are not included in old policy but
included in new policy
+ // should be skipped. Those tables would be bootstrapped along with the
current incremental
+ // replication dump.
+ // Note: If any event dump reaches here, it means, it is included in new
replication policy.
+ if (!ReplUtils.tableIncludedInReplScope(withinContext.oldReplScope,
eventMessage.getTableName())) {
+ return;
+ }
+
DumpMetaData dmd = withinContext.createDmd(this);
dmd.setPayload(eventMessageAsJSON);
dmd.write();
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
index e59bdf6..7d6599b 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
@@ -100,7 +100,8 @@ class AlterPartitionHandler extends
AbstractEventHandler<AlterPartitionMessage>
}
Table qlMdTable = new Table(tableObject);
- if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true,
withinContext.hiveConf)) {
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+ true, withinContext.oldReplScope, withinContext.hiveConf)) {
return;
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index f02cbb8..89bd0bf 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -88,7 +88,8 @@ class AlterTableHandler extends
AbstractEventHandler<AlterTableMessage> {
Table qlMdTableBefore = new Table(before);
if (!Utils
- .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore, true,
withinContext.hiveConf)) {
+ .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore,
+ true, withinContext.oldReplScope, withinContext.hiveConf)) {
return;
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index 995d634..a13fe38 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -113,7 +113,13 @@ class CommitTxnHandler extends
AbstractEventHandler<CommitTxnMessage> {
: new ArrayList<>(Collections2.filter(writeEventInfoList,
writeEventInfo -> {
assert(writeEventInfo != null);
- return
withinContext.replScope.tableIncludedInReplScope(writeEventInfo.getTable());
+ // If replication policy is replaced with new
included/excluded tables list, then events
+ // corresponding to tables which are included in both old and
new policies should be dumped.
+ // If table is included in new policy but not in old policy,
then it should be skipped.
+ // Those tables would be bootstrapped along with the current
incremental
+ // replication dump.
+ return
(ReplUtils.tableIncludedInReplScope(withinContext.replScope,
writeEventInfo.getTable())
+ &&
ReplUtils.tableIncludedInReplScope(withinContext.oldReplScope,
writeEventInfo.getTable()));
})));
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 8a838db..be7f7eb 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -54,7 +54,8 @@ class CreateTableHandler extends
AbstractEventHandler<CreateTableMessage> {
Table qlMdTable = new Table(tobj);
- if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true,
withinContext.hiveConf)) {
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+ true, withinContext.oldReplScope, withinContext.hiveConf)) {
return;
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
index 0d60c31..b75d12b 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -41,15 +41,17 @@ public interface EventHandler {
final HiveConf hiveConf;
final ReplicationSpec replicationSpec;
final ReplScope replScope;
+ final ReplScope oldReplScope;
public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf,
- ReplicationSpec replicationSpec, ReplScope replScope) {
+ ReplicationSpec replicationSpec, ReplScope replScope, ReplScope
oldReplScope) {
this.eventRoot = eventRoot;
this.cmRoot = cmRoot;
this.db = db;
this.hiveConf = hiveConf;
this.replicationSpec = replicationSpec;
this.replScope = replScope;
+ this.oldReplScope = oldReplScope;
}
public Context(Context other) {
@@ -59,6 +61,7 @@ public interface EventHandler {
this.hiveConf = other.hiveConf;
this.replicationSpec = other.replicationSpec;
this.replScope = other.replScope;
+ this.oldReplScope = other.oldReplScope;
}
void setEventRoot(Path eventRoot) {
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 1bcd529..116db5b 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -58,7 +58,8 @@ class InsertHandler extends
AbstractEventHandler<InsertMessage> {
withinContext.replicationSpec.setNoop(true);
}
- if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true,
withinContext.hiveConf)) {
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+ true, withinContext.oldReplScope, withinContext.hiveConf)) {
return;
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
index 54fc7a6..23d088d 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
@@ -52,8 +52,8 @@ class UpdatePartColStatHandler extends
AbstractEventHandler<UpdatePartitionColum
return;
}
- if (!Utils.shouldReplicate(withinContext.replicationSpec, new
Table(tableObj), true,
- withinContext.hiveConf)) {
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, new
Table(tableObj),
+ true, withinContext.oldReplScope, withinContext.hiveConf)) {
return;
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
index 62db959..ab314ed 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
@@ -37,7 +37,8 @@ class UpdateTableColStatHandler extends
AbstractEventHandler<UpdateTableColumnSt
public void handle(Context withinContext) throws Exception {
LOG.info("Processing#{} UpdateTableColumnStat message : {}",
fromEventId(), eventMessageAsJSON);
Table qlMdTable = new Table(eventMessage.getTableObject());
- if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true,
withinContext.hiveConf)) {
+ if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+ true, withinContext.oldReplScope, withinContext.hiveConf)) {
return;
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 552183a..bef4780 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -52,7 +52,7 @@ public class TableSerializer implements JsonWriter.Serializer
{
@Override
public void writeTo(JsonWriter writer, ReplicationSpec
additionalPropertiesProvider)
throws SemanticException, IOException {
- if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle,
false, hiveConf)) {
+ if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle,
false, null, hiveConf)) {
return;
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
index 974e105..4c137eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -19,32 +19,38 @@ package org.apache.hadoop.hive.ql.parse.repl.load;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
public class DumpMetaData {
// wrapper class for reading and writing metadata about a dump
// responsible for _dumpmetadata files
private static final String DUMP_METADATA = "_dumpmetadata";
+ private static final Logger LOG =
LoggerFactory.getLogger(DumpMetaData.class);
private DumpType dumpType;
private Long eventFrom = null;
private Long eventTo = null;
+ private Path cmRoot;
private String payload = null;
- private boolean initialized = false;
+ private ReplScope replScope = null;
+ private boolean initialized = false;
private final Path dumpFile;
private final HiveConf hiveConf;
- private Path cmRoot;
public DumpMetaData(Path dumpRoot, HiveConf hiveConf) {
this.hiveConf = hiveConf;
@@ -61,8 +67,66 @@ public class DumpMetaData {
this.dumpType = lvl;
this.eventFrom = eventFrom;
this.eventTo = eventTo;
- this.initialized = true;
this.cmRoot = cmRoot;
+ this.initialized = true;
+ }
+
+ public void setPayload(String payload) {
+ this.payload = payload;
+ }
+
+ public void setReplScope(ReplScope replScope) {
+ this.replScope = replScope;
+ }
+
+ private void readReplScope(String line) throws IOException {
+ if (line == null) {
+ return;
+ }
+
+ String[] lineContents = line.split("\t");
+ if (lineContents.length < 1) {
+ return;
+ }
+
+ replScope = new ReplScope();
+
+ LOG.info("Read ReplScope: Set Db Name: {}.", lineContents[0]);
+ replScope.setDbName(lineContents[0]);
+
+ // Read/set include and exclude tables list.
+ int idx = readReplScopeTablesList(lineContents, 1, true);
+ readReplScopeTablesList(lineContents, idx, false);
+ }
+
+ private int readReplScopeTablesList(String[] lineContents, int startIdx,
boolean includeList)
+ throws IOException {
+ // If the list doesn't exist, then return.
+ if (startIdx >= lineContents.length) {
+ return startIdx;
+ }
+
+ // Each tables list should start with "{" and ends with "}"
+ if (!"{".equals(lineContents[startIdx])) {
+ throw new IOException("Invalid repl tables list data in dump metadata
file. Missing \"{\".");
+ }
+
+ List<String>tableNames = new ArrayList<>();
+ for (int i = (startIdx + 1); i < lineContents.length; i++) {
+ String value = lineContents[i];
+ if ("}".equals(value)) {
+ if (includeList) {
+ LOG.info("Read ReplScope: Set Include Table Names: {}.", tableNames);
+ replScope.setIncludedTablePatterns(tableNames);
+ } else {
+ LOG.info("Read ReplScope: Set Exclude Table Names: {}.", tableNames);
+ replScope.setExcludedTablePatterns(tableNames);
+ }
+ return (i + 1);
+ }
+ tableNames.add(value);
+ }
+ throw new IOException("Invalid repl tables list data in dump metadata
file. Missing \"}\".");
}
private void loadDumpFromFile() throws SemanticException {
@@ -71,7 +135,7 @@ public class DumpMetaData {
// read from dumpfile and instantiate self
FileSystem fs = dumpFile.getFileSystem(hiveConf);
br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
- String line = null;
+ String line;
if ((line = br.readLine()) != null) {
String[] lineContents = line.split("\t", 5);
setDump(DumpType.valueOf(lineContents[0]),
Long.valueOf(lineContents[1]),
@@ -82,6 +146,7 @@ public class DumpMetaData {
throw new IOException(
"Unable to read valid values from dumpFile:" +
dumpFile.toUri().toString());
}
+ readReplScope(br.readLine());
} catch (IOException ioe) {
throw new SemanticException(ioe);
} finally {
@@ -105,10 +170,6 @@ public class DumpMetaData {
return this.payload;
}
- public void setPayload(String payload) {
- this.payload = payload;
- }
-
public Long getEventFrom() throws SemanticException {
initializeIfNot();
return eventFrom;
@@ -119,6 +180,10 @@ public class DumpMetaData {
return eventTo;
}
+ public ReplScope getReplScope() throws SemanticException {
+ initializeIfNot();
+ return replScope;
+ }
public Path getDumpFilePath() {
return dumpFile;
}
@@ -134,17 +199,42 @@ public class DumpMetaData {
}
}
+ private List<String> prepareReplScopeValues() {
+ assert(replScope != null);
+
+ List<String> values = new ArrayList<>();
+ values.add(replScope.getDbName());
+
+ List<String> includedTableNames = replScope.getIncludedTableNames();
+ List<String> excludedTableNames = replScope.getExcludedTableNames();
+ if (includedTableNames != null) {
+ values.add("{");
+ values.addAll(includedTableNames);
+ values.add("}");
+ }
+ if (excludedTableNames != null) {
+ values.add("{");
+ values.addAll(excludedTableNames);
+ values.add("}");
+ }
+ LOG.info("Preparing ReplScope {} to dump.", values);
+ return values;
+ }
public void write() throws SemanticException {
- Utils.writeOutput(
+ List<List<String>> listValues = new ArrayList<>();
+ listValues.add(
Arrays.asList(
dumpType.toString(),
eventFrom.toString(),
eventTo.toString(),
cmRoot.toString(),
- payload),
- dumpFile,
- hiveConf
+ payload)
+ );
+ if (replScope != null) {
+ listValues.add(prepareReplScopeValues());
+ }
+ Utils.writeOutput(listValues, dumpFile, hiveConf
);
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index a412d43..aacd295 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -139,7 +139,7 @@ public class TestReplDumpTask {
task.initialize(queryState, null, null, null);
task.setWork(
- new ReplDumpWork(replScope,
+ new ReplDumpWork(replScope, null,
Long.MAX_VALUE, Long.MAX_VALUE, "",
Integer.MAX_VALUE, "")
);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java
index b8c95d5..14b58a3 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java
@@ -29,8 +29,12 @@ import java.util.regex.Pattern;
public class ReplScope implements Serializable {
private String dbName;
private Pattern dbNamePattern;
- private List<Pattern> includedTableNamePatterns; // Only for REPL DUMP and exist only if tableName == null.
- private List<Pattern> excludedTableNamePatterns; // Only for REPL DUMP and exist only if tableName == null.
+
+ // Include and exclude table names/patterns exist only for REPL DUMP.
+ private List<String> includedTableNames;
+ private List<String> excludedTableNames;
+ private List<Pattern> includedTableNamePatterns;
+ private List<Pattern> excludedTableNamePatterns;
public ReplScope() {
}
@@ -49,12 +53,22 @@ public class ReplScope implements Serializable {
return dbName;
}
- public void setIncludedTablePatterns(List<String> includedTableNamePatterns) {
- this.includedTableNamePatterns = compilePatterns(includedTableNamePatterns);
+ public void setIncludedTablePatterns(List<String> includedTableNames) {
+ this.includedTableNames = includedTableNames;
+ this.includedTableNamePatterns = compilePatterns(includedTableNames);
+ }
+
+ public List<String> getIncludedTableNames() {
+ return includedTableNames;
+ }
+
+ public void setExcludedTablePatterns(List<String> excludedTableNames) {
+ this.excludedTableNames = excludedTableNames;
+ this.excludedTableNamePatterns = compilePatterns(excludedTableNames);
}
- public void setExcludedTablePatterns(List<String> excludedTableNamePatterns) {
- this.excludedTableNamePatterns = compilePatterns(excludedTableNamePatterns);
+ public List<String> getExcludedTableNames() {
+ return excludedTableNames;
}
public boolean includeAllTables() {