This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 497553016d8 HIVE-26624: Set repl.background.enable on target after failover completion (Vinit Patni, reviewed by László Pintér, Teddy Choi)
497553016d8 is described below
commit 497553016d804576cdf2262e5869541c71e2efbd
Author: vinitpatni <[email protected]>
AuthorDate: Fri Nov 18 10:22:42 2022 +0530
HIVE-26624: Set repl.background.enable on target after failover completion (Vinit Patni, reviewed by László Pintér, Teddy Choi)
Co-authored-by: vpatni <[email protected]>
---
.../parse/TestReplicationScenariosAcidTables.java | 60 ++++++++++++++++++++++
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 2 +
2 files changed, 62 insertions(+)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 7315f3565b3..13bb9ad1c64 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -80,6 +80,7 @@ import java.util.Map;
import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_ENABLE_BACKGROUND_THREAD;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static org.junit.Assert.assertEquals;
@@ -855,6 +856,65 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assertFalse(MetaStoreUtils.isDbBeingFailedOver(replica.getDatabase(replicatedDbName)));
}
+  @Test
+  public void testEnablementOfReplBackgroundThreadDuringFailover() throws Throwable{
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+
+    WarehouseInstance.Tuple dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName, failoverConfigs)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    Database db = replica.getDatabase(replicatedDbName);
+    assertTrue(MetaStoreUtils.isTargetOfReplication(db));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(db));
+
+    dumpData = primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)")
+            .dump(primaryDbName, failoverConfigs);
+
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    Path failoverReadyMarker = new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString());
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(failoverReadyMarker));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(primary.getDatabase(primaryDbName),
+            MetaStoreUtils.FailoverEndpoint.SOURCE));
+
+    replica.load(replicatedDbName, primaryDbName, failoverConfigs)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
+    db = replica.getDatabase(replicatedDbName);
+    assertTrue(MetaStoreUtils.isTargetOfReplication(db));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
+    assert "true".equals(db.getParameters().get(REPL_ENABLE_BACKGROUND_THREAD));
+  }
+
@Test
public void testFailoverFailureDuringRollback() throws Throwable {
List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'",
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index bcef4fae57f..7be785a97ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -584,6 +584,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
        }
        LOG.info("Setting failover endpoint:{} to TARGET for database: {}", ReplConst.REPL_FAILOVER_ENDPOINT, db.getName());
        params.put(ReplConst.REPL_FAILOVER_ENDPOINT, MetaStoreUtils.FailoverEndpoint.TARGET.toString());
+        LOG.info("Setting {} for database: {}", ReplConst.REPL_ENABLE_BACKGROUND_THREAD, db.getName());
+        params.put(ReplConst.REPL_ENABLE_BACKGROUND_THREAD,"true");
        getHive().alterDatabase(work.getTargetDatabase(), db);
      } catch (HiveException e) {
        throw new SemanticException(e);