This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new dda3e7190dd HIVE-26937: Batching incremental events to avoid O.O.M
during repl load (#4019) (Rakshith Chandraiah, reviewed by Teddy Choi)
dda3e7190dd is described below
commit dda3e7190dddfec676394d4af12afa330e00b4cf
Author: Rakshith C <[email protected]>
AuthorDate: Tue Feb 7 10:51:25 2023 +0530
HIVE-26937: Batching incremental events to avoid O.O.M during repl load
(#4019) (Rakshith Chandraiah, reviewed by Teddy Choi)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 2 +
...stReplAcrossInstancesWithJsonMessageFormat.java | 1 +
.../hive/ql/parse/TestReplicationScenarios.java | 1 +
.../parse/TestReplicationScenariosAcidTables.java | 156 ++++++++++++++++++++-
.../TestReplicationScenariosAcrossInstances.java | 1 +
.../TestReplicationScenariosExternalTables.java | 8 +-
.../parse/TestTableLevelReplicationScenarios.java | 17 ++-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 2 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 86 ++++++------
.../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 11 +-
.../incremental/IncrementalLoadEventsIterator.java | 54 ++++++-
.../incremental/IncrementalLoadTasksBuilder.java | 4 +-
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 2 +
.../ql/parse/repl/dump/EventsDumpMetadata.java | 113 +++++++++++++++
14 files changed, 396 insertions(+), 62 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ea7c56d10f9..b1b441dce7b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -527,6 +527,8 @@ public class HiveConf extends Configuration {
+ "dynamically generating the next set of tasks. The number is
approximate as Hive \n"
+ "will stop at a slightly higher number, the reason being some
events might lead to a \n"
+ "task increment that would cross the specified limit."),
+ REPL_BATCH_INCREMENTAL_EVENTS("hive.repl.batch.incremental.events", true,
+ "Dump events in batches during incremental phase of repl dump"),
REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",100,
"Number of threads that will be used to dump partition data
information during repl dump."),
REPL_TABLE_DUMP_PARALLELISM("hive.repl.table.dump.parallelism", 15,
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
index 3d8e3984c6e..4e2d322c194 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
@@ -48,6 +48,7 @@ public class TestReplAcrossInstancesWithJsonMessageFormat
"true");
overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
UserGroupInformation.getCurrentUser().getUserName());
+ overrides.put(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname,
"false");
internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
}
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 910dda67658..9345d34bc09 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -225,6 +225,7 @@ public class TestReplicationScenarios {
hconf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, true);
hconf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE, true);
hconf.setBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET,
false);
+ hconf.setBoolVar(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS, false);
System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 22c86d3a75f..f3d086eafd9 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -49,7 +49,9 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.EventsDumpMetadata;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
@@ -77,6 +79,9 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static
org.apache.hadoop.hive.common.repl.ReplConst.REPL_RESUME_STARTED_AFTER_FAILOVER;
@@ -86,6 +91,8 @@ import static
org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION
import static
org.apache.hadoop.hive.common.repl.ReplConst.REPL_ENABLE_BACKGROUND_THREAD;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
+
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -130,6 +137,7 @@ public class TestReplicationScenariosAcidTables extends
BaseReplicationScenarios
put("hive.txn.readonly.enabled", "true");
//HIVE-25267
put(MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT.getVarname(), "2000");
+ put(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname, "false");
put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname,
"false");
put(HiveConf.ConfVars.REPL_RETAIN_CUSTOM_LOCATIONS_FOR_DB_ON_TARGET.varname,
"false");
}};
@@ -2206,7 +2214,10 @@ public class TestReplicationScenariosAcidTables extends
BaseReplicationScenarios
fs.delete(lastEvtRoot, true);
fs.delete(secondLastEvtRoot, true);
fs.delete(thirdLastEvtRoot, true);
-
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(String.valueOf(lastEventID
- 3), ackLastEventID,
+ EventsDumpMetadata eventsDumpMetadata =
EventsDumpMetadata.deserialize(ackLastEventID, conf);
+ eventsDumpMetadata.setLastReplId(lastEventID - 3);
+
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount()
- 3);
+
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(eventsDumpMetadata.serialize(),
ackLastEventID,
primary.hiveConf);
ReplDumpWork.testDeletePreviousDumpMetaPath(false);
@@ -2473,15 +2484,18 @@ public class TestReplicationScenariosAcidTables extends
BaseReplicationScenarios
long fifthLastIncEventID =
Long.parseLong(incrementalDump1.lastReplicationId) - 4;
long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
assertTrue(lastIncEventID > fifthLastIncEventID);
-
+ int deletedEventsCount = 0;
for (long eventId=fifthLastIncEventID + 1; eventId<=lastIncEventID;
eventId++) {
Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
if (fs.exists(eventRoot)) {
+ deletedEventsCount++;
fs.delete(eventRoot, true);
}
}
-
-
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(String.valueOf(fifthLastIncEventID),
ackLastEventID,
+ EventsDumpMetadata eventsDumpMetadata =
EventsDumpMetadata.deserialize(ackLastEventID, conf);
+ eventsDumpMetadata.setLastReplId(fifthLastIncEventID);
+
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount()
- deletedEventsCount);
+
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(eventsDumpMetadata.serialize(),
ackLastEventID,
primary.hiveConf);
ReplDumpWork.testDeletePreviousDumpMetaPath(false);
@@ -3588,4 +3602,138 @@ public class TestReplicationScenariosAcidTables extends
BaseReplicationScenarios
updatedParams.get(REPL_TARGET_DATABASE_PROPERTY));
assertEquals("15", updatedParams.get(REPL_TARGET_DATABASE_PROPERTY));
}
+ @Test
+ public void testBatchingOfIncrementalEvents() throws Throwable {
+ final int REPL_MAX_LOAD_TASKS = 5;
+ List<String> incrementalBatchConfigs = Arrays.asList(
+ String.format("'%s'='%s'",
HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS, "true"),
+ String.format("'%s'='%d'",
HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, REPL_MAX_LOAD_TASKS)
+ );
+
+ //bootstrap run, config should have no effect
+ WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+ .run("create table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .dump(primaryDbName, incrementalBatchConfigs);
+
+ FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+
+ replica.load(replicatedDbName, primaryDbName, incrementalBatchConfigs)
+ .run("use " + replicatedDbName)
+ .run("select * from t1")
+ .verifyResults(new String[]{"1"});
+
+ //incremental run
+ WarehouseInstance.Tuple incrementalDump = primary.run("use " +
primaryDbName)
+ .run("insert into t1 values(2)")
+ .run("insert into t1 values(3)")
+ .run("insert into t1 values(4)")
+ .run("insert into t1 values(5)")
+ .dump(primaryDbName, incrementalBatchConfigs);
+
+ Path dumpPath = new Path(incrementalDump.dumpLocation,
ReplUtils.REPL_HIVE_BASE_DIR);
+ Path ackLastEventID = new Path(dumpPath, ReplAck.EVENTS_DUMP.toString());
+ EventsDumpMetadata eventsDumpMetadata =
EventsDumpMetadata.deserialize(ackLastEventID, conf);
+ assertTrue(eventsDumpMetadata.isEventsBatched());
+
+ int eventsCountInAckFile = eventsDumpMetadata.getEventsDumpedCount(),
expectedEventsCount = 0;
+ String eventsBatchDirPrefix = ReplUtils.INC_EVENTS_BATCH.replaceAll("%d",
"");
+
+ List<FileStatus> batchFiles = Arrays.stream(fs.listStatus(dumpPath))
+ .filter(fileStatus -> fileStatus.getPath().getName()
+
.startsWith(eventsBatchDirPrefix)).collect(Collectors.toList());
+
+
+ for (FileStatus fileStatus : batchFiles) {
+ int eventsPerBatch = fs.listStatus(fileStatus.getPath()).length;
+ assertTrue(eventsPerBatch <= REPL_MAX_LOAD_TASKS);
+ expectedEventsCount += eventsPerBatch;
+ }
+ assertEquals(eventsCountInAckFile, expectedEventsCount);
+
+ // Repl Load should be agnostic of batch size and
REPL_BATCH_INCREMENTAL_EVENTS config.
+ // hence not passing incrementalBatchConfigs here.
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("select * from t1")
+ .verifyResults(new String[]{"1", "2", "3", "4", "5"});
+
+ assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
+
+ ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+ //second round of incremental dump.
+ incrementalDump = primary.run("use " + primaryDbName)
+ .run("insert into t1 values(6)")
+ .run("insert into t1 values(7)")
+ .run("insert into t1 values(8)")
+ .run("insert into t1 values(9)")
+ .dump(primaryDbName, incrementalBatchConfigs);
+
+ // simulate a failure in repl dump when batching was enabled.
+ dumpPath = new Path(incrementalDump.dumpLocation,
ReplUtils.REPL_HIVE_BASE_DIR);
+ batchFiles = Arrays.stream(fs.listStatus(dumpPath))
+ .filter(fileStatus -> fileStatus.getPath().getName()
+ .startsWith(eventsBatchDirPrefix))
+ .sorted(new EventDumpDirComparator()).collect(Collectors.toList());
+
+
+ FileStatus lastBatch = batchFiles.get(batchFiles.size() - 1);
+ Path lastBatchPath = lastBatch.getPath();
+
+ FileStatus[] eventsOfLastBatch = fs.listStatus(lastBatchPath);
+ Arrays.sort(eventsOfLastBatch, new EventDumpDirComparator());
+ Map<FileStatus, Long> modificationTimes = batchFiles.stream().filter(file
-> !Objects.equals(lastBatch, file))
+ .collect(Collectors.toMap(Function.identity(),
FileStatus::getModificationTime));
+
+ long lastReplId = Long.parseLong(eventsOfLastBatch[0].getPath().getName())
- 1;
+ ackLastEventID = new Path(dumpPath, ReplAck.EVENTS_DUMP.toString());
+ eventsDumpMetadata = EventsDumpMetadata.deserialize(ackLastEventID, conf);
+ eventsDumpMetadata.setLastReplId(lastReplId);
+
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount()
- (eventsOfLastBatch.length - 1));
+
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(eventsDumpMetadata.serialize(),
+ ackLastEventID,
+ primary.hiveConf);
+ fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), false);
+ //delete all events of last batch except one.
+ for (int idx = 1; idx < eventsOfLastBatch.length; idx++)
+ fs.delete(eventsOfLastBatch[idx].getPath(), true);
+
+ // when we try to resume a failed dump which was batched without setting
REPL_BATCH_INCREMENTAL_EVENTS = true
+ // in the next run dump should fail.
+ primary.dumpFailure(primaryDbName);
+ if (ReplUtils.failedWithNonRecoverableError(dumpPath, conf)) {
+ fs.delete(new Path(dumpPath, ReplAck.NON_RECOVERABLE_MARKER.toString()),
false);
+ }
+
+ WarehouseInstance.Tuple dumpAfterFailure = primary.dump(primaryDbName,
incrementalBatchConfigs);
+ //ensure dump did recover and dump location of new dump is same as the
previous one.
+ assertEquals(dumpAfterFailure.dumpLocation, incrementalDump.dumpLocation);
+
+ List<FileStatus> filesAfterFailedDump =
Arrays.stream(fs.listStatus(dumpPath))
+ .filter(fileStatus -> fileStatus.getPath().getName()
+ .startsWith(eventsBatchDirPrefix))
+ .sorted(new EventDumpDirComparator()).collect(Collectors.toList());
+
+ //ensure all event files are dumped again.
+ assertEquals(batchFiles, filesAfterFailedDump);
+
+ //ensure last batch had events dumped and was indeed modified.
+ assertNotEquals(lastBatch.getModificationTime(),
+ filesAfterFailedDump.get(filesAfterFailedDump.size() -
1).getModificationTime());
+
+ assertArrayEquals(fs.listStatus(lastBatchPath),
+ fs.listStatus(filesAfterFailedDump.get(filesAfterFailedDump.size()
- 1).getPath()));
+
+ //ensure remaining batches were not modified.
+ assertTrue(filesAfterFailedDump.stream()
+ .filter(file -> !Objects.equals(file, lastBatch))
+ .allMatch(file -> file.getModificationTime() ==
modificationTimes.get(file)));
+ //ensure successful repl load.
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("select * from t1")
+ .verifyResults(new String[]{"1", "2", "3", "4", "5", "6", "7",
"8", "9"});
+
+ ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+ }
}
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 77e7390e8b1..fb2ad07acb7 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -99,6 +99,7 @@ public class TestReplicationScenariosAcrossInstances extends
BaseReplicationAcro
"org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
overrides.put(MetastoreConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.getVarname(),
"true");
+ overrides.put(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname,
"false");
internalBeforeClassSetup(overrides,
TestReplicationScenariosAcrossInstances.class);
}
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 29373c2bf28..9eefd04e7f9 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.StringAppender;
+import org.apache.hadoop.hive.ql.parse.repl.dump.EventsDumpMetadata;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
@@ -104,6 +105,7 @@ public class TestReplicationScenariosExternalTables extends
BaseReplicationAcros
overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
UserGroupInformation.getCurrentUser().getUserName());
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname,
"false");
+ overrides.put(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname,
"false");
internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
}
@@ -726,16 +728,19 @@ public class TestReplicationScenariosExternalTables
extends BaseReplicationAcros
assertFalse(fs.exists(dataDir));
long oldMetadirModTime = fs.getFileStatus(metaDir).getModificationTime();
fs.delete(ackFile, false);
+ EventsDumpMetadata eventsDumpMetadata =
EventsDumpMetadata.deserialize(ackLastEventID, conf);
fs.delete(ackLastEventID, false);
//delete all the event folders except first event
long startEvent = -1;
long endEvent = Long.valueOf(incrementalDump1.lastReplicationId);
+ int deletedEventsCount = 0;
for (long eventDir = Long.valueOf(tuple.lastReplicationId) + 1; eventDir
<= endEvent; eventDir++) {
Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventDir));
if (fs.exists(eventRoot)) {
if (startEvent == -1){
startEvent = eventDir;
} else {
+ deletedEventsCount++;
fs.delete(eventRoot, true);
}
}
@@ -746,7 +751,8 @@ public class TestReplicationScenariosExternalTables extends
BaseReplicationAcros
firstEventModTimeMap.put(fileStatus.getPath(),
fileStatus.getModificationTime());
}
assertTrue(endEvent - startEvent > 1);
- Utils.writeOutput(String.valueOf(startEvent), ackLastEventID,
primary.hiveConf);
+
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount()
- deletedEventsCount);
+ Utils.writeOutput(eventsDumpMetadata.serialize(), ackLastEventID,
primary.hiveConf);
WarehouseInstance.Tuple incrementalDump2 = primary.dump(primaryDbName,
withClause);
assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
assertTrue(fs.getFileStatus(metaDir).getModificationTime() >
oldMetadirModTime);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index a732af41244..0c1d945458b 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.repl.dump.EventsDumpMetadata;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.security.UserGroupInformation;
@@ -69,7 +70,7 @@ public class TestTableLevelReplicationScenarios extends
BaseReplicationScenarios
overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname,
"true");
overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
UserGroupInformation.getCurrentUser().getUserName());
-
+ overrides.put(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname,
"false");
internalBeforeClassSetup(overrides,
TestTableLevelReplicationScenarios.class);
}
@@ -1277,8 +1278,11 @@ public class TestTableLevelReplicationScenarios extends
BaseReplicationScenarios
fs.delete(lastEvtRoot, true);
fs.delete(secondLastEvtRoot, true);
fs.delete(thirdLastEvtRoot, true);
-
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(String.valueOf(lastEventID
- 3), ackLastEventID,
- primary.hiveConf);
+ EventsDumpMetadata eventsDumpMetadata =
EventsDumpMetadata.deserialize(ackLastEventID, conf);
+ eventsDumpMetadata.setLastReplId(lastEventID - 3);
+
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount()
- 3);
+
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(eventsDumpMetadata.serialize(),
ackLastEventID,
+ primary.hiveConf);
ReplDumpWork.testDeletePreviousDumpMetaPath(false);
WarehouseInstance.Tuple incrementalDump4 = primary.run("use " +
primaryDbName)
@@ -1355,8 +1359,11 @@ public class TestTableLevelReplicationScenarios extends
BaseReplicationScenarios
fs.delete(lastEvtRoot, true);
fs.delete(secondLastEvtRoot, true);
fs.delete(thirdLastEvtRoot, true);
-
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(String.valueOf(lastEventID
- 3), ackLastEventID,
- primary.hiveConf);
+ EventsDumpMetadata eventsDumpMetadata =
EventsDumpMetadata.deserialize(ackLastEventID, conf);
+ eventsDumpMetadata.setLastReplId(lastEventID - 3);
+
eventsDumpMetadata.setEventsDumpedCount(eventsDumpMetadata.getEventsDumpedCount()
- 3);
+
org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(eventsDumpMetadata.serialize(),
ackLastEventID,
+ primary.hiveConf);
ReplDumpWork.testDeletePreviousDumpMetaPath(false);
replPolicy = primaryDbName + ".'(t1+)|(t2)'.'t11|t3|t13'";
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 3695a7e03b3..e40645ffc50 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -479,7 +479,7 @@ public class WarehouseInstance implements Closeable {
public int getNoOfEventsDumped(String dumpLocation, HiveConf conf) throws
Throwable {
IncrementalLoadEventsIterator itr = new IncrementalLoadEventsIterator(
dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR,
conf);
- return itr.getNumEvents();
+ return itr.getTotalEventsCount();
}
public List<String> getAllTables(String dbName) throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index edbe52a2038..6b14febc176 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec.repl;
import org.apache.commons.collections4.CollectionUtils;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -76,6 +75,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.ExportService;
+import org.apache.hadoop.hive.ql.parse.repl.dump.EventsDumpMetadata;
import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
@@ -103,13 +103,10 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.io.Serializable;
-import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Set;
@@ -873,7 +870,13 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
evFetcher, work.eventFrom, maxEventLimit, evFilter);
lastReplId = work.eventTo;
Path ackFile = new Path(dumpRoot, ReplAck.EVENTS_DUMP.toString());
- long resumeFrom = Utils.fileExists(ackFile, conf) ? getResumeFrom(ackFile)
: work.eventFrom;
+ boolean shouldBatch =
conf.getBoolVar(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS);
+
+ EventsDumpMetadata eventsDumpMetadata =
+ Utils.fileExists(ackFile, conf) ?
EventsDumpMetadata.deserialize(ackFile, conf)
+ : new EventsDumpMetadata(work.eventFrom, 0, shouldBatch);
+
+ long resumeFrom = eventsDumpMetadata.getLastReplId();
long estimatedNumEvents =
evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,
maxEventLimit);
@@ -907,15 +910,44 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
LOG.info("Event id {} to {} are already dumped, skipping {} events",
work.eventFrom, resumeFrom, dumpedCount);
}
boolean isStagingDirCheckedForFailedEvents = false;
+
+ int batchNo = 0, eventCount = 0;
+ final int maxEventsPerBatch =
conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+ Path eventRootDir = dumpRoot;
+
+ if (shouldBatch && maxEventsPerBatch == 0) {
+ throw new SemanticException(String.format(
+ "batch size configured via %s cannot be set to zero since
batching is enabled",
+ HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname));
+ }
+ if (eventsDumpMetadata.isEventsBatched() != shouldBatch) {
+ LOG.error("Failed to resume from previous dump. {} was set to {} in
previous dump but currently it's" +
+ " set to {}. Cannot dump events in {} manner because
they were {} batched in " +
+ "the previous incomplete run",
+ HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname,
eventsDumpMetadata.isEventsBatched(),
+ shouldBatch, shouldBatch ? "batched" : "sequential",
shouldBatch ? "not" : ""
+ );
+
+ throw new HiveException(
+ String.format("Failed to resume from previous dump. %s must be
set to %s, but currently it's set to %s",
+ HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS,
+ eventsDumpMetadata.isEventsBatched(), shouldBatch)
+ );
+ }
+
while (evIter.hasNext()) {
NotificationEvent ev = evIter.next();
lastReplId = ev.getEventId();
+
+ if (shouldBatch && eventCount++ % maxEventsPerBatch == 0) {
+ eventRootDir = new Path(dumpRoot,
String.format(ReplUtils.INC_EVENTS_BATCH, ++batchNo));
+ }
if (ev.getEventId() <= resumeFrom) {
continue;
}
// Checking and removing remnant file from staging directory if
previous incremental repl dump is failed
if (!isStagingDirCheckedForFailedEvents) {
- cleanFailedEventDirIfExists(dumpRoot, ev.getEventId());
+ cleanFailedEventDirIfExists(eventRootDir, ev.getEventId());
isStagingDirCheckedForFailedEvents = true;
}
@@ -935,9 +967,10 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
}
}
- Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
- dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb);
- Utils.writeOutput(String.valueOf(lastReplId), ackFile, conf);
+ Path eventDir = new Path(eventRootDir, String.valueOf(lastReplId));
+ dumpEvent(ev, eventDir, dumpRoot, cmRoot, hiveDb, eventsDumpMetadata);
+ eventsDumpMetadata.setLastReplId(lastReplId);
+ Utils.writeOutput(eventsDumpMetadata.serialize(), ackFile, conf);
}
replLogger.endLog(lastReplId.toString());
LOG.info("Done dumping events, preparing to return {},{}",
dumpRoot.toUri(), lastReplId);
@@ -1147,33 +1180,6 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
}
}
- private long getResumeFrom(Path ackFile) throws SemanticException {
- Retryable retryable = Retryable.builder()
- .withHiveConf(conf)
- .withRetryOnException(Exception.class).build();
- try {
- return retryable.executeCallable(() -> {
- BufferedReader br = null;
- try {
- FileSystem fs = ackFile.getFileSystem(conf);
- br = new BufferedReader(new InputStreamReader(fs.open(ackFile),
Charset.defaultCharset()));
- long lastEventID = Long.parseLong(br.readLine());
- return lastEventID;
- } finally {
- if (br != null) {
- try {
- br.close();
- } catch (Exception e) {
- //Do nothing
- }
- }
- }
- });
- } catch (Exception e) {
- throw new
SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
- }
- }
-
private boolean needBootstrapAcidTablesDuringIncrementalDump() {
// If acid table dump is not enabled, then no need to check further.
if (!ReplUtils.includeAcidTableInDump(conf)) {
@@ -1188,7 +1194,8 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
|| conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
}
- private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot,
Path cmRoot, Hive db) throws Exception {
+ private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot,
Path cmRoot, Hive db,
+ EventsDumpMetadata eventsDumpMetadata) throws
Exception {
EventHandler.Context context = new EventHandler.Context(
evRoot,
dumpRoot,
@@ -1202,6 +1209,7 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
);
EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
eventHandler.handle(context);
+ eventsDumpMetadata.incrementEventsDumpedCount();
work.getMetricCollector().reportStageProgress(getName(),
ReplUtils.MetricName.EVENTS.name(), 1);
work.getReplLogger().eventLog(String.valueOf(ev.getEventId()),
eventHandler.dumpType().toString());
}
@@ -1498,8 +1506,8 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
Path lastEventFile = new Path(hiveDumpPath,
ReplAck.EVENTS_DUMP.toString());
long resumeFrom = 0;
try {
- resumeFrom = getResumeFrom(lastEventFile);
- } catch (SemanticException ex) {
+ resumeFrom = EventsDumpMetadata.deserialize(lastEventFile,
conf).getLastReplId();
+ } catch (HiveException ex) {
LOG.info("Could not get last repl id from {}, because of:",
lastEventFile, ex.getMessage());
}
return resumeFrom > 0L;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 60f24862112..ea5c0fd1503 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -34,6 +34,7 @@ import
org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuild
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
@@ -183,9 +184,13 @@ public class ReplLoadWork implements Serializable,
ReplLoadWorkMBean {
this.bootstrapIterator = null;
this.constraintsIterator = null;
}
- incrementalLoadTasksBuilder = new
IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
- new IncrementalLoadEventsIterator(dumpDirectory, hiveConf),
hiveConf, eventTo, metricCollector,
- replStatsTracker, shouldFailover, tablesToBootstrap.size());
+ try {
+ incrementalLoadTasksBuilder = new
IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
+ new IncrementalLoadEventsIterator(dumpDirectory, hiveConf),
hiveConf, eventTo, metricCollector,
+ replStatsTracker, shouldFailover, tablesToBootstrap.size());
+ } catch (HiveException e) {
+ throw new SemanticException(e.getMessage(), e);
+ }
} else {
this.bootstrapIterator = new BootstrapEventsIterator(new
Path(dumpDirectory, EximUtil.METADATA_PATH_NAME)
.toString(), dbNameToLoadIn, true, hiveConf, metricCollector);
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
index f2c8e8fd542..8913b292ce9 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
@@ -22,7 +22,10 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.EventsDumpMetadata;
import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
import java.io.IOException;
@@ -37,17 +40,36 @@ import java.util.NoSuchElementException;
public class IncrementalLoadEventsIterator implements Iterator<FileStatus> {
private FileStatus[] eventDirs;
private int currentIndex;
+ // When events are batched, numEvents is the number of events in the current
+ // batch; otherwise numEvents equals totalEventsCount.
private int numEvents;
+ private int totalEventsCount;
+ private boolean eventsBatched;
+ private Iterator<FileStatus> eventBatchDirsIterator;
+ private FileSystem fs;
- public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws
IOException {
+ public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws
IOException, HiveException {
Path eventPath = new Path(loadPath);
- FileSystem fs = eventPath.getFileSystem(conf);
+ fs = eventPath.getFileSystem(conf);
+ Path eventsDumpAckFile = new Path(eventPath,
ReplAck.EVENTS_DUMP.toString());
+
eventDirs = fs.listStatus(eventPath,
ReplUtils.getEventsDirectoryFilter(fs));
+
if ((eventDirs == null) || (eventDirs.length == 0)) {
- currentIndex = 0;
- numEvents = 0;
+ currentIndex = numEvents = totalEventsCount = 0;
return;
}
+ EventsDumpMetadata eventsDumpMetadata =
EventsDumpMetadata.deserialize(eventsDumpAckFile, conf);
+ eventsBatched = eventsDumpMetadata.isEventsBatched();
+ totalEventsCount = eventDirs.length;
+
+ if (eventsBatched) {
+ // eventDirs will now contain batch directories, e.g. events_batch_1, events_batch_2, etc.
+ Arrays.sort(eventDirs, new EventDumpDirComparator());
+ eventBatchDirsIterator = Arrays.stream(eventDirs).iterator();
+ // Read all events belonging to the first batch.
+ eventDirs = fs.listStatus(eventBatchDirsIterator.next().getPath(),
ReplUtils.getEventsDirectoryFilter(fs));
+ totalEventsCount = eventsDumpMetadata.getEventsDumpedCount();
+ }
// For event dump, each sub-dir is an individual event dump.
// We need to guarantee that the directory listing we got is in order of
event id.
Arrays.sort(eventDirs, new EventDumpDirComparator());
@@ -60,16 +82,34 @@ public class IncrementalLoadEventsIterator implements
Iterator<FileStatus> {
return (eventDirs != null && currentIndex < numEvents);
}
+ private boolean hasNextBatch() {
+ return eventBatchDirsIterator.hasNext();
+ }
+
@Override
public FileStatus next() {
- if (hasNext()) {
+ if (eventsBatched) {
+ FileStatus event = eventDirs[currentIndex++];
+ if (hasNextBatch() && currentIndex == numEvents) {
+ try {
+ eventDirs = fs.listStatus(eventBatchDirsIterator.next().getPath(),
ReplUtils.getEventsDirectoryFilter(fs));
+ Arrays.sort(eventDirs, new EventDumpDirComparator());
+ currentIndex = 0;
+ numEvents = eventDirs.length;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return event;
+ }
+ else if (hasNext()) {
return eventDirs[currentIndex++];
} else {
throw new NoSuchElementException("no more events");
}
}
- public int getNumEvents() {
- return numEvents;
+ public int getTotalEventsCount() {
+ return totalEventsCount;
}
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index c04f8c9465a..eda4a3f070e 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -91,13 +91,13 @@ public class IncrementalLoadTasksBuilder {
outputs = new HashSet<>();
log = null;
this.conf = conf;
- replLogger = new IncrementalLoadLogger(dbName, loadPath,
iterator.getNumEvents(), replStatsTracker);
+ replLogger = new IncrementalLoadLogger(dbName, loadPath,
iterator.getTotalEventsCount(), replStatsTracker);
replLogger.startLog();
this.eventTo = eventTo;
setNumIteration(0);
this.metricCollector = metricCollector;
Map<String, Long> metricMap = new HashMap<>();
- metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long)
iterator.getNumEvents());
+ metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long)
iterator.getTotalEventsCount());
this.shouldFailover = shouldFailover;
if (shouldFailover) {
Database db = null;
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index b884030723d..abc3e6627c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -160,6 +160,8 @@ public class ReplUtils {
// Service name for atlas.
public static final String REPL_ATLAS_SERVICE = "atlas";
+ public static final String INC_EVENTS_BATCH = "events_batch_%d";
+
/**
* Bootstrap REPL LOAD operation type on the examined object based on ckpt
state.
*/
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/EventsDumpMetadata.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/EventsDumpMetadata.java
new file mode 100644
index 00000000000..f2f38a99780
--- /dev/null
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/EventsDumpMetadata.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+
+/**
+ * This class represents the metadata related to events in repl dump.
+ * Metadata includes:
+ * lastReplId - the last successfully dumped eventId.
+ * eventsDumpedCount - number of events dumped in staging directory.
+ * eventsBatched - denotes if events are dumped in batches.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class EventsDumpMetadata {
+ @JsonProperty
+ private Long lastReplId;
+ @JsonProperty
+ private Integer eventsDumpedCount;
+ @JsonProperty
+ private boolean eventsBatched;
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ public EventsDumpMetadata() {
+
+ }
+
+ public EventsDumpMetadata(Long lastReplId, Integer eventsDumpedCount,
boolean eventsBatched) {
+ this.lastReplId = lastReplId;
+ this.eventsDumpedCount = eventsDumpedCount;
+ this.eventsBatched = eventsBatched;
+ }
+
+ public Long getLastReplId() {
+ return lastReplId;
+ }
+
+ public Integer getEventsDumpedCount() {
+ return eventsDumpedCount;
+ }
+
+ public void setLastReplId(Long lastReplId) {
+ this.lastReplId = lastReplId;
+ }
+
+ public void setEventsDumpedCount(Integer eventsDumpedCount) {
+ this.eventsDumpedCount = eventsDumpedCount;
+ }
+
+ public void incrementEventsDumpedCount() {
+ this.eventsDumpedCount++;
+ }
+
+ public boolean isEventsBatched() {
+ return eventsBatched;
+ }
+
+ public void setEventsBatched(boolean eventsBatched) {
+ this.eventsBatched = eventsBatched;
+ }
+
+ public String serialize() throws JsonProcessingException {
+ return objectMapper.writeValueAsString(this);
+ }
+
+ public static EventsDumpMetadata deserialize(Path ackFile, HiveConf conf)
throws HiveException {
+ Retryable retryable = Retryable.builder()
+ .withHiveConf(conf)
+ .withRetryOnException(IOException.class).build();
+ try {
+ return retryable.executeCallable(() -> {
+ FileSystem fs = ackFile.getFileSystem(conf);
+ try (BufferedReader br = new BufferedReader(
+ new InputStreamReader(fs.open(ackFile),
Charset.defaultCharset()))
+ ) {
+ return objectMapper.readValue(br, EventsDumpMetadata.class);
+ }
+ });
+ } catch (Exception e) {
+ throw new
HiveException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
+ }
+ }
+}