[ https://issues.apache.org/jira/browse/HIVE-24946?focusedWorklogId=630740&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-630740 ]
ASF GitHub Bot logged work on HIVE-24946: ----------------------------------------- Author: ASF GitHub Bot Created on: 28/Jul/21 19:58 Start Date: 28/Jul/21 19:58 Worklog Time Spent: 10m Work Description: pkumarsinha commented on a change in pull request #2529: URL: https://github.com/apache/hive/pull/2529#discussion_r678577106 ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java ########## @@ -155,6 +156,10 @@ LOAD_NEW, LOAD_SKIP, LOAD_REPLACE } + public static enum Failover_Point { Review comment: add javadoc comment for what this class is for ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java ########## @@ -155,6 +156,10 @@ LOAD_NEW, LOAD_SKIP, LOAD_REPLACE } + public static enum Failover_Point { Review comment: nit: Rename to FailoverEndpoint ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java ########## @@ -219,16 +224,16 @@ return TaskFactory.get(replLogWork, conf); } + public static boolean isDbBeingFailedOverAtSource(Database db) { + assert (db != null); + Map<String, String> dbParameters = db.getParameters(); + return Failover_Point.SOURCE.toString().equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED)); Review comment: db.getParameters() is Nullable, don't you require a null check like you did in other places? 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java ########## @@ -172,7 +179,7 @@ public IncrementalLoadTasksBuilder(String dbName, String loadPath, IncrementalLo Map<String, String> dbProps = new HashMap<>(); dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent)); - ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps, dumpDirectory, metricCollector); + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps, dumpDirectory, metricCollector, shouldFailover); Review comment: nit: format ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java ########## @@ -655,17 +679,25 @@ private int executeIncrementalLoad(long loadStartTime) throws Exception { if (work.replScopeModified) { dropTablesExcludedInReplScope(work.currentReplScope); } - if (!MetaStoreUtils.isTargetOfReplication(getHive().getDatabase(work.dbNameToLoadIn))) { + if (!work.shouldFailover()) { + Database targetDb = getHive().getDatabase(work.dbNameToLoadIn); Map<String, String> props = new HashMap<>(); - props.put(ReplConst.TARGET_OF_REPLICATION, "true"); - AlterDatabaseSetPropertiesDesc setTargetDesc = new AlterDatabaseSetPropertiesDesc(work.dbNameToLoadIn, props, null); - Task<?> addReplTargetPropTask = - TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), setTargetDesc, true, - work.dumpDirectory, work.getMetricCollector()), conf); - if (this.childTasks == null) { - this.childTasks = new ArrayList<>(); + if (!MetaStoreUtils.isTargetOfReplication(targetDb)) { + props.put(ReplConst.TARGET_OF_REPLICATION, ReplConst.TRUE); + } + if (ReplUtils.isDbBeingFailedOverAtTarget(targetDb)) { + props.put(ReplConst.REPL_FAILOVER_ENABLED, ""); Review comment: Is this a rollback use case? 
########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ########## @@ -204,10 +207,11 @@ private void testTargetDbReplIncompatible(boolean setReplIncompProp) throws Thro } @Test - public void testFailoverDuringDump() throws Throwable { + public void testCompleteFailoverWithReverseBootstrap() throws Throwable { HiveConf primaryConf = primary.getConf(); TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf()); List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'"); + Database db; Review comment: nit:Move this down to where you need it ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java ########## @@ -219,16 +224,16 @@ return TaskFactory.get(replLogWork, conf); } + public static boolean isDbBeingFailedOverAtSource(Database db) { + assert (db != null); + Map<String, String> dbParameters = db.getParameters(); + return Failover_Point.SOURCE.toString().equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED)); Review comment: Also, ReplConst.REPL_FAILOVER_ENABLED -> ReplConst.REPL_FAILOVER_ENDPOINT Does this make sense? ########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ########## @@ -308,6 +316,9 @@ public void testFailoverDuringDump() throws Throwable { .run("select rank from t2 order by rank") .verifyResults(new String[]{"10", "11"}); + db = replica.getDatabase(replicatedDbName); + assertFalse(MetaStoreUtils.isTargetOfReplication(db)); Review comment: Wouldn't this mean that there will not be any rollback? 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java ########## @@ -219,16 +224,16 @@ return TaskFactory.get(replLogWork, conf); } + public static boolean isDbBeingFailedOverAtSource(Database db) { + assert (db != null); Review comment: What do you gain from this assertion? It would throw an AssertionError, whereas without it an NPE would be thrown anyway. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java ########## @@ -129,16 +130,20 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, if (metricCollector != null) { metricCollector.setMetricsMBean(name); } + Path failoverReadyMarker = new Path(dumpDirectory, ReplAck.FAILOVER_READY_MARKER.toString()); + FileSystem fs = failoverReadyMarker.getFileSystem(hiveConf); + shouldFailover = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START) + && fs.exists(failoverReadyMarker); incrementalLoadTasksBuilder = new IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory, new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo, metricCollector, - replStatsTracker); + replStatsTracker, shouldFailover); /* * If the current incremental dump also includes bootstrap for some tables, then create iterator * for the same. */ Path incBootstrapDir = new Path(dumpDirectory, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); - FileSystem fs = incBootstrapDir.getFileSystem(hiveConf); + fs = incBootstrapDir.getFileSystem(hiveConf); Review comment: Why do you need this?
You already have fs ########## File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java ########## @@ -249,7 +249,7 @@ public static boolean isDbReplIncompatible(Database db) { public static boolean isDbBeingFailedOver(Database db) { assert (db != null); Map<String, String> dbParameters = db.getParameters(); - return dbParameters != null && ReplConst.TRUE.equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED)); + return dbParameters != null && !StringUtils.isEmpty(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED)); Review comment: It can't be any random string, it has to be a valid Failover_Point (i.e FailoverEndpoint as per the other comment) ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java ########## @@ -554,6 +555,29 @@ public void run() throws SemanticException { } } }); + if (work.shouldFailover()) { + listOfPreAckTasks.add(new PreAckTask() { + @Override + public void run() throws SemanticException { + try { + Database db = getHive().getDatabase(work.getTargetDatabase()); + Map<String, String> params = db.getParameters(); + if (params == null) { + params = new HashMap<>(); + db.setParameters(params); + } else if (MetaStoreUtils.isTargetOfReplication(db)) { + params.remove(ReplConst.TARGET_OF_REPLICATION); Review comment: Will this not break the rollback logic? ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java ########## @@ -219,16 +224,16 @@ return TaskFactory.get(replLogWork, conf); } + public static boolean isDbBeingFailedOverAtSource(Database db) { + assert (db != null); Review comment: Does this hold good for 'REPL DUMP *' use case? 
########## File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ########## @@ -319,31 +330,92 @@ public void testFailoverDuringDump() throws Throwable { assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString()))); assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName))); - dumpData = primary.dump(primaryDbName); - dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); - Assert.assertEquals(new DumpMetaData(dumpPath, conf).getDumpType(), DumpType.INCREMENTAL); - Path failoverReadyFile = new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString()); - Path failoverMdFile = new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA); - assertFalse(fs.exists(failoverReadyFile)); - assertFalse(fs.exists(failoverMdFile)); - assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName))); - replica.load(replicatedDbName, primaryDbName); - fs.create(failoverReadyFile); - fs.create(failoverMdFile); - assertTrue(fs.exists(failoverReadyFile)); - assertTrue(fs.exists(failoverMdFile)); + primary.run("drop database if exists " + primaryDbName + " cascade"); - //Since the failover start config is disabled and previous valid dump directory contains _failover_ready marker file - //So, this dump iteration will perform bootstrap dump instead of incremental and last dump directory also should not - //deleted. 
- WarehouseInstance.Tuple newDumpData = primary.dump(primaryDbName); - assertNotEquals(newDumpData.dumpLocation, dumpData.dumpLocation); + assertTrue(primary.getDatabase(primaryDbName) == null); + + assertTrue(ReplChangeManager.getReplPolicyIdString(replica.getDatabase(replicatedDbName)) == null); + WarehouseInstance.Tuple reverseDumpData = replica.dump(replicatedDbName); + assertNotEquals(reverseDumpData.dumpLocation, dumpData.dumpLocation); assertTrue(fs.exists(dumpPath)); assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString()))); - dumpPath = new Path(newDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA))); assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(ReplUtils.isDbBeingFailedOverAtTarget(replica.getDatabase(replicatedDbName))); Review comment: At this point, assert that _dumpmetadata has marked the dump as a bootstrap dump. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java ########## @@ -80,9 +79,11 @@ public ReplLogger getReplLogger() { private final Long eventTo; private String dumpDirectory; private final ReplicationMetricCollector metricCollector; + private boolean shouldFailover; public IncrementalLoadTasksBuilder(String dbName, String loadPath, IncrementalLoadEventsIterator iterator, - HiveConf conf, Long eventTo, ReplicationMetricCollector metricCollector, ReplStatsTracker replStatsTracker) + HiveConf conf, Long eventTo, ReplicationMetricCollector metricCollector, ReplStatsTracker replStatsTracker, Review comment: nit: format this ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java ########## @@ -115,19 +116,13 @@ public
ReplStateLogWork(ReplLogger replLogger, String functionName, String dumpD this.metricCollector = metricCollector; } - public ReplStateLogWork(ReplLogger replLogger, Map<String, String> dbProps, ReplicationMetricCollector collector) { - this.logType = LOG_TYPE.END; - this.replLogger = replLogger; - this.lastReplId = ReplicationSpec.getLastReplicatedStateFromParameters(dbProps); - this.metricCollector = collector; - } - - public ReplStateLogWork(ReplLogger replLogger, Map<String, String> dbProps, String dumpDirectory, ReplicationMetricCollector collector, boolean shouldFailover) { Review comment: nit: format this ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java ########## @@ -219,16 +224,16 @@ return TaskFactory.get(replLogWork, conf); } + public static boolean isDbBeingFailedOverAtSource(Database db) { + assert (db != null); + Map<String, String> dbParameters = db.getParameters(); + return Failover_Point.SOURCE.toString().equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED)); Review comment: Maybe we can just have a single method like this? public static boolean isDbBeingFailedOverAtEndpoint(Database db, Failover_Point endPoint) { assert (db != null); Map<String, String> dbParameters = db.getParameters(); return endPoint.toString().equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED)); } -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 630740) Time Spent: 40m (was: 0.5h) > Handle failover case during Repl Load > ------------------------------------- > > Key: HIVE-24946 > URL: https://issues.apache.org/jira/browse/HIVE-24946 > Project: Hive > Issue Type: New Feature > Reporter: Haymant Mangla > Assignee: Haymant Mangla > Priority: Major > Labels: pull-request-available > Time Spent: 40m > Remaining Estimate: 0h > > * Update metric during load to capture the readiness for failover > * Remove repl.target.for property on target cluster > * Prepare the dump directory to be used during failover first dump operation -- This message was sent by Atlassian Jira (v8.3.4#803005)