pkumarsinha commented on a change in pull request #2005:
URL: https://github.com/apache/hive/pull/2005#discussion_r583218805



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, LOAD_METADATA.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);

Review comment:
       NotificationACK file  -> Load metadata file
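       For illustration, the log line with that wording change might read as follows (a sketch only; the exact phrasing is the author's call):

           LOG.info("Created Load metadata file : {} with NotificationID : {}",
               notificationFilePath, currentNotificationID);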

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +504,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, NOTIFICATION_FILE.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);
+          }catch (Exception e) {
+            e.printStackTrace();

Review comment:
       Use a variant of RuntimeException and throw it back.
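       A minimal sketch of that suggestion, assuming a plain RuntimeException wrapper is acceptable here (the message text is only illustrative):

           } catch (Exception e) {
             // rethrow instead of swallowing the failure
             throw new RuntimeException("Failed to write the load metadata ack file", e);
           }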

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
##########
@@ -414,6 +418,24 @@ WarehouseInstance verifyResults(List data) throws IOException {
     return this;
   }
 
+  long verifyNotificationAck(String dumpLocation, long prevNotificationID) throws Exception {

Review comment:
       Move it to TestReplicationScenariosAcidTables

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){

Review comment:
       A few lines like this are crossing the default max line length (120, I think) for checkstyle. You may want to format them.
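       For illustration only, one way the flagged condition could be wrapped to stay under the limit (pure formatting, no behaviour change intended):

           if (!work.hasBootstrapLoadTasks()
               && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)) {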

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
##########
@@ -414,6 +418,24 @@ WarehouseInstance verifyResults(List data) throws IOException {
     return this;
   }
 
+  long verifyNotificationAck(String dumpLocation, long prevNotificationID) throws Exception {
+    FileSystem fs = new Path(dumpLocation).getFileSystem(hiveConf);
+    Path notificationAckFile = new Path(dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR + "/" + ReplAck.NOTIFICATION_FILE);
+    assertTrue(fs.exists(notificationAckFile));
+    long currentNotificationID = getCurrentNotificationEventId().getEventId();
+    long previousLoadNotificationID = fetchNotificationIDFromDump(notificationAckFile, fs);
+    assertTrue(previousLoadNotificationID > prevNotificationID && currentNotificationID > previousLoadNotificationID);
+    return previousLoadNotificationID;
+  }
+
+  long fetchNotificationIDFromDump(Path notificationAckFile, FileSystem fs) throws Exception{
+    InputStream inputstream = fs.open(notificationAckFile);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inputstream));
+    String line = reader.readLine();
+    assertTrue(line!=null && reader.readLine()==null);
+    return Long.parseLong(line);

Review comment:
       close the reader/stream
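       A sketch of one way to address this with try-with-resources (assuming the rest of the method stays as posted):

           long fetchNotificationIDFromDump(Path notificationAckFile, FileSystem fs) throws Exception {
             // try-with-resources closes the reader and the underlying stream automatically
             try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(notificationAckFile)))) {
               String line = reader.readLine();
               assertTrue(line != null && reader.readLine() == null);
               return Long.parseLong(line);
             }
           }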

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, LOAD_METADATA.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);
+          }catch (Exception e) {
+            throw new RuntimeException(e);

Review comment:
       Does it become a non-recoverable error or a recoverable error?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -51,8 +51,12 @@
 
 import javax.annotation.Nullable;
 
+import java.io.Closeable;

Review comment:
       Remove unused import

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckWork.java
##########
@@ -35,6 +36,7 @@
   private static final long serialVersionUID = 1L;
   private Path ackFilePath;
   private transient ReplicationMetricCollector metricCollector;
+  private List<Runnable> tasks;

Review comment:
       nit: How about renaming to preAckTasks to avoid confusion?
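       For illustration, the renamed field would simply be:

           private List<Runnable> preAckTasks;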




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


