This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d0fcea3adb HIVE-26962 : Expose resume/reset ready state through 
replication metrics when first cycle of resume/reset completes (#4016) 
(Shreenidhi Saigaonkar, reviewed by Teddy Choi)
1d0fcea3adb is described below

commit 1d0fcea3adbec9ea9ee3f999b080a6f98ed89264
Author: shreenidhiSaigaonkar 
<[email protected]>
AuthorDate: Fri Feb 24 06:16:42 2023 +0530

    HIVE-26962 : Expose resume/reset ready state through replication metrics 
when first cycle of resume/reset completes (#4016) (Shreenidhi Saigaonkar, 
reviewed by Teddy Choi)
---
 .../parse/TestReplicationScenariosAcidTables.java  | 88 +++++++++++++++++++++-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 12 ++-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  6 +-
 .../hive/ql/parse/repl/metric/event/Status.java    |  1 +
 4 files changed, 104 insertions(+), 3 deletions(-)

diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 0dfb07f2282..5c69443e73f 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -59,6 +59,7 @@ import 
org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
@@ -71,7 +72,7 @@ import org.junit.Test;
 import org.junit.BeforeClass;
 
 import javax.annotation.Nullable;
-
+import java.util.concurrent.TimeUnit;
 import java.io.File;
 import java.io.IOException;
 import java.io.BufferedReader;
@@ -103,6 +104,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 /**
  * TestReplicationScenariosAcidTables - test replication for ACID tables.
  */
@@ -3797,4 +3800,87 @@ public class TestReplicationScenariosAcidTables extends 
BaseReplicationScenarios
     // ensure event count is captured appropriately in EventsDumpMetadata.
     assertEquals(eventsCountInAckFile, eventCountFromStagingDir);
   }
+
  @Test
  public void testResumeWorkFlow() throws Throwable {
    // End-to-end scenario for HIVE-26962: after a failover is "resumed"
    // (optimised bootstrap / RESET back to the original source), the first
    // dump and load cycles must report Status.RESUME_READY through the
    // replication metrics, and the completion cycle must reset the
    // repl.* db parameters on both sides.
    isMetricsEnabledForTests(true);

    // Start from a clean metrics queue so getLast() below reflects this test only.
    MetricCollector.getInstance().getMetrics().clear();

    // Do bootstrap
    primary.run("use " + primaryDbName)
      .run("create table tb1(id int)")
      .run("insert into tb1 values(10)")
      .dump(primaryDbName);
    replica.load(replicatedDbName, primaryDbName);

    // incremental
    primary.run("use " + primaryDbName)
      .run("insert into tb1 values(20)")
      .dump(primaryDbName);
    replica.load(replicatedDbName, primaryDbName);

    // suppose this is the point of failover
    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
    primary.dump(primaryDbName, failoverConfigs);
    replica.load(replicatedDbName, primaryDbName, failoverConfigs);

    // let's modify replica/target after failover; these post-failover changes
    // are expected to be discarded by the RESET below.
    replica.run("use " + replicatedDbName)
      .run("insert into tb1 values(30),(40)")
      .run("create table tb2(id int)")
      .run("insert into tb2 values(10),(20)");

    // orchestrator will do the swapping and setting correct db params
    Map<String, String> dbParams = replica.getDatabase(replicatedDbName).getParameters();
    String lastId = dbParams.get("repl.last.id");
    String targetLastId = dbParams.get("repl.target.last.id");

    // Simulate the orchestrator: mark resume started and swap the
    // source/target roles and replication ids on the old primary.
    primary.run("alter database " + primaryDbName
      + " set dbproperties('repl.resume.started'='true', 'repl.source.for'='', 'repl.target" +
      ".for'='true', 'repl.last.id'='" + targetLastId + "' ,'repl.target.last.id'='" + lastId +
      "')");

    // Mirror the role swap on the old replica, which now acts as the source.
    replica.run("alter database " + replicatedDbName
      + " set dbproperties('repl.target.for'='', 'repl.source.for'='p1','repl.resume" +
      ".started'='true')");

    // initiate RESET (1st cycle of optimised bootstrap)
    primary.dump(primaryDbName);

    // First RESET dump cycle must surface RESUME_READY, not plain SUCCESS.
    MetricCollector collector = MetricCollector.getInstance();
    ReplicationMetric metric = collector.getMetrics().getLast();
    assertEquals(Status.RESUME_READY, metric.getProgress().getStatus());

    // First RESET load cycle must also report RESUME_READY.
    replica.load(replicatedDbName, primaryDbName);
    metric = collector.getMetrics().getLast();
    assertEquals(Status.RESUME_READY, metric.getProgress().getStatus());

    // this will be completion cycle for RESET
    primary.dump(primaryDbName);
    replica.load(replicatedDbName, primaryDbName);

    // AFTER RESET : 1. New table got dropped
    //               2. Changes made on existing table got discarded.
    replica.run("use "+ replicatedDbName)
           .run("select id from tb1")
           .verifyResults(new String[]{"10", "20"})
           .run("show tables in " + replicatedDbName)
           .verifyResults(new String[]{"tb1"});

    // verify that the db params got reset after RESUME
    // Source side: back to pure source role, resume markers cleared.
    Map<String, String> srcParams = primary.getDatabase(primaryDbName).getParameters();
    assertNotNull(srcParams.get("repl.source.for"));
    assertNull(srcParams.get("repl.target.for"));
    assertNull(srcParams.get("repl.resume.started"));
    assertNull(srcParams.get("repl.target.last.id"));
    assertNull(srcParams.get("repl.last.id"));

    // Target side: back to pure target role with replication ids restored.
    Map<String, String> targetParams = replica.getDatabase(replicatedDbName).getParameters();
    assertTrue(Boolean.parseBoolean(targetParams.get("repl.target.for")));
    assertNull(targetParams.get("repl.source.for"));
    assertNull(targetParams.get("repl.resume.started"));
    assertNotNull(targetParams.get("repl.target.last.id"));
    assertNotNull(targetParams.get("repl.last.id"));
  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index e0b58a64493..0c0022c1f49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -548,10 +548,20 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
     }
     Utils.create(dumpAckFile, conf);
     prepareReturnValues(work.getResultValues());
-    work.getMetricCollector().reportEnd(isFailoverInProgress ? 
Status.FAILOVER_READY : Status.SUCCESS);
+    if (isFailoverInProgress) {
+      work.getMetricCollector().reportEnd(Status.FAILOVER_READY);
+    } else {
+      work.getMetricCollector().reportEnd(isFirstCycleOfResume(database) ?
+                                          Status.RESUME_READY :
+                                          Status.SUCCESS);
+    }
     deleteAllPreviousDumpMeta(work.getCurrentDumpPath());
   }
 
+  private boolean isFirstCycleOfResume(Database database) {
+    return createEventMarker && 
database.getParameters().containsKey(REPL_RESUME_STARTED_AFTER_FAILOVER);
+  }
+
   private void prepareReturnValues(List<String> values) throws 
SemanticException {
     LOG.debug("prepareReturnValues : " + dumpSchema);
     for (String s : values) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 82f30d1f26a..b9fcfbcb426 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -838,7 +838,11 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
       }
       createReplLoadCompleteAckTask();
       work.getMetricCollector().reportStageEnd(STAGE_NAME, Status.SUCCESS);
-      work.getMetricCollector().reportEnd(Status.SUCCESS);
+      if 
(Boolean.parseBoolean(targetDb.getParameters().get(REPL_RESUME_STARTED_AFTER_FAILOVER)))
 {
+        work.getMetricCollector().reportEnd(Status.RESUME_READY);
+      } else {
+        work.getMetricCollector().reportEnd(Status.SUCCESS);
+      }
       targetDb.setParameters(params);
       getHive().alterDatabase(work.dbNameToLoadIn, targetDb);
       return 0;
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java
index 70a7f1af6d2..3e8964b65c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java
@@ -28,5 +28,6 @@ public enum Status {
   FAILED_ADMIN,
   FAILOVER_IN_PROGRESS,
   FAILOVER_READY,
+  RESUME_READY,
   SKIPPED
 }

Reply via email to