This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 870713ce031 HIVE-26601: Registering table metric during second load
cycle of optimized bootstrap (#3992) (Vinit Patni, reviewed by Teddy Choi)
870713ce031 is described below
commit 870713ce031b346cdd9008a3217d8cc806ea9f7a
Author: vinitpatni <[email protected]>
AuthorDate: Fri Feb 3 13:13:30 2023 +0530
HIVE-26601: Registering table metric during second load cycle of optimized
bootstrap (#3992) (Vinit Patni, reviewed by Teddy Choi)
---
.../parse/TestReplicationOptimisedBootstrap.java | 81 ++++++++++++++++++++++
.../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 6 +-
.../incremental/IncrementalLoadTasksBuilder.java | 7 +-
3 files changed, 90 insertions(+), 4 deletions(-)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 4959bacf5ad..a55b7c8a5b4 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -992,6 +992,87 @@ public class TestReplicationOptimisedBootstrap extends
BaseReplicationScenariosA
assertEquals(tableMetric.getTotalCount(), tableDiffEntries.size());
}
+ @Test
+ public void testTblMetricRegisterDuringSecondLoadCycleOfOptimizedBootstrap()
throws Throwable {
+ List<String> withClause =
ReplicationTestUtils.includeExternalTableClause(false);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
primary.repldDir + "'");
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create table t1_managed (id int) clustered by(id) into 3
buckets stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into table t1_managed values (10)")
+ .run("insert into table t1_managed values (20),(31),(42)")
+ .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the managed tables are present
+    // (external tables are excluded via the with-clause).
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"t1_managed"})
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Do an incremental dump & load, Add one table which we can drop & an
empty table as well.
+ tuple = primary.run("use " + primaryDbName)
+ .run("create table t2_managed (id int) clustered by(id) into 3
buckets stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into table t2_managed values (10)")
+ .run("insert into table t2_managed values (20),(31),(42)")
+ .dump(primaryDbName, withClause);
+
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"t1_managed", "t2_managed"})
+ .verifyReplTargetProperty(replicatedDbName);
+
+ primary.run("use " + primaryDbName)
+ .run("insert into table t1_managed values (30)")
+ .run("insert into table t1_managed values (50),(51),(52)");
+
+ // Prepare for reverse replication.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "1");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(false);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
newReplDir + "'");
+
+
+ // Do a reverse dump
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ // Check the event ack file got created.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + "
doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+
+ // Do a load, this should create a table_diff_complete directory
+ primary.load(primaryDbName,replicatedDbName, withClause);
+
+ // Check the table diff directory exist.
+ assertTrue(new Path(tuple.dumpLocation,
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation,
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+ Path dumpPath = new Path(tuple.dumpLocation);
+ // Check the table diff has all the modified table, including the dropped
and empty ones
+ HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath,
conf);
+ assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+ .containsAll(Arrays.asList("t1_managed")));
+
+ isMetricsEnabledForTests(true);
+ replica.dump(replicatedDbName, withClause);
+
+ //do a load on primary and verify insert queries are discarded
+ primary.load(primaryDbName,replicatedDbName, withClause)
+ .run("select id from t1_managed")
+ .verifyResults(new String[] { "10", "20", "31", "42" });
+ MetricCollector collector = MetricCollector.getInstance();
+ ReplicationMetric metric = collector.getMetrics().getLast();
+ Stage stage = metric.getProgress().getStageByName("REPL_LOAD");
+ Metric tableMetric =
stage.getMetricByName(ReplUtils.MetricName.TABLES.name());
+ assertEquals(tableMetric.getTotalCount(), tableDiffEntries.size());
+ }
+
@NotNull
private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
List<String> withClause =
ReplicationTestUtils.includeExternalTableClause(true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 2c379472d3a..60f24862112 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -150,9 +150,6 @@ public class ReplLoadWork implements Serializable,
ReplLoadWorkMBean {
isFirstFailover = checkFileExists(dumpDirParent, hiveConf,
EVENT_ACK_FILE);
isSecondFailover =
!isFirstFailover && checkFileExists(dumpDirParent, hiveConf,
BOOTSTRAP_TABLES_LIST);
- incrementalLoadTasksBuilder = new
IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
- new IncrementalLoadEventsIterator(dumpDirectory, hiveConf),
hiveConf, eventTo, metricCollector,
- replStatsTracker, shouldFailover);
/*
* If the current incremental dump also includes bootstrap for some
tables, then create iterator
@@ -186,6 +183,9 @@ public class ReplLoadWork implements Serializable,
ReplLoadWorkMBean {
this.bootstrapIterator = null;
this.constraintsIterator = null;
}
+ incrementalLoadTasksBuilder = new
IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
+ new IncrementalLoadEventsIterator(dumpDirectory, hiveConf),
hiveConf, eventTo, metricCollector,
+ replStatsTracker, shouldFailover, tablesToBootstrap.size());
} else {
this.bootstrapIterator = new BootstrapEventsIterator(new
Path(dumpDirectory, EximUtil.METADATA_PATH_NAME)
.toString(), dbNameToLoadIn, true, hiveConf, metricCollector);
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index c5f6d6ed2f2..31a53054028 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -82,7 +82,7 @@ public class IncrementalLoadTasksBuilder {
public IncrementalLoadTasksBuilder(String dbName, String loadPath,
IncrementalLoadEventsIterator iterator,
HiveConf conf, Long eventTo, ReplicationMetricCollector metricCollector,
ReplStatsTracker replStatsTracker,
- boolean shouldFailover) throws
SemanticException {
+ boolean shouldFailover, int
bootstrapTableSize) throws SemanticException {
this.dbName = dbName;
dumpDirectory = (new Path(loadPath).getParent()).toString();
this.iterator = iterator;
@@ -102,6 +102,11 @@ public class IncrementalLoadTasksBuilder {
this.metricCollector.reportFailoverStart("REPL_LOAD", metricMap,
new FailoverMetaData(new Path(dumpDirectory,
ReplUtils.REPL_HIVE_BASE_DIR), conf));
} else {
+      //Registering table metric as we do bootstrap of selective tables
+ // in second load cycle of optimized bootstrap
+ if(bootstrapTableSize > 0) {
+ metricMap.put(ReplUtils.MetricName.TABLES.name(), (long)
bootstrapTableSize);
+ }
this.metricCollector.reportStageStart("REPL_LOAD", metricMap);
}
}