This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1d0fcea3adb HIVE-26962 : Expose resume/reset ready state through replication metrics when first cycle of resume/reset completes (#4016) (Shreenidhi Saigaonkar, reviewed by Teddy Choi)
1d0fcea3adb is described below
commit 1d0fcea3adbec9ea9ee3f999b080a6f98ed89264
Author: shreenidhiSaigaonkar <[email protected]>
AuthorDate: Fri Feb 24 06:16:42 2023 +0530
HIVE-26962 : Expose resume/reset ready state through replication metrics when first cycle of resume/reset completes (#4016) (Shreenidhi Saigaonkar, reviewed by Teddy Choi)
---
.../parse/TestReplicationScenariosAcidTables.java | 88 +++++++++++++++++++++-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 12 ++-
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 6 +-
.../hive/ql/parse/repl/metric/event/Status.java | 1 +
4 files changed, 104 insertions(+), 3 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 0dfb07f2282..5c69443e73f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.Utils;
@@ -71,7 +72,7 @@ import org.junit.Test;
import org.junit.BeforeClass;
import javax.annotation.Nullable;
-
+import java.util.concurrent.TimeUnit;
import java.io.File;
import java.io.IOException;
import java.io.BufferedReader;
@@ -103,6 +104,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
/**
* TestReplicationScenariosAcidTables - test replication for ACID tables.
*/
@@ -3797,4 +3800,87 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
// ensure event count is captured appropriately in EventsDumpMetadata.
assertEquals(eventsCountInAckFile, eventCountFromStagingDir);
}
+
+ @Test
+ public void testResumeWorkFlow() throws Throwable {
+ isMetricsEnabledForTests(true);
+
+ MetricCollector.getInstance().getMetrics().clear();
+
+ // Do bootstrap
+ primary.run("use " + primaryDbName)
+ .run("create table tb1(id int)")
+ .run("insert into tb1 values(10)")
+ .dump(primaryDbName);
+ replica.load(replicatedDbName, primaryDbName);
+
+ // incremental
+ primary.run("use " + primaryDbName)
+ .run("insert into tb1 values(20)")
+ .dump(primaryDbName);
+ replica.load(replicatedDbName, primaryDbName);
+
+ // suppose this is the point of failover
+ List<String> failoverConfigs = Arrays.asList("'" +
HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+ primary.dump(primaryDbName, failoverConfigs);
+ replica.load(replicatedDbName, primaryDbName, failoverConfigs);
+
+ // let's modify replica/target after failover
+ replica.run("use " + replicatedDbName)
+ .run("insert into tb1 values(30),(40)")
+ .run("create table tb2(id int)")
+ .run("insert into tb2 values(10),(20)");
+
+ // orchestrator will do the swapping and setting correct db params
+ Map<String, String> dbParams =
replica.getDatabase(replicatedDbName).getParameters();
+ String lastId = dbParams.get("repl.last.id");
+ String targetLastId = dbParams.get("repl.target.last.id");
+
+ primary.run("alter database " + primaryDbName
+ + " set dbproperties('repl.resume.started'='true', 'repl.source.for'='',
'repl.target" +
+ ".for'='true', 'repl.last.id'='" + targetLastId + "'
,'repl.target.last.id'='" + lastId +
+ "')");
+
+ replica.run("alter database " + replicatedDbName
+ + " set dbproperties('repl.target.for'='',
'repl.source.for'='p1','repl.resume" +
+ ".started'='true')");
+
+ // initiate RESET (1st cycle of optimised bootstrap)
+ primary.dump(primaryDbName);
+
+ MetricCollector collector = MetricCollector.getInstance();
+ ReplicationMetric metric = collector.getMetrics().getLast();
+ assertEquals(Status.RESUME_READY, metric.getProgress().getStatus());
+
+ replica.load(replicatedDbName, primaryDbName);
+ metric = collector.getMetrics().getLast();
+ assertEquals(Status.RESUME_READY, metric.getProgress().getStatus());
+
+ // this will be completion cycle for RESET
+ primary.dump(primaryDbName);
+ replica.load(replicatedDbName, primaryDbName);
+
+ // AFTER RESET : 1. New table got dropped
+ // 2. Changes made on existing table got discarded.
+ replica.run("use "+ replicatedDbName)
+ .run("select id from tb1")
+ .verifyResults(new String[]{"10", "20"})
+ .run("show tables in " + replicatedDbName)
+ .verifyResults(new String[]{"tb1"});
+
+ // verify that the db params got reset after RESUME
+ Map<String, String> srcParams =
primary.getDatabase(primaryDbName).getParameters();
+ assertNotNull(srcParams.get("repl.source.for"));
+ assertNull(srcParams.get("repl.target.for"));
+ assertNull(srcParams.get("repl.resume.started"));
+ assertNull(srcParams.get("repl.target.last.id"));
+ assertNull(srcParams.get("repl.last.id"));
+
+ Map<String, String> targetParams =
replica.getDatabase(replicatedDbName).getParameters();
+ assertTrue(Boolean.parseBoolean(targetParams.get("repl.target.for")));
+ assertNull(targetParams.get("repl.source.for"));
+ assertNull(targetParams.get("repl.resume.started"));
+ assertNotNull(targetParams.get("repl.target.last.id"));
+ assertNotNull(targetParams.get("repl.last.id"));
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index e0b58a64493..0c0022c1f49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -548,10 +548,20 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
Utils.create(dumpAckFile, conf);
prepareReturnValues(work.getResultValues());
- work.getMetricCollector().reportEnd(isFailoverInProgress ?
Status.FAILOVER_READY : Status.SUCCESS);
+ if (isFailoverInProgress) {
+ work.getMetricCollector().reportEnd(Status.FAILOVER_READY);
+ } else {
+ work.getMetricCollector().reportEnd(isFirstCycleOfResume(database) ?
+ Status.RESUME_READY :
+ Status.SUCCESS);
+ }
deleteAllPreviousDumpMeta(work.getCurrentDumpPath());
}
+ private boolean isFirstCycleOfResume(Database database) {
+ return createEventMarker &&
database.getParameters().containsKey(REPL_RESUME_STARTED_AFTER_FAILOVER);
+ }
+
private void prepareReturnValues(List<String> values) throws
SemanticException {
LOG.debug("prepareReturnValues : " + dumpSchema);
for (String s : values) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 82f30d1f26a..b9fcfbcb426 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -838,7 +838,11 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
}
createReplLoadCompleteAckTask();
work.getMetricCollector().reportStageEnd(STAGE_NAME, Status.SUCCESS);
- work.getMetricCollector().reportEnd(Status.SUCCESS);
+ if
(Boolean.parseBoolean(targetDb.getParameters().get(REPL_RESUME_STARTED_AFTER_FAILOVER)))
{
+ work.getMetricCollector().reportEnd(Status.RESUME_READY);
+ } else {
+ work.getMetricCollector().reportEnd(Status.SUCCESS);
+ }
targetDb.setParameters(params);
getHive().alterDatabase(work.dbNameToLoadIn, targetDb);
return 0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java
index 70a7f1af6d2..3e8964b65c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java
@@ -28,5 +28,6 @@ public enum Status {
FAILED_ADMIN,
FAILOVER_IN_PROGRESS,
FAILOVER_READY,
+ RESUME_READY,
SKIPPED
}