[ 
https://issues.apache.org/jira/browse/HIVE-24946?focusedWorklogId=630740&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-630740
 ]

ASF GitHub Bot logged work on HIVE-24946:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Jul/21 19:58
            Start Date: 28/Jul/21 19:58
    Worklog Time Spent: 10m 
      Work Description: pkumarsinha commented on a change in pull request #2529:
URL: https://github.com/apache/hive/pull/2529#discussion_r678577106



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -155,6 +156,10 @@
     LOAD_NEW, LOAD_SKIP, LOAD_REPLACE
   }
 
+  public static enum Failover_Point {

Review comment:
       add javadoc comment for what this class is for

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -155,6 +156,10 @@
     LOAD_NEW, LOAD_SKIP, LOAD_REPLACE
   }
 
+  public static enum Failover_Point {

Review comment:
       nit: Rename to FailoverEndpoint

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -219,16 +224,16 @@
     return TaskFactory.get(replLogWork, conf);
   }
 
+  public static boolean isDbBeingFailedOverAtSource(Database db) {
+    assert (db != null);
+    Map<String, String> dbParameters = db.getParameters();
+    return 
Failover_Point.SOURCE.toString().equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED));

Review comment:
       db.getParameters() is Nullable, don't you require a null check like you 
did in other places?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
##########
@@ -172,7 +179,7 @@ public IncrementalLoadTasksBuilder(String dbName, String 
loadPath, IncrementalLo
 
       Map<String, String> dbProps = new HashMap<>();
       dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), 
String.valueOf(lastReplayedEvent));
-      ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, 
dbProps, dumpDirectory, metricCollector);
+      ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, 
dbProps, dumpDirectory, metricCollector, shouldFailover);

Review comment:
       nit: format

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -655,17 +679,25 @@ private int executeIncrementalLoad(long loadStartTime) 
throws Exception {
     if (work.replScopeModified) {
       dropTablesExcludedInReplScope(work.currentReplScope);
     }
-    if 
(!MetaStoreUtils.isTargetOfReplication(getHive().getDatabase(work.dbNameToLoadIn)))
 {
+    if (!work.shouldFailover()) {
+      Database targetDb = getHive().getDatabase(work.dbNameToLoadIn);
       Map<String, String> props = new HashMap<>();
-      props.put(ReplConst.TARGET_OF_REPLICATION, "true");
-      AlterDatabaseSetPropertiesDesc setTargetDesc = new 
AlterDatabaseSetPropertiesDesc(work.dbNameToLoadIn, props, null);
-      Task<?> addReplTargetPropTask =
-              TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), 
setTargetDesc, true,
-                      work.dumpDirectory, work.getMetricCollector()), conf);
-      if (this.childTasks == null) {
-        this.childTasks = new ArrayList<>();
+      if (!MetaStoreUtils.isTargetOfReplication(targetDb)) {
+        props.put(ReplConst.TARGET_OF_REPLICATION, ReplConst.TRUE);
+      }
+      if (ReplUtils.isDbBeingFailedOverAtTarget(targetDb)) {
+        props.put(ReplConst.REPL_FAILOVER_ENABLED, "");

Review comment:
       Is this a rollback use case?

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -204,10 +207,11 @@ private void testTargetDbReplIncompatible(boolean 
setReplIncompProp) throws Thro
   }
 
   @Test
-  public void testFailoverDuringDump() throws Throwable {
+  public void testCompleteFailoverWithReverseBootstrap() throws Throwable {
     HiveConf primaryConf = primary.getConf();
     TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
     List<String> failoverConfigs = Arrays.asList("'" + 
HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    Database db;

Review comment:
       nit:Move this down to where you need it

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -219,16 +224,16 @@
     return TaskFactory.get(replLogWork, conf);
   }
 
+  public static boolean isDbBeingFailedOverAtSource(Database db) {
+    assert (db != null);
+    Map<String, String> dbParameters = db.getParameters();
+    return 
Failover_Point.SOURCE.toString().equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED));

Review comment:
       Also, ReplConst.REPL_FAILOVER_ENABLED -> ReplConst.REPL_FAILOVER_ENDPOINT
   Does this make sense?

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -308,6 +316,9 @@ public void testFailoverDuringDump() throws Throwable {
             .run("select rank from t2 order by rank")
             .verifyResults(new String[]{"10", "11"});
 
+    db = replica.getDatabase(replicatedDbName);
+    assertFalse(MetaStoreUtils.isTargetOfReplication(db));

Review comment:
       Wouldn't this mean that there will not be any rollback?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -219,16 +224,16 @@
     return TaskFactory.get(replLogWork, conf);
   }
 
+  public static boolean isDbBeingFailedOverAtSource(Database db) {
+    assert (db != null);

Review comment:
       What do you gain by this assertion that would result in Error, rather 
NPE will be thrown

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
##########
@@ -129,16 +130,20 @@ public ReplLoadWork(HiveConf hiveConf, String 
dumpDirectory,
       if (metricCollector != null) {
         metricCollector.setMetricsMBean(name);
       }
+      Path failoverReadyMarker = new Path(dumpDirectory, 
ReplAck.FAILOVER_READY_MARKER.toString());
+      FileSystem fs = failoverReadyMarker.getFileSystem(hiveConf);
+      shouldFailover = 
hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START)
+              && fs.exists(failoverReadyMarker);
       incrementalLoadTasksBuilder = new 
IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
           new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), 
hiveConf, eventTo, metricCollector,
-          replStatsTracker);
+          replStatsTracker, shouldFailover);
 
       /*
        * If the current incremental dump also includes bootstrap for some 
tables, then create iterator
        * for the same.
        */
       Path incBootstrapDir = new Path(dumpDirectory, 
ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
-      FileSystem fs = incBootstrapDir.getFileSystem(hiveConf);
+      fs = incBootstrapDir.getFileSystem(hiveConf);

Review comment:
       Why do you need this? You already have fs

##########
File path: 
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
##########
@@ -249,7 +249,7 @@ public static boolean isDbReplIncompatible(Database db) {
   public static boolean isDbBeingFailedOver(Database db) {
     assert (db != null);
     Map<String, String> dbParameters = db.getParameters();
-    return dbParameters != null && 
ReplConst.TRUE.equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED));
+    return dbParameters != null && 
!StringUtils.isEmpty(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED));

Review comment:
       It can't be any random string, it has to be a valid Failover_Point (i.e 
FailoverEndpoint as per the other comment)

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -554,6 +555,29 @@ public void run() throws SemanticException {
           }
         }
       });
+      if (work.shouldFailover()) {
+        listOfPreAckTasks.add(new PreAckTask() {
+          @Override
+          public void run() throws SemanticException {
+            try {
+              Database db = getHive().getDatabase(work.getTargetDatabase());
+              Map<String, String> params = db.getParameters();
+              if (params == null) {
+                params = new HashMap<>();
+                db.setParameters(params);
+              } else if (MetaStoreUtils.isTargetOfReplication(db)) {
+                params.remove(ReplConst.TARGET_OF_REPLICATION);

Review comment:
       Will this not break the rollback logic?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -219,16 +224,16 @@
     return TaskFactory.get(replLogWork, conf);
   }
 
+  public static boolean isDbBeingFailedOverAtSource(Database db) {
+    assert (db != null);

Review comment:
       Does this hold good for 'REPL DUMP *' use case?

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -319,31 +330,92 @@ public void testFailoverDuringDump() throws Throwable {
     assertTrue(fs.exists(new Path(dumpPath, 
ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
 
     
assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
-    dumpData = primary.dump(primaryDbName);
-    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    Assert.assertEquals(new DumpMetaData(dumpPath, conf).getDumpType(), 
DumpType.INCREMENTAL);
-    Path failoverReadyFile = new Path(dumpPath, 
ReplAck.FAILOVER_READY_MARKER.toString());
-    Path failoverMdFile = new Path(dumpPath, 
FailoverMetaData.FAILOVER_METADATA);
-    assertFalse(fs.exists(failoverReadyFile));
-    assertFalse(fs.exists(failoverMdFile));
-    
assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
-    replica.load(replicatedDbName, primaryDbName);
 
-    fs.create(failoverReadyFile);
-    fs.create(failoverMdFile);
-    assertTrue(fs.exists(failoverReadyFile));
-    assertTrue(fs.exists(failoverMdFile));
+    primary.run("drop database if exists " + primaryDbName + " cascade");
 
-    //Since the failover start config is disabled and previous valid dump 
directory contains _failover_ready marker file
-    //So, this dump iteration will perform bootstrap dump instead of 
incremental and last dump directory also should not
-    //deleted.
-    WarehouseInstance.Tuple newDumpData = primary.dump(primaryDbName);
-    assertNotEquals(newDumpData.dumpLocation, dumpData.dumpLocation);
+    assertTrue(primary.getDatabase(primaryDbName) == null);
+
+    
assertTrue(ReplChangeManager.getReplPolicyIdString(replica.getDatabase(replicatedDbName))
 == null);
+    WarehouseInstance.Tuple reverseDumpData = replica.dump(replicatedDbName);
+    assertNotEquals(reverseDumpData.dumpLocation, dumpData.dumpLocation);
     assertTrue(fs.exists(dumpPath));
     assertTrue(fs.exists(new Path(dumpPath, 
ReplAck.FAILOVER_READY_MARKER.toString())));
-    dumpPath = new Path(newDumpData.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    dumpPath = new Path(reverseDumpData.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
     assertFalse(fs.exists(new Path(dumpPath, 
FailoverMetaData.FAILOVER_METADATA)));
     assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == 
DumpType.BOOTSTRAP);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    
assertTrue(ReplUtils.isDbBeingFailedOverAtTarget(replica.getDatabase(replicatedDbName)));

Review comment:
       Assert at this point that the _dumpmetadat has marked the dump as 
bootstrap one 

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
##########
@@ -80,9 +79,11 @@ public ReplLogger getReplLogger() {
   private final Long eventTo;
   private String dumpDirectory;
   private final ReplicationMetricCollector metricCollector;
+  private boolean shouldFailover;
 
   public IncrementalLoadTasksBuilder(String dbName, String loadPath, 
IncrementalLoadEventsIterator iterator,
-      HiveConf conf, Long eventTo, ReplicationMetricCollector metricCollector, 
ReplStatsTracker replStatsTracker)
+      HiveConf conf, Long eventTo, ReplicationMetricCollector metricCollector, 
ReplStatsTracker replStatsTracker,

Review comment:
       nit: format this

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java
##########
@@ -115,19 +116,13 @@ public ReplStateLogWork(ReplLogger replLogger, String 
functionName, String dumpD
     this.metricCollector = metricCollector;
   }
 
-  public ReplStateLogWork(ReplLogger replLogger, Map<String, String> dbProps, 
ReplicationMetricCollector collector) {
-    this.logType = LOG_TYPE.END;
-    this.replLogger = replLogger;
-    this.lastReplId = 
ReplicationSpec.getLastReplicatedStateFromParameters(dbProps);
-    this.metricCollector = collector;
-  }
-
-  public ReplStateLogWork(ReplLogger replLogger, Map<String, String> dbProps, 
String dumpDirectory, ReplicationMetricCollector collector) {
+  public ReplStateLogWork(ReplLogger replLogger, Map<String, String> dbProps, 
String dumpDirectory, ReplicationMetricCollector collector, boolean 
shouldFailover) {

Review comment:
       nit: format this

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -219,16 +224,16 @@
     return TaskFactory.get(replLogWork, conf);
   }
 
+  public static boolean isDbBeingFailedOverAtSource(Database db) {
+    assert (db != null);
+    Map<String, String> dbParameters = db.getParameters();
+    return 
Failover_Point.SOURCE.toString().equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED));

Review comment:
       May be we can just have a single method like this?
   public static boolean isDbBeingFailedOverAtEndpoint(Database db, 
Failover_Point endPoint) {
       assert (db != null);
       Map<String, String> dbParameters = db.getParameters();
       return 
endPoint.toString().equalsIgnoreCase(dbParameters.get(ReplConst.REPL_FAILOVER_ENABLED));
     }




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 630740)
    Time Spent: 40m  (was: 0.5h)

> Handle failover case during Repl Load
> -------------------------------------
>
>                 Key: HIVE-24946
>                 URL: https://issues.apache.org/jira/browse/HIVE-24946
>             Project: Hive
>          Issue Type: New Feature
>            Reporter: Haymant Mangla
>            Assignee: Haymant Mangla
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> * Update metric during load to capture the readiness for failover
>  * Remove repl.target.for property on target cluster
>  * Prepare the dump directory to be used during failover first dump operation



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to