This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new dda3e7190dd HIVE-26937: Batching incremental events to avoid O.O.M 
during repl load (#4019) (Rakshith Chandraiah, reviewed by Teddy Choi)
dda3e7190dd is described below

commit dda3e7190dddfec676394d4af12afa330e00b4cf
Author: Rakshith C <[email protected]>
AuthorDate: Tue Feb 7 10:51:25 2023 +0530

    HIVE-26937: Batching incremental events to avoid O.O.M during repl load 
(#4019) (Rakshith Chandraiah, reviewed by Teddy Choi)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   2 +
 ...stReplAcrossInstancesWithJsonMessageFormat.java |   1 +
 .../hive/ql/parse/TestReplicationScenarios.java    |   1 +
 .../parse/TestReplicationScenariosAcidTables.java  | 156 ++++++++++++++++++++-
 .../TestReplicationScenariosAcrossInstances.java   |   1 +
 .../TestReplicationScenariosExternalTables.java    |   8 +-
 .../parse/TestTableLevelReplicationScenarios.java  |  17 ++-
 .../hadoop/hive/ql/parse/WarehouseInstance.java    |   2 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  86 ++++++------
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java     |  11 +-
 .../incremental/IncrementalLoadEventsIterator.java |  54 ++++++-
 .../incremental/IncrementalLoadTasksBuilder.java   |   4 +-
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |   2 +
 .../ql/parse/repl/dump/EventsDumpMetadata.java     | 113 +++++++++++++++
 14 files changed, 396 insertions(+), 62 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ea7c56d10f9..b1b441dce7b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -527,6 +527,8 @@ public class HiveConf extends Configuration {
             + "dynamically generating the next set of tasks. The number is 
approximate as Hive \n"
             + "will stop at a slightly higher number, the reason being some 
events might lead to a \n"
             + "task increment that would cross the specified limit."),
+    REPL_BATCH_INCREMENTAL_EVENTS("hive.repl.batch.incremental.events", true,
+            "Dump events in batches during incremental phase of repl dump"),
     
REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",100,
         "Number of threads that will be used to dump partition data 
information during repl dump."),
     REPL_TABLE_DUMP_PARALLELISM("hive.repl.table.dump.parallelism", 15,
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
index 3d8e3984c6e..4e2d322c194 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
@@ -48,6 +48,7 @@ public class TestReplAcrossInstancesWithJsonMessageFormat
         "true");
     overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
             UserGroupInformation.getCurrentUser().getUserName());
+    overrides.put(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname, 
"false");
     internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
   }
 
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 910dda67658..9345d34bc09 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -225,6 +225,7 @@ public class TestReplicationScenarios {
     hconf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true);
     hconf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE, true);
     hconf.setBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET, 
false);
+    hconf.setBoolVar(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS, false);
     System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
     System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
 
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 22c86d3a75f..f3d086eafd9 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -49,7 +49,9 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.EventsDumpMetadata;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
@@ -77,6 +79,9 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 
 import static 
org.apache.hadoop.hive.common.repl.ReplConst.REPL_RESUME_STARTED_AFTER_FAILOVER;
@@ -86,6 +91,8 @@ import static 
org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION
 import static 
org.apache.hadoop.hive.common.repl.ReplConst.REPL_ENABLE_BACKGROUND_THREAD;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
+
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -130,6 +137,7 @@ public class TestReplicationScenariosAcidTables extends 
BaseReplicationScenarios
         put("hive.txn.readonly.enabled", "true");
         //HIVE-25267
         put(MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT.getVarname(), "2000");
+        put(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname, "false");
         put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, 
"false");
         
put(HiveConf.ConfVars.REPL_RETAIN_CUSTOM_LOCATIONS_FOR_DB_ON_TARGET.varname, 
"false");
       }};
@@ -2206,7 +2214,10 @@ public class TestReplicationScenariosAcidTables extends 
BaseReplicationScenarios
     fs.delete(lastEvtRoot, true);
     fs.delete(secondLastEvtRoot, true);
     fs.delete(thirdLastEvtRoot, true);
-    
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(String.valueOf(lastEventID
 - 3), ackLastEventID,
+    EventsDumpMetadata eventsDumpMetadata = 
EventsDumpMetadata.deserialize(ackLastEventID, conf);
+    eventsDumpMetadata.setLastReplId(lastEventID - 3);
+    
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount()
 - 3);
+    
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(eventsDumpMetadata.serialize(),
 ackLastEventID,
             primary.hiveConf);
     ReplDumpWork.testDeletePreviousDumpMetaPath(false);
 
@@ -2473,15 +2484,18 @@ public class TestReplicationScenariosAcidTables extends 
BaseReplicationScenarios
     long fifthLastIncEventID = 
Long.parseLong(incrementalDump1.lastReplicationId) - 4;
     long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
     assertTrue(lastIncEventID > fifthLastIncEventID);
-
+    int deletedEventsCount = 0;
     for (long eventId=fifthLastIncEventID + 1; eventId<=lastIncEventID; 
eventId++) {
       Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
       if (fs.exists(eventRoot)) {
+        deletedEventsCount++;
         fs.delete(eventRoot, true);
       }
     }
-
-    
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(String.valueOf(fifthLastIncEventID),
 ackLastEventID,
+    EventsDumpMetadata eventsDumpMetadata = 
EventsDumpMetadata.deserialize(ackLastEventID, conf);
+    eventsDumpMetadata.setLastReplId(fifthLastIncEventID);
+    
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount()
 - deletedEventsCount);
+    
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(eventsDumpMetadata.serialize(),
 ackLastEventID,
             primary.hiveConf);
 
     ReplDumpWork.testDeletePreviousDumpMetaPath(false);
@@ -3588,4 +3602,138 @@ public class TestReplicationScenariosAcidTables extends 
BaseReplicationScenarios
       updatedParams.get(REPL_TARGET_DATABASE_PROPERTY));
     assertEquals("15", updatedParams.get(REPL_TARGET_DATABASE_PROPERTY));
   }
+  @Test
+  public void testBatchingOfIncrementalEvents() throws Throwable {
+    final int REPL_MAX_LOAD_TASKS = 5;
+    List<String> incrementalBatchConfigs = Arrays.asList(
+            String.format("'%s'='%s'", 
HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS, "true"),
+            String.format("'%s'='%d'", 
HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, REPL_MAX_LOAD_TASKS)
+    );
+
+    //bootstrap run, config should have no effect
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int)")
+            .run("insert into table t1 values (1)")
+            .dump(primaryDbName, incrementalBatchConfigs);
+
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+
+    replica.load(replicatedDbName, primaryDbName, incrementalBatchConfigs)
+            .run("use " + replicatedDbName)
+            .run("select * from t1")
+            .verifyResults(new String[]{"1"});
+
+    //incremental run
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + 
primaryDbName)
+            .run("insert into t1 values(2)")
+            .run("insert into t1 values(3)")
+            .run("insert into t1 values(4)")
+            .run("insert into t1 values(5)")
+            .dump(primaryDbName, incrementalBatchConfigs);
+
+    Path dumpPath = new Path(incrementalDump.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackLastEventID = new Path(dumpPath, ReplAck.EVENTS_DUMP.toString());
+    EventsDumpMetadata eventsDumpMetadata = 
EventsDumpMetadata.deserialize(ackLastEventID, conf);
+    assertTrue(eventsDumpMetadata.isEventsBatched());
+
+    int eventsCountInAckFile = eventsDumpMetadata.getEventsDumpedCount(), 
expectedEventsCount = 0;
+    String eventsBatchDirPrefix = ReplUtils.INC_EVENTS_BATCH.replaceAll("%d", 
"");
+
+    List<FileStatus> batchFiles = Arrays.stream(fs.listStatus(dumpPath))
+            .filter(fileStatus -> fileStatus.getPath().getName()
+                    
.startsWith(eventsBatchDirPrefix)).collect(Collectors.toList());
+
+
+    for (FileStatus fileStatus : batchFiles) {
+      int eventsPerBatch = fs.listStatus(fileStatus.getPath()).length;
+      assertTrue(eventsPerBatch <= REPL_MAX_LOAD_TASKS);
+      expectedEventsCount += eventsPerBatch;
+    }
+    assertEquals(eventsCountInAckFile, expectedEventsCount);
+
+    // Repl Load should be agnostic of batch size and 
REPL_BATCH_INCREMENTAL_EVENTS config.
+    // hence not passing incrementalBatchConfigs here.
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("select * from t1")
+            .verifyResults(new String[]{"1", "2", "3", "4", "5"});
+
+    assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+    //second round of incremental dump.
+    incrementalDump = primary.run("use " + primaryDbName)
+            .run("insert into t1 values(6)")
+            .run("insert into t1 values(7)")
+            .run("insert into t1 values(8)")
+            .run("insert into t1 values(9)")
+            .dump(primaryDbName, incrementalBatchConfigs);
+
+    // simulate a failure in repl dump when batching was enabled.
+    dumpPath = new Path(incrementalDump.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    batchFiles = Arrays.stream(fs.listStatus(dumpPath))
+            .filter(fileStatus -> fileStatus.getPath().getName()
+                    .startsWith(eventsBatchDirPrefix))
+            .sorted(new EventDumpDirComparator()).collect(Collectors.toList());
+
+
+    FileStatus lastBatch = batchFiles.get(batchFiles.size() - 1);
+    Path lastBatchPath = lastBatch.getPath();
+
+    FileStatus[] eventsOfLastBatch = fs.listStatus(lastBatchPath);
+    Arrays.sort(eventsOfLastBatch, new EventDumpDirComparator());
+    Map<FileStatus, Long> modificationTimes = batchFiles.stream().filter(file 
-> !Objects.equals(lastBatch, file))
+            .collect(Collectors.toMap(Function.identity(), 
FileStatus::getModificationTime));
+
+    long lastReplId = Long.parseLong(eventsOfLastBatch[0].getPath().getName()) 
- 1;
+    ackLastEventID = new Path(dumpPath, ReplAck.EVENTS_DUMP.toString());
+    eventsDumpMetadata = EventsDumpMetadata.deserialize(ackLastEventID, conf);
+    eventsDumpMetadata.setLastReplId(lastReplId);
+    
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount()
 - (eventsOfLastBatch.length - 1));
+    
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(eventsDumpMetadata.serialize(),
+            ackLastEventID,
+            primary.hiveConf);
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), false);
+    //delete all events of last batch except one.
+    for (int idx = 1; idx < eventsOfLastBatch.length; idx++)
+      fs.delete(eventsOfLastBatch[idx].getPath(), true);
+
+    // when we try to resume a failed dump which was batched without setting 
REPL_BATCH_INCREMENTAL_EVENTS = true
+    // in the next run dump should fail.
+    primary.dumpFailure(primaryDbName);
+    if (ReplUtils.failedWithNonRecoverableError(dumpPath, conf)) {
+      fs.delete(new Path(dumpPath, ReplAck.NON_RECOVERABLE_MARKER.toString()), 
false);
+    }
+
+    WarehouseInstance.Tuple dumpAfterFailure = primary.dump(primaryDbName, 
incrementalBatchConfigs);
+    //ensure dump did recover and dump location of new dump is same as the 
previous one.
+    assertEquals(dumpAfterFailure.dumpLocation, incrementalDump.dumpLocation);
+
+    List<FileStatus> filesAfterFailedDump = 
Arrays.stream(fs.listStatus(dumpPath))
+            .filter(fileStatus -> fileStatus.getPath().getName()
+                    .startsWith(eventsBatchDirPrefix))
+            .sorted(new EventDumpDirComparator()).collect(Collectors.toList());
+
+    //ensure all event files are dumped again.
+    assertEquals(batchFiles, filesAfterFailedDump);
+
+    //ensure last batch had events dumped and was indeed modified.
+    assertNotEquals(lastBatch.getModificationTime(),
+            filesAfterFailedDump.get(filesAfterFailedDump.size() - 
1).getModificationTime());
+
+    assertArrayEquals(fs.listStatus(lastBatchPath),
+            fs.listStatus(filesAfterFailedDump.get(filesAfterFailedDump.size() 
- 1).getPath()));
+
+    //ensure remaining batches were not modified.
+    assertTrue(filesAfterFailedDump.stream()
+            .filter(file -> !Objects.equals(file, lastBatch))
+            .allMatch(file -> file.getModificationTime() == 
modificationTimes.get(file)));
+    //ensure successful repl load.
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("select * from t1")
+            .verifyResults(new String[]{"1", "2", "3", "4", "5", "6", "7", 
"8", "9"});
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+  }
 }
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 77e7390e8b1..fb2ad07acb7 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -99,6 +99,7 @@ public class TestReplicationScenariosAcrossInstances extends 
BaseReplicationAcro
         "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
     overrides.put(MetastoreConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.getVarname(),
         "true");
+    overrides.put(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname, 
"false");
     internalBeforeClassSetup(overrides, 
TestReplicationScenariosAcrossInstances.class);
   }
 
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 29373c2bf28..9eefd04e7f9 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.StringAppender;
+import org.apache.hadoop.hive.ql.parse.repl.dump.EventsDumpMetadata;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
@@ -104,6 +105,7 @@ public class TestReplicationScenariosExternalTables extends 
BaseReplicationAcros
     overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
         UserGroupInformation.getCurrentUser().getUserName());
     
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, 
"false");
+    overrides.put(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname, 
"false");
 
     internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
   }
@@ -726,16 +728,19 @@ public class TestReplicationScenariosExternalTables 
extends BaseReplicationAcros
     assertFalse(fs.exists(dataDir));
     long oldMetadirModTime = fs.getFileStatus(metaDir).getModificationTime();
     fs.delete(ackFile, false);
+    EventsDumpMetadata eventsDumpMetadata = 
EventsDumpMetadata.deserialize(ackLastEventID, conf);
     fs.delete(ackLastEventID, false);
     //delete all the event folders except first event
     long startEvent = -1;
     long endEvent = Long.valueOf(incrementalDump1.lastReplicationId);
+    int deletedEventsCount = 0;
     for (long eventDir = Long.valueOf(tuple.lastReplicationId) + 1;  eventDir 
<= endEvent; eventDir++) {
       Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventDir));
       if (fs.exists(eventRoot)) {
         if (startEvent == -1){
           startEvent = eventDir;
         } else {
+          deletedEventsCount++;
           fs.delete(eventRoot, true);
         }
       }
@@ -746,7 +751,8 @@ public class TestReplicationScenariosExternalTables extends 
BaseReplicationAcros
       firstEventModTimeMap.put(fileStatus.getPath(), 
fileStatus.getModificationTime());
     }
     assertTrue(endEvent - startEvent > 1);
-    Utils.writeOutput(String.valueOf(startEvent), ackLastEventID, 
primary.hiveConf);
+    
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount()
 - deletedEventsCount);
+    Utils.writeOutput(eventsDumpMetadata.serialize(), ackLastEventID, 
primary.hiveConf);
     WarehouseInstance.Tuple incrementalDump2 = primary.dump(primaryDbName, 
withClause);
     assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
     assertTrue(fs.getFileStatus(metaDir).getModificationTime() > 
oldMetadirModTime);
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index a732af41244..0c1d945458b 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.repl.dump.EventsDumpMetadata;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -69,7 +70,7 @@ public class TestTableLevelReplicationScenarios extends 
BaseReplicationScenarios
     overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, 
"true");
     overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
         UserGroupInformation.getCurrentUser().getUserName());
-
+    overrides.put(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname, 
"false");
     internalBeforeClassSetup(overrides, 
TestTableLevelReplicationScenarios.class);
   }
 
@@ -1277,8 +1278,11 @@ public class TestTableLevelReplicationScenarios extends 
BaseReplicationScenarios
     fs.delete(lastEvtRoot, true);
     fs.delete(secondLastEvtRoot, true);
     fs.delete(thirdLastEvtRoot, true);
-    
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(String.valueOf(lastEventID
 - 3), ackLastEventID,
-      primary.hiveConf);
+    EventsDumpMetadata eventsDumpMetadata = 
EventsDumpMetadata.deserialize(ackLastEventID, conf);
+    eventsDumpMetadata.setLastReplId(lastEventID - 3);
+    
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount()
 - 3);
+    
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(eventsDumpMetadata.serialize(),
 ackLastEventID,
+            primary.hiveConf);
     ReplDumpWork.testDeletePreviousDumpMetaPath(false);
 
     WarehouseInstance.Tuple incrementalDump4 = primary.run("use " + 
primaryDbName)
@@ -1355,8 +1359,11 @@ public class TestTableLevelReplicationScenarios extends 
BaseReplicationScenarios
     fs.delete(lastEvtRoot, true);
     fs.delete(secondLastEvtRoot, true);
     fs.delete(thirdLastEvtRoot, true);
-    
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(String.valueOf(lastEventID
 - 3), ackLastEventID,
-      primary.hiveConf);
+    EventsDumpMetadata eventsDumpMetadata = 
EventsDumpMetadata.deserialize(ackLastEventID, conf);
+    eventsDumpMetadata.setLastReplId(lastEventID - 3);
+    
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount()
 - 3);
+    
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(eventsDumpMetadata.serialize(),
 ackLastEventID,
+            primary.hiveConf);
     ReplDumpWork.testDeletePreviousDumpMetaPath(false);
 
     replPolicy = primaryDbName + ".'(t1+)|(t2)'.'t11|t3|t13'";
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 3695a7e03b3..e40645ffc50 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -479,7 +479,7 @@ public class WarehouseInstance implements Closeable {
   public int getNoOfEventsDumped(String dumpLocation, HiveConf conf) throws 
Throwable {
     IncrementalLoadEventsIterator itr = new IncrementalLoadEventsIterator(
             dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR, 
conf);
-    return itr.getNumEvents();
+    return itr.getTotalEventsCount();
   }
 
   public List<String> getAllTables(String dbName) throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index edbe52a2038..6b14febc176 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.exec.repl;
 
 import org.apache.commons.collections4.CollectionUtils;
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -76,6 +75,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.ExportService;
+import org.apache.hadoop.hive.ql.parse.repl.dump.EventsDumpMetadata;
 import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
 import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
@@ -103,13 +103,10 @@ import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.Serializable;
-import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.LinkedHashMap;
 import java.util.Set;
@@ -873,7 +870,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
         evFetcher, work.eventFrom, maxEventLimit, evFilter);
     lastReplId = work.eventTo;
     Path ackFile = new Path(dumpRoot, ReplAck.EVENTS_DUMP.toString());
-    long resumeFrom = Utils.fileExists(ackFile, conf) ? getResumeFrom(ackFile) 
: work.eventFrom;
+    boolean shouldBatch = 
conf.getBoolVar(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS);
+
+    EventsDumpMetadata eventsDumpMetadata =
+            Utils.fileExists(ackFile, conf) ? 
EventsDumpMetadata.deserialize(ackFile, conf)
+                    : new EventsDumpMetadata(work.eventFrom, 0, shouldBatch);
+
+    long resumeFrom = eventsDumpMetadata.getLastReplId();
 
     long estimatedNumEvents = 
evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,
         maxEventLimit);
@@ -907,15 +910,44 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
         LOG.info("Event id {} to {} are already dumped, skipping {} events", 
work.eventFrom, resumeFrom, dumpedCount);
       }
       boolean isStagingDirCheckedForFailedEvents = false;
+
+      int batchNo = 0, eventCount = 0;
+      final int maxEventsPerBatch = 
conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+      Path eventRootDir = dumpRoot;
+
+      if (shouldBatch && maxEventsPerBatch == 0) {
+        throw new SemanticException(String.format(
+                "batch size configured via %s cannot be set to zero since 
batching is enabled",
+                HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname));
+      }
+      if (eventsDumpMetadata.isEventsBatched() != shouldBatch) {
+        LOG.error("Failed to resume from previous dump. {} was set to {} in 
previous dump but currently it's" +
+                        " set to {}. Cannot dump events in {} manner because 
they were {} batched in " +
+                        "the previous incomplete run",
+                HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname, 
eventsDumpMetadata.isEventsBatched(),
+                shouldBatch, shouldBatch ? "batched" : "sequential", 
shouldBatch ? "not" : ""
+        );
+
+        throw new HiveException(
+                String.format("Failed to resume from previous dump. %s must be 
set to %s, but currently it's set to %s",
+                        HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS,
+                        eventsDumpMetadata.isEventsBatched(), shouldBatch)
+        );
+      }
+
       while (evIter.hasNext()) {
         NotificationEvent ev = evIter.next();
         lastReplId = ev.getEventId();
+
+        if (shouldBatch && eventCount++ % maxEventsPerBatch == 0) {
+          eventRootDir = new Path(dumpRoot, 
String.format(ReplUtils.INC_EVENTS_BATCH, ++batchNo));
+        }
         if (ev.getEventId() <= resumeFrom) {
           continue;
         }
         // Checking and removing remnant file from staging directory if 
previous incremental repl dump is failed
         if (!isStagingDirCheckedForFailedEvents) {
-          cleanFailedEventDirIfExists(dumpRoot, ev.getEventId());
+          cleanFailedEventDirIfExists(eventRootDir, ev.getEventId());
           isStagingDirCheckedForFailedEvents = true;
         }
 
@@ -935,9 +967,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
           }
         }
 
-        Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
-        dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb);
-        Utils.writeOutput(String.valueOf(lastReplId), ackFile, conf);
+        Path eventDir = new Path(eventRootDir, String.valueOf(lastReplId));
+        dumpEvent(ev, eventDir, dumpRoot, cmRoot, hiveDb, eventsDumpMetadata);
+        eventsDumpMetadata.setLastReplId(lastReplId);
+        Utils.writeOutput(eventsDumpMetadata.serialize(), ackFile, conf);
       }
       replLogger.endLog(lastReplId.toString());
       LOG.info("Done dumping events, preparing to return {},{}", 
dumpRoot.toUri(), lastReplId);
@@ -1147,33 +1180,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
     }
   }
 
-  private long getResumeFrom(Path ackFile) throws SemanticException {
-    Retryable retryable = Retryable.builder()
-      .withHiveConf(conf)
-      .withRetryOnException(Exception.class).build();
-    try {
-      return retryable.executeCallable(() -> {
-        BufferedReader br = null;
-        try {
-          FileSystem fs = ackFile.getFileSystem(conf);
-          br = new BufferedReader(new InputStreamReader(fs.open(ackFile), 
Charset.defaultCharset()));
-          long lastEventID = Long.parseLong(br.readLine());
-          return lastEventID;
-        } finally {
-          if (br != null) {
-            try {
-              br.close();
-            } catch (Exception e) {
-              //Do nothing
-            }
-          }
-        }
-      });
-    } catch (Exception e) {
-      throw new 
SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
-    }
-  }
-
   private boolean needBootstrapAcidTablesDuringIncrementalDump() {
     // If acid table dump is not enabled, then no need to check further.
     if (!ReplUtils.includeAcidTableInDump(conf)) {
@@ -1188,7 +1194,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
             || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
   }
 
-  private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, 
Path cmRoot, Hive db) throws Exception {
+  private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, 
Path cmRoot, Hive db,
+                         EventsDumpMetadata eventsDumpMetadata) throws 
Exception {
     EventHandler.Context context = new EventHandler.Context(
         evRoot,
         dumpRoot,
@@ -1202,6 +1209,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
     );
     EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
     eventHandler.handle(context);
+    eventsDumpMetadata.incrementEventsDumpedCount();
     work.getMetricCollector().reportStageProgress(getName(), 
ReplUtils.MetricName.EVENTS.name(), 1);
     work.getReplLogger().eventLog(String.valueOf(ev.getEventId()), 
eventHandler.dumpType().toString());
   }
@@ -1498,8 +1506,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
     Path lastEventFile = new Path(hiveDumpPath, 
ReplAck.EVENTS_DUMP.toString());
     long resumeFrom = 0;
     try {
-      resumeFrom = getResumeFrom(lastEventFile);
-    } catch (SemanticException ex) {
+      resumeFrom = EventsDumpMetadata.deserialize(lastEventFile, 
conf).getLastReplId();
+    } catch (HiveException ex) {
       LOG.info("Could not get last repl id from {}, because of:", 
lastEventFile, ex.getMessage());
     }
     return resumeFrom > 0L;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 60f24862112..ea5c0fd1503 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -34,6 +34,7 @@ import 
org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuild
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
@@ -183,9 +184,13 @@ public class ReplLoadWork implements Serializable, 
ReplLoadWorkMBean {
         this.bootstrapIterator = null;
         this.constraintsIterator = null;
       }
-      incrementalLoadTasksBuilder = new 
IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
-              new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), 
hiveConf, eventTo, metricCollector,
-              replStatsTracker, shouldFailover, tablesToBootstrap.size());
+      try {
+        incrementalLoadTasksBuilder = new 
IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
+                new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), 
hiveConf, eventTo, metricCollector,
+                replStatsTracker, shouldFailover, tablesToBootstrap.size());
+      } catch (HiveException e) {
+        throw new SemanticException(e.getMessage(), e);
+      }
     } else {
       this.bootstrapIterator = new BootstrapEventsIterator(new 
Path(dumpDirectory, EximUtil.METADATA_PATH_NAME)
               .toString(), dbNameToLoadIn, true, hiveConf, metricCollector);
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
index f2c8e8fd542..8913b292ce9 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
@@ -22,7 +22,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.EventsDumpMetadata;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 
 import java.io.IOException;
@@ -37,17 +40,36 @@ import java.util.NoSuchElementException;
 public class IncrementalLoadEventsIterator implements Iterator<FileStatus> {
   private FileStatus[] eventDirs;
   private int currentIndex;
+  // when events are batched numEvents denotes number of events in a given 
batch else numEvents = totalEventsCount
   private int numEvents;
+  private int totalEventsCount;
+  private boolean eventsBatched;
+  private Iterator<FileStatus> eventBatchDirsIterator;
+  private FileSystem fs;
 
-  public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws 
IOException {
+  public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws 
IOException, HiveException {
     Path eventPath = new Path(loadPath);
-    FileSystem fs = eventPath.getFileSystem(conf);
+    fs = eventPath.getFileSystem(conf);
+    Path eventsDumpAckFile = new Path(eventPath, 
ReplAck.EVENTS_DUMP.toString());
+
     eventDirs = fs.listStatus(eventPath, 
ReplUtils.getEventsDirectoryFilter(fs));
+
     if ((eventDirs == null) || (eventDirs.length == 0)) {
-      currentIndex = 0;
-      numEvents = 0;
+      currentIndex = numEvents = totalEventsCount = 0;
       return;
     }
+    EventsDumpMetadata eventsDumpMetadata = 
EventsDumpMetadata.deserialize(eventsDumpAckFile, conf);
+    eventsBatched = eventsDumpMetadata.isEventsBatched();
+    totalEventsCount = eventDirs.length;
+
+    if (eventsBatched) {
+      //eventDirs will now have batches eg. events_batch_2, events_batch_1 etc.
+      Arrays.sort(eventDirs, new EventDumpDirComparator());
+      eventBatchDirsIterator = Arrays.stream(eventDirs).iterator();
+      // get all events from first batch.
+      eventDirs = fs.listStatus(eventBatchDirsIterator.next().getPath(), 
ReplUtils.getEventsDirectoryFilter(fs));
+      totalEventsCount = eventsDumpMetadata.getEventsDumpedCount();
+    }
     // For event dump, each sub-dir is an individual event dump.
     // We need to guarantee that the directory listing we got is in order of 
event id.
     Arrays.sort(eventDirs, new EventDumpDirComparator());
@@ -60,16 +82,34 @@ public class IncrementalLoadEventsIterator implements 
Iterator<FileStatus> {
     return (eventDirs != null && currentIndex < numEvents);
   }
 
+  private boolean hasNextBatch() {
+    return eventBatchDirsIterator.hasNext();
+  }
+
   @Override
   public FileStatus next() {
-    if (hasNext()) {
+    if (eventsBatched) {
+      FileStatus event = eventDirs[currentIndex++];
+      if (hasNextBatch() && currentIndex == numEvents) {
+        try {
+          eventDirs = fs.listStatus(eventBatchDirsIterator.next().getPath(), 
ReplUtils.getEventsDirectoryFilter(fs));
+          Arrays.sort(eventDirs, new EventDumpDirComparator());
+          currentIndex = 0;
+          numEvents = eventDirs.length;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return event;
+    }
+    else if (hasNext()) {
       return eventDirs[currentIndex++];
     } else {
       throw new NoSuchElementException("no more events");
     }
   }
 
-  public int getNumEvents() {
-    return numEvents;
+  public int getTotalEventsCount() {
+    return totalEventsCount;
   }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index c04f8c9465a..eda4a3f070e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -91,13 +91,13 @@ public class IncrementalLoadTasksBuilder {
     outputs = new HashSet<>();
     log = null;
     this.conf = conf;
-    replLogger = new IncrementalLoadLogger(dbName, loadPath, 
iterator.getNumEvents(), replStatsTracker);
+    replLogger = new IncrementalLoadLogger(dbName, loadPath, 
iterator.getTotalEventsCount(), replStatsTracker);
     replLogger.startLog();
     this.eventTo = eventTo;
     setNumIteration(0);
     this.metricCollector = metricCollector;
     Map<String, Long> metricMap = new HashMap<>();
-    metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 
iterator.getNumEvents());
+    metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 
iterator.getTotalEventsCount());
     this.shouldFailover = shouldFailover;
     if (shouldFailover) {
       Database db = null;
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index b884030723d..abc3e6627c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -160,6 +160,8 @@ public class ReplUtils {
 
   // Service name for atlas.
   public static final String REPL_ATLAS_SERVICE = "atlas";
+  public static final String INC_EVENTS_BATCH = "events_batch_%d";
+
   /**
    * Bootstrap REPL LOAD operation type on the examined object based on ckpt 
state.
    */
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/EventsDumpMetadata.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/EventsDumpMetadata.java
new file mode 100644
index 00000000000..f2f38a99780
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/EventsDumpMetadata.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+
+/**
+ * This class represents the metadata related to events in repl dump.
+ * Metadata includes,
+ * lastReplId - the last successfully dumped eventId.
+ * eventsDumpedCount - number of events dumped in staging directory.
+ * eventsBatched - denotes if events are dumped in batches.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class EventsDumpMetadata {
+  @JsonProperty
+  private Long lastReplId;
+  @JsonProperty
+  private Integer eventsDumpedCount;
+  @JsonProperty
+  private boolean eventsBatched;
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  public EventsDumpMetadata() {
+
+  }
+
+  public EventsDumpMetadata(Long lastReplId, Integer eventsDumpedCount, 
boolean eventsBatched) {
+    this.lastReplId = lastReplId;
+    this.eventsDumpedCount = eventsDumpedCount;
+    this.eventsBatched = eventsBatched;
+  }
+
+  public Long getLastReplId() {
+    return lastReplId;
+  }
+
+  public Integer getEventsDumpedCount() {
+    return eventsDumpedCount;
+  }
+
+  public void setLastReplId(Long lastReplId) {
+    this.lastReplId = lastReplId;
+  }
+
+  public void setEventsDumpedCount(Integer eventsDumpedCount) {
+    this.eventsDumpedCount = eventsDumpedCount;
+  }
+
+  public void incrementEventsDumpedCount() {
+    this.eventsDumpedCount++;
+  }
+
+  public boolean isEventsBatched() {
+    return eventsBatched;
+  }
+
+  public void setEventsBatched(boolean eventsBatched) {
+    this.eventsBatched = eventsBatched;
+  }
+
+  public String serialize() throws JsonProcessingException {
+    return objectMapper.writeValueAsString(this);
+  }
+
+  public static EventsDumpMetadata deserialize(Path ackFile, HiveConf conf) 
throws HiveException {
+    Retryable retryable = Retryable.builder()
+            .withHiveConf(conf)
+            .withRetryOnException(IOException.class).build();
+    try {
+      return retryable.executeCallable(() -> {
+        FileSystem fs = ackFile.getFileSystem(conf);
+        try (BufferedReader br = new BufferedReader(
+                new InputStreamReader(fs.open(ackFile), 
Charset.defaultCharset()))
+        ) {
+          return objectMapper.readValue(br, EventsDumpMetadata.class);
+        }
+      });
+    } catch (Exception e) {
+      throw new 
HiveException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
+    }
+  }
+}


Reply via email to