pkumarsinha commented on a change in pull request #2529:
URL: https://github.com/apache/hive/pull/2529#discussion_r678577106
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -155,6 +156,10 @@
LOAD_NEW, LOAD_SKIP, LOAD_REPLACE
}
+ public static enum Failover_Point {
Review comment:
Add a Javadoc comment explaining what this enum is for.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -155,6 +156,10 @@
LOAD_NEW, LOAD_SKIP, LOAD_REPLACE
}
+ public static enum Failover_Point {
Review comment:
nit: Rename to FailoverEndpoint
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -219,16 +224,16 @@
return TaskFactory.get(replLogWork, conf);
}
+ public static boolean isDbBeingFailedOverAtSource(Database db) {
+ assert (db != null);
+ Map<String, String> dbParameters = db.getParameters();
+ return
Failover_Point.SOURCE.toString().equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED));
Review comment:
db.getParameters() is nullable — don't you need a null check here, like you
added in other places?
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
##########
@@ -172,7 +179,7 @@ public IncrementalLoadTasksBuilder(String dbName, String
loadPath, IncrementalLo
Map<String, String> dbProps = new HashMap<>();
dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(),
String.valueOf(lastReplayedEvent));
- ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger,
dbProps, dumpDirectory, metricCollector);
+ ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger,
dbProps, dumpDirectory, metricCollector, shouldFailover);
Review comment:
nit: format
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -655,17 +679,25 @@ private int executeIncrementalLoad(long loadStartTime)
throws Exception {
if (work.replScopeModified) {
dropTablesExcludedInReplScope(work.currentReplScope);
}
- if
(!MetaStoreUtils.isTargetOfReplication(getHive().getDatabase(work.dbNameToLoadIn)))
{
+ if (!work.shouldFailover()) {
+ Database targetDb = getHive().getDatabase(work.dbNameToLoadIn);
Map<String, String> props = new HashMap<>();
- props.put(ReplConst.TARGET_OF_REPLICATION, "true");
- AlterDatabaseSetPropertiesDesc setTargetDesc = new
AlterDatabaseSetPropertiesDesc(work.dbNameToLoadIn, props, null);
- Task<?> addReplTargetPropTask =
- TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(),
setTargetDesc, true,
- work.dumpDirectory, work.getMetricCollector()), conf);
- if (this.childTasks == null) {
- this.childTasks = new ArrayList<>();
+ if (!MetaStoreUtils.isTargetOfReplication(targetDb)) {
+ props.put(ReplConst.TARGET_OF_REPLICATION, ReplConst.TRUE);
+ }
+ if (ReplUtils.isDbBeingFailedOverAtTarget(targetDb)) {
+ props.put(ReplConst.REPL_FAILOVER_ENABLED, "");
Review comment:
Is this a rollback use case?
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -204,10 +207,11 @@ private void testTargetDbReplIncompatible(boolean
setReplIncompProp) throws Thro
}
@Test
- public void testFailoverDuringDump() throws Throwable {
+ public void testCompleteFailoverWithReverseBootstrap() throws Throwable {
HiveConf primaryConf = primary.getConf();
TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
List<String> failoverConfigs = Arrays.asList("'" +
HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+ Database db;
Review comment:
nit: Move this down to where you actually need it.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -219,16 +224,16 @@
return TaskFactory.get(replLogWork, conf);
}
+ public static boolean isDbBeingFailedOverAtSource(Database db) {
+ assert (db != null);
+ Map<String, String> dbParameters = db.getParameters();
+ return
Failover_Point.SOURCE.toString().equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED));
Review comment:
Also, ReplConst.REPL_FAILOVER_ENABLED -> ReplConst.REPL_FAILOVER_ENDPOINT
Does this make sense?
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -308,6 +316,9 @@ public void testFailoverDuringDump() throws Throwable {
.run("select rank from t2 order by rank")
.verifyResults(new String[]{"10", "11"});
+ db = replica.getDatabase(replicatedDbName);
+ assertFalse(MetaStoreUtils.isTargetOfReplication(db));
Review comment:
Wouldn't this mean that there will not be any rollback?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -219,16 +224,16 @@
return TaskFactory.get(replLogWork, conf);
}
+ public static boolean isDbBeingFailedOverAtSource(Database db) {
+ assert (db != null);
Review comment:
What do you gain by this assertion? A failed assert throws an Error, whereas
without it an NPE would be thrown anyway.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
##########
@@ -129,16 +130,20 @@ public ReplLoadWork(HiveConf hiveConf, String
dumpDirectory,
if (metricCollector != null) {
metricCollector.setMetricsMBean(name);
}
+ Path failoverReadyMarker = new Path(dumpDirectory,
ReplAck.FAILOVER_READY_MARKER.toString());
+ FileSystem fs = failoverReadyMarker.getFileSystem(hiveConf);
+ shouldFailover =
hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START)
+ && fs.exists(failoverReadyMarker);
incrementalLoadTasksBuilder = new
IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
new IncrementalLoadEventsIterator(dumpDirectory, hiveConf),
hiveConf, eventTo, metricCollector,
- replStatsTracker);
+ replStatsTracker, shouldFailover);
/*
* If the current incremental dump also includes bootstrap for some
tables, then create iterator
* for the same.
*/
Path incBootstrapDir = new Path(dumpDirectory,
ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
- FileSystem fs = incBootstrapDir.getFileSystem(hiveConf);
+ fs = incBootstrapDir.getFileSystem(hiveConf);
Review comment:
Why do you need this? You already have an fs instance from above.
##########
File path:
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
##########
@@ -249,7 +249,7 @@ public static boolean isDbReplIncompatible(Database db) {
public static boolean isDbBeingFailedOver(Database db) {
assert (db != null);
Map<String, String> dbParameters = db.getParameters();
- return dbParameters != null &&
ReplConst.TRUE.equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED));
+ return dbParameters != null &&
!StringUtils.isEmpty(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED));
Review comment:
It can't be any random string; it has to be a valid Failover_Point (i.e.,
FailoverEndpoint, as per the other comment).
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -554,6 +555,29 @@ public void run() throws SemanticException {
}
}
});
+ if (work.shouldFailover()) {
+ listOfPreAckTasks.add(new PreAckTask() {
+ @Override
+ public void run() throws SemanticException {
+ try {
+ Database db = getHive().getDatabase(work.getTargetDatabase());
+ Map<String, String> params = db.getParameters();
+ if (params == null) {
+ params = new HashMap<>();
+ db.setParameters(params);
+ } else if (MetaStoreUtils.isTargetOfReplication(db)) {
+ params.remove(ReplConst.TARGET_OF_REPLICATION);
Review comment:
Will this not break the rollback logic?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -219,16 +224,16 @@
return TaskFactory.get(replLogWork, conf);
}
+ public static boolean isDbBeingFailedOverAtSource(Database db) {
+ assert (db != null);
Review comment:
Does this hold good for 'REPL DUMP *' use case?
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -319,31 +330,92 @@ public void testFailoverDuringDump() throws Throwable {
assertTrue(fs.exists(new Path(dumpPath,
ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
- dumpData = primary.dump(primaryDbName);
- dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
- Assert.assertEquals(new DumpMetaData(dumpPath, conf).getDumpType(),
DumpType.INCREMENTAL);
- Path failoverReadyFile = new Path(dumpPath,
ReplAck.FAILOVER_READY_MARKER.toString());
- Path failoverMdFile = new Path(dumpPath,
FailoverMetaData.FAILOVER_METADATA);
- assertFalse(fs.exists(failoverReadyFile));
- assertFalse(fs.exists(failoverMdFile));
-
assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
- replica.load(replicatedDbName, primaryDbName);
- fs.create(failoverReadyFile);
- fs.create(failoverMdFile);
- assertTrue(fs.exists(failoverReadyFile));
- assertTrue(fs.exists(failoverMdFile));
+ primary.run("drop database if exists " + primaryDbName + " cascade");
- //Since the failover start config is disabled and previous valid dump
directory contains _failover_ready marker file
- //So, this dump iteration will perform bootstrap dump instead of
incremental and last dump directory also should not
- //deleted.
- WarehouseInstance.Tuple newDumpData = primary.dump(primaryDbName);
- assertNotEquals(newDumpData.dumpLocation, dumpData.dumpLocation);
+ assertTrue(primary.getDatabase(primaryDbName) == null);
+
+
assertTrue(ReplChangeManager.getReplPolicyIdString(replica.getDatabase(replicatedDbName))
== null);
+ WarehouseInstance.Tuple reverseDumpData = replica.dump(replicatedDbName);
+ assertNotEquals(reverseDumpData.dumpLocation, dumpData.dumpLocation);
assertTrue(fs.exists(dumpPath));
assertTrue(fs.exists(new Path(dumpPath,
ReplAck.FAILOVER_READY_MARKER.toString())));
- dumpPath = new Path(newDumpData.dumpLocation,
ReplUtils.REPL_HIVE_BASE_DIR);
+ dumpPath = new Path(reverseDumpData.dumpLocation,
ReplUtils.REPL_HIVE_BASE_DIR);
assertFalse(fs.exists(new Path(dumpPath,
FailoverMetaData.FAILOVER_METADATA)));
assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() ==
DumpType.BOOTSTRAP);
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+
assertTrue(ReplUtils.isDbBeingFailedOverAtTarget(replica.getDatabase(replicatedDbName)));
Review comment:
Assert at this point that the _dumpmetadata file has marked the dump as a
bootstrap one.
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
##########
@@ -80,9 +79,11 @@ public ReplLogger getReplLogger() {
private final Long eventTo;
private String dumpDirectory;
private final ReplicationMetricCollector metricCollector;
+ private boolean shouldFailover;
public IncrementalLoadTasksBuilder(String dbName, String loadPath,
IncrementalLoadEventsIterator iterator,
- HiveConf conf, Long eventTo, ReplicationMetricCollector metricCollector,
ReplStatsTracker replStatsTracker)
+ HiveConf conf, Long eventTo, ReplicationMetricCollector metricCollector,
ReplStatsTracker replStatsTracker,
Review comment:
nit: format this
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java
##########
@@ -115,19 +116,13 @@ public ReplStateLogWork(ReplLogger replLogger, String
functionName, String dumpD
this.metricCollector = metricCollector;
}
- public ReplStateLogWork(ReplLogger replLogger, Map<String, String> dbProps,
ReplicationMetricCollector collector) {
- this.logType = LOG_TYPE.END;
- this.replLogger = replLogger;
- this.lastReplId =
ReplicationSpec.getLastReplicatedStateFromParameters(dbProps);
- this.metricCollector = collector;
- }
-
- public ReplStateLogWork(ReplLogger replLogger, Map<String, String> dbProps,
String dumpDirectory, ReplicationMetricCollector collector) {
+ public ReplStateLogWork(ReplLogger replLogger, Map<String, String> dbProps,
String dumpDirectory, ReplicationMetricCollector collector, boolean
shouldFailover) {
Review comment:
nit: format this
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -219,16 +224,16 @@
return TaskFactory.get(replLogWork, conf);
}
+ public static boolean isDbBeingFailedOverAtSource(Database db) {
+ assert (db != null);
+ Map<String, String> dbParameters = db.getParameters();
+ return
Failover_Point.SOURCE.toString().equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED));
Review comment:
May be we can just have a single method like this?
public static boolean isDbBeingFailedOverAtEndpoint(Database db,
Failover_Point endPoint) {
assert (db != null);
Map<String, String> dbParameters = db.getParameters();
return
endPoint.toString().equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED));
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]