pkumarsinha commented on a change in pull request #2005:
URL: https://github.com/apache/hive/pull/2005#discussion_r584438654



##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -162,6 +164,37 @@ public void testAcidTablesBootstrap() throws Throwable {
     verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
   }
 
+  @Test
+  public void testNotificationFromLoadMetadataAck() throws Throwable{
+    long previousLoadNotificationID = 0, currentLoadNotificationID, 
currentNotificationID;
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+            .verifyResults(new String[] {});
+    currentLoadNotificationID = fetchNotificationIDFromDump(new 
Path(bootstrapDump.dumpLocation));
+    currentNotificationID = 
replica.getCurrentNotificationEventId().getEventId();
+    assertTrue(currentLoadNotificationID > previousLoadNotificationID && 
currentNotificationID > currentLoadNotificationID);
+    previousLoadNotificationID = currentLoadNotificationID;
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("insert into t1 
values (1)")
+            .dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+            .verifyResults(new String[] {});
+    currentLoadNotificationID = fetchNotificationIDFromDump(new 
Path(incrementalDump1.dumpLocation));
+    currentNotificationID = 
replica.getCurrentNotificationEventId().getEventId();
+    assertTrue(currentLoadNotificationID > previousLoadNotificationID && 
currentNotificationID > currentLoadNotificationID);
+  }
+
+  private long fetchNotificationIDFromDump(Path dumpLocation) throws Exception{
+    Path loadMetadataFilePath = new Path(dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR + "/" + ReplAck.LOAD_METADATA);

Review comment:
       Use File.separator instead of /

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java
##########
@@ -43,6 +43,9 @@
   @Override
   public int execute() {
     try {
+      for( preAckTask task : work.getPreAckTasks() ){

Review comment:
       Format line:   for( preAckTask task : work.getPreAckTasks() ){
          => for (preAckTask task : work.getPreAckTasks()) { ?
    

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope 
replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && 
!work.incrementalLoadTasksBuilder().hasMoreWork() && 
!work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? 
!work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to 
add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new 
HiveMetaStoreClient(conf);
+            long currentNotificationID = 
metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, 
LOAD_METADATA.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), 
notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : 
{}", notificationFilePath, currentNotificationID);
+          }catch (Exception e) {

Review comment:
       nit: format the code; that would fix checkstyle issues like the one above

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -664,3 +678,7 @@ private int executeIncrementalLoad() throws Exception {
     return 0;
   }
 }
+
+interface preAckTask{

Review comment:
       Move this to AckTask and fix the interface name (it should start with an upper-case letter)

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,22 @@ private void dropTablesExcludedInReplScope(ReplScope 
replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && 
!work.incrementalLoadTasksBuilder().hasMoreWork() && 
!work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if ( !work.hasBootstrapLoadTasks() &&
+            ( work.isIncrementalLoad() ? 
!work.incrementalLoadTasksBuilder().hasMoreWork() : true ) ) {
       //All repl load tasks are executed and status is 0, create the task to 
add the acknowledgement
+      List<preAckTask> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new preAckTask() {
+        @Override
+        public void run() throws Exception {

Review comment:
       Does it result in a non-recoverable error?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,22 @@ private void dropTablesExcludedInReplScope(ReplScope 
replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && 
!work.incrementalLoadTasksBuilder().hasMoreWork() && 
!work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if ( !work.hasBootstrapLoadTasks() &&

Review comment:
       nit: format the line




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to