This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1fa9e37df92 [HUDI-6031] fix bug: checkpoint lost after changing cow to mor (#8378)
1fa9e37df92 is described below
commit 1fa9e37df92c7a38c3909ef1e71f21bad03b3d84
Author: kongwei <[email protected]>
AuthorDate: Sun Apr 30 15:05:37 2023 +0800
[HUDI-6031] fix bug: checkpoint lost after changing cow to mor (#8378)
Co-authored-by: wei.kong <[email protected]>
---
.../testsuite/HoodieDeltaStreamerWrapper.java | 2 +-
.../hudi/utilities/deltastreamer/DeltaSync.java | 48 ++++++++++---------
.../deltastreamer/TestHoodieDeltaStreamer.java | 54 ++++++++++++++++++++++
3 files changed, 82 insertions(+), 22 deletions(-)
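In short, the fix makes checkpoint resolution table-type agnostic: DeltaSync now reads the resume checkpoint from the combined commits timeline (commit and deltacommit actions) and narrows to deltacommits only when they exist, so a table converted from COW to MOR still sees the checkpoint written by its last COW commit. A minimal sketch of that selection logic, using only the timeline calls visible in the diff below (the class and method names here are illustrative, not part of the patch):

import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

final class CheckpointTimelineSelector {
  // Illustrative helper (not part of the patch): choose the timeline to read the
  // resume checkpoint from.
  static Option<HoodieTimeline> pickCheckpointTimeline(Option<HoodieTimeline> commitsTimelineOpt) {
    if (!commitsTimelineOpt.isPresent()) {
      // empty table: nothing to resume from
      return Option.empty();
    }
    // keep only deltacommit instants from the completed commits timeline
    HoodieTimeline deltaCommits = commitsTimelineOpt.get()
        .filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
    // no deltacommits yet (pure COW history): keep the full commits timeline so the last
    // COW commit's checkpoint is still found; otherwise resume from deltacommits as before
    return deltaCommits.empty() ? commitsTimelineOpt : Option.of(deltaCommits);
  }
}

The patch applies this logic inline in getCheckpointToResume rather than through a separate helper.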
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 5ff91fa5209..632bbecf10d 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -78,7 +78,7 @@ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer {
public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
DeltaSync service = getDeltaSync();
service.refreshTimeline();
- return service.readFromSource(service.getCommitTimelineOpt());
+ return service.readFromSource(service.getCommitsTimelineOpt());
}
public DeltaSync getDeltaSync() {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index e2efe1a4e35..c59510f3676 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -227,11 +227,11 @@ public class DeltaSync implements Serializable, Closeable {
private transient Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient;
/**
- * Timeline with completed commits.
+ * Timeline with completed commits, including both .commit and .deltacommit.
*/
- private transient Option<HoodieTimeline> commitTimelineOpt;
+ private transient Option<HoodieTimeline> commitsTimelineOpt;
- // all commits timeline
+ // all commits timeline, including all (commits, delta commits, compaction, clean, savepoint, rollback, replace commits, index)
private transient Option<HoodieTimeline> allCommitsTimelineOpt;
/**
@@ -320,11 +320,9 @@ public class DeltaSync implements Serializable, Closeable {
.build();
switch (meta.getTableType()) {
case COPY_ON_WRITE:
- this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
- this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
- break;
case MERGE_ON_READ:
- this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants());
+ // we can use getCommitsTimeline for both COW and MOR here, because for COW there is no deltacommit
+ this.commitsTimelineOpt = Option.of(meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
break;
default:
@@ -361,7 +359,7 @@ public class DeltaSync implements Serializable, Closeable {
}
private void initializeEmptyTable() throws IOException {
- this.commitTimelineOpt = Option.empty();
+ this.commitsTimelineOpt = Option.empty();
this.allCommitsTimelineOpt = Option.empty();
String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
HoodieTableMetaClient.withPropertyBuilder()
@@ -398,7 +396,7 @@ public class DeltaSync implements Serializable, Closeable {
// Refresh Timeline
refreshTimeline();
- Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = readFromSource(commitTimelineOpt);
+ Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = readFromSource(commitsTimelineOpt);
if (null != srcRecordsWithCkpt) {
// this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
@@ -448,16 +446,16 @@ public class DeltaSync implements Serializable, Closeable {
/**
* Read from Upstream Source and apply transformation if needed.
*
- * @param commitTimelineOpt Timeline with completed commits
+ * @param commitsTimelineOpt Timeline with completed commits, including .commit and .deltacommit
* @return Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> Input data read from upstream source, consists
* of schemaProvider, checkpointStr and hoodieRecord
* @throws Exception in case of any Exception
*/
- public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
+ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(Option<HoodieTimeline> commitsTimelineOpt) throws IOException {
// Retrieve the previous round checkpoints, if any
Option<String> resumeCheckpointStr = Option.empty();
- if (commitTimelineOpt.isPresent()) {
- resumeCheckpointStr = getCheckpointToResume(commitTimelineOpt);
+ if (commitsTimelineOpt.isPresent()) {
+ resumeCheckpointStr = getCheckpointToResume(commitsTimelineOpt);
} else {
// initialize the table for the first time.
String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
@@ -650,16 +648,24 @@ public class DeltaSync implements Serializable, Closeable {
/**
* Process previous commit metadata and checkpoint configs set by user to determine the checkpoint to resume from.
- * @param commitTimelineOpt commit timeline of interest.
+ *
+ * @param commitsTimelineOpt commits timeline of interest, including .commit and .deltacommit.
* @return the checkpoint to resume from if applicable.
* @throws IOException
*/
- private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
+ private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitsTimelineOpt) throws IOException {
Option<String> resumeCheckpointStr = Option.empty();
- Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
+ // try to get the checkpoint from commits (including both commit and deltacommit)
+ // in the COW-to-MOR migration case, the first batch of the deltastreamer would otherwise lose the checkpoint from the COW table, causing data loss
+ HoodieTimeline deltaCommitTimeline = commitsTimelineOpt.get().filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
+ // the presence of deltacommits means this is a MOR table, so we should resume from .deltacommit as before
+ if (!deltaCommitTimeline.empty()) {
+ commitsTimelineOpt = Option.of(deltaCommitTimeline);
+ }
+ Option<HoodieInstant> lastCommit = commitsTimelineOpt.get().lastInstant();
if (lastCommit.isPresent()) {
// if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
- Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get());
+ Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitsTimelineOpt.get());
if (commitMetadataOption.isPresent()) {
HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
LOG.debug("Checkpoint reset from metadata: " +
commitMetadata.getMetadata(CHECKPOINT_RESET_KEY));
@@ -675,7 +681,7 @@ public class DeltaSync implements Serializable, Closeable {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this
table "
+ "was indeed built via delta streamer. Last Commit :" +
lastCommit + ", Instants :"
- + commitTimelineOpt.get().getInstants() + ", CommitMetadata="
+ + commitsTimelineOpt.get().getInstants() + ",
CommitMetadata="
+ commitMetadata.toJsonString());
}
// KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
@@ -807,7 +813,7 @@ public class DeltaSync implements Serializable, Closeable {
String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
if (errorTableWriter.isPresent()) {
// Commit the error events triggered so far to the error table
- Option<String> commitedInstantTime = getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+ Option<String> commitedInstantTime = getLatestInstantWithValidCheckpointInfo(commitsTimelineOpt);
boolean errorTableSuccess = errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
if (!errorTableSuccess) {
switch (errorWriteFailureStrategy) {
@@ -1143,8 +1149,8 @@ public class DeltaSync implements Serializable, Closeable {
return cfg;
}
- public Option<HoodieTimeline> getCommitTimelineOpt() {
- return commitTimelineOpt;
+ public Option<HoodieTimeline> getCommitsTimelineOpt() {
+ return commitsTimelineOpt;
}
public HoodieIngestionMetrics getMetrics() {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index d5d8dca34d6..82b7589448e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -143,6 +143,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE;
import static org.apache.hudi.config.metrics.HoodieMetricsConfig.TURN_METRICS_ON;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
@@ -2600,6 +2601,59 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
}
+ @Test
+ public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
+ String tableBasePath = basePath + "/test_resume_checkpoint_after_changing_cow_to_mor";
+ // default table type is COW
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+
+ // change cow to mor
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+ .setConf(new Configuration(fs.getConf()))
+ .setBasePath(cfg.targetBasePath)
+ .setLoadActiveTimelineOnLoad(false)
+ .build();
+ Properties hoodieProps = new Properties();
+ hoodieProps.load(fs.open(new Path(cfg.targetBasePath + "/.hoodie/hoodie.properties")));
+ LOG.info("old props: {}", hoodieProps);
+ hoodieProps.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
+ LOG.info("new props: {}", hoodieProps);
+ Path metaPathDir = new Path(metaClient.getBasePathV2(), METAFOLDER_NAME);
+ HoodieTableConfig.create(metaClient.getFs(), metaPathDir, hoodieProps);
+
+ // continue deltastreamer
+ cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+ cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
+ TestHelpers.assertRecordCount(1450, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+ List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+ assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+ // there should be 1 deltacommit at this point
+ TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs);
+
+ // run again to verify the table now behaves as MOR
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
+ // total records should be 1900 now
+ TestHelpers.assertRecordCount(1900, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
+ counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+ assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+ // there should be 2 deltacommits at this point
+ TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
+
+ // clean up
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
+ }
+
class TestDeltaSync extends DeltaSync {
public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,