This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 497553016d8 HIVE-26624: Set repl.background.enable on target after 
failover completion (Vinit Patni, reviewed by László Pintér, Teddy Choi)
497553016d8 is described below

commit 497553016d804576cdf2262e5869541c71e2efbd
Author: vinitpatni <[email protected]>
AuthorDate: Fri Nov 18 10:22:42 2022 +0530

    HIVE-26624: Set repl.background.enable on target after failover completion 
(Vinit Patni, reviewed by László Pintér, Teddy Choi)
    
    Co-authored-by: vpatni <[email protected]>
---
 .../parse/TestReplicationScenariosAcidTables.java  | 60 ++++++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  2 +
 2 files changed, 62 insertions(+)

diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 7315f3565b3..13bb9ad1c64 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -80,6 +80,7 @@ import java.util.Map;
 
 
 import static 
org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
+import static 
org.apache.hadoop.hive.common.repl.ReplConst.REPL_ENABLE_BACKGROUND_THREAD;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.junit.Assert.assertEquals;
@@ -855,6 +856,65 @@ public class TestReplicationScenariosAcidTables extends 
BaseReplicationScenarios
     
assertFalse(MetaStoreUtils.isDbBeingFailedOver(replica.getDatabase(replicatedDbName)));
   }
 
+  @Test
+  public void testEnablementOfReplBackgroundThreadDuringFailover() throws 
Throwable{
+    List<String> failoverConfigs = Arrays.asList("'" + 
HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+
+    WarehouseInstance.Tuple dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets 
stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) 
tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, 
ReplAck.FAILOVER_READY_MARKER.toString())));
+    
assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName, failoverConfigs)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    Database db = replica.getDatabase(replicatedDbName);
+    assertTrue(MetaStoreUtils.isTargetOfReplication(db));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(db));
+
+    dumpData = primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)")
+            .dump(primaryDbName, failoverConfigs);
+
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    Path failoverReadyMarker = new Path(dumpPath, 
ReplAck.FAILOVER_READY_MARKER.toString());
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, 
FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(failoverReadyMarker));
+    
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(primary.getDatabase(primaryDbName),
+            MetaStoreUtils.FailoverEndpoint.SOURCE));
+
+    replica.load(replicatedDbName, primaryDbName, failoverConfigs)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
+    db = replica.getDatabase(replicatedDbName);
+    assertTrue(MetaStoreUtils.isTargetOfReplication(db));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, 
MetaStoreUtils.FailoverEndpoint.TARGET));
+    assert 
"true".equals(db.getParameters().get(REPL_ENABLE_BACKGROUND_THREAD));
+  }
+
   @Test
   public void testFailoverFailureDuringRollback() throws Throwable {
     List<String> failoverConfigs = Arrays.asList("'" + 
HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'",
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index bcef4fae57f..7be785a97ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -584,6 +584,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
               }
               LOG.info("Setting failover endpoint:{} to TARGET for database: 
{}", ReplConst.REPL_FAILOVER_ENDPOINT, db.getName());
               params.put(ReplConst.REPL_FAILOVER_ENDPOINT, 
MetaStoreUtils.FailoverEndpoint.TARGET.toString());
+              LOG.info("Setting {} for database: {}", 
ReplConst.REPL_ENABLE_BACKGROUND_THREAD, db.getName());
+              params.put(ReplConst.REPL_ENABLE_BACKGROUND_THREAD,"true");
               getHive().alterDatabase(work.getTargetDatabase(), db);
             } catch (HiveException e) {
               throw new SemanticException(e);

Reply via email to