vinothchandar commented on a change in pull request #4034:
URL: https://github.com/apache/hudi/pull/4034#discussion_r756350588
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -339,11 +340,17 @@ public void refreshTimeline() throws IOException {
resumeCheckpointStr = Option.empty();
} else if
(HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
- throw new HoodieDeltaStreamerException(
- "Unable to find previous checkpoint. Please double check if this
table "
- + "was indeed built via delta streamer. Last Commit :" +
lastCommit + ", Instants :"
- +
commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ",
CommitMetadata="
- + commitMetadata.toJsonString());
+ // if previous commit metadata did not have the checkpoint key, try
traversing previous commits until we find one.
+ Option<String> prevCheckpoint =
getPreviousCheckpoint(commitTimelineOpt.get());
Review comment:
Is there any way to use something like `Option.orThrow` (or a similar pattern) here?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -88,33 +93,42 @@
/**
* Get the last completed transaction hoodie instant and {@link
HoodieCommitMetadata#getExtraMetadata()}.
+ *
* @param metaClient
* @return
*/
public static Option<Pair<HoodieInstant, Map<String, String>>>
getLastCompletedTxnInstantAndMetadata(
HoodieTableMetaClient metaClient) {
- Option<HoodieInstant> hoodieInstantOption =
metaClient.getActiveTimeline().getCommitsTimeline()
- .filterCompletedInstants().lastInstant();
- try {
- if (hoodieInstantOption.isPresent()) {
- switch (hoodieInstantOption.get().getAction()) {
- case HoodieTimeline.REPLACE_COMMIT_ACTION:
- HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata
-
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
HoodieReplaceCommitMetadata.class);
- return Option.of(Pair.of(hoodieInstantOption.get(),
replaceCommitMetadata.getExtraMetadata()));
- case HoodieTimeline.DELTA_COMMIT_ACTION:
- case HoodieTimeline.COMMIT_ACTION:
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
HoodieCommitMetadata.class);
- return Option.of(Pair.of(hoodieInstantOption.get(),
commitMetadata.getExtraMetadata()));
- default:
- throw new IllegalArgumentException("Unknown instant action" +
hoodieInstantOption.get().getAction());
+ List<HoodieInstant> hoodieInstants =
metaClient.getActiveTimeline().getCommitsTimeline()
+
.filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
+ if (!hoodieInstants.isEmpty()) {
+ for (HoodieInstant hoodieInstant : hoodieInstants) {
+ try {
+ switch (hoodieInstant.getAction()) {
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata
+
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
HoodieReplaceCommitMetadata.class);
+ return Option.of(Pair.of(hoodieInstant,
replaceCommitMetadata.getExtraMetadata()));
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ case HoodieTimeline.COMMIT_ACTION:
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
HoodieCommitMetadata.class);
+ if (commitMetadata.getOperationType() != null
+ &&
!commitMetadata.getOperationType().equals(WriteOperationType.UNKNOWN)
+ &&
!commitMetadata.getOperationType().equals(WriteOperationType.COMPACT)) { //
skip compaction instants
+ return Option.of(Pair.of(hoodieInstant,
commitMetadata.getExtraMetadata()));
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown instant action" +
hoodieInstant.getAction());
+ }
+ } catch (IOException io) {
+ throw new HoodieIOException("Unable to read metadata for instant " +
hoodieInstant, io);
}
- } else {
- return Option.empty();
}
- } catch (IOException io) {
- throw new HoodieIOException("Unable to read metadata for instant " +
hoodieInstantOption.get(), io);
+ return Option.empty();
Review comment:
Is there any way to avoid these nested empty returns?
##########
File path:
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
##########
@@ -196,34 +206,59 @@ void
testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType)
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob,
jsc());
backfillJob.sync();
+ meta.reloadActiveTimeline();
+ int totalCommits =
meta.getCommitsTimeline().filterCompletedInstants().countInstants();
+
// Save the checkpoint information from the deltastreamer run and perform
next write
String checkpointAfterDeltaSync =
getLatestMetadata(meta).getMetadata(CHECKPOINT_KEY);
// this writer will enable
HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key() so that
deltastreamer checkpoint will be carried over.
- performWriteWithDeltastreamerStateMerge();
+ doSparkWriteWithDeltastreamerStateMerge(true);// Verify that the
checkpoint is carried over
Review comment:
I think you could unit test this without actually performing the writes —
just add commit metadata that has no checkpoints?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
##########
@@ -88,33 +93,42 @@
/**
* Get the last completed transaction hoodie instant and {@link
HoodieCommitMetadata#getExtraMetadata()}.
+ *
* @param metaClient
* @return
*/
public static Option<Pair<HoodieInstant, Map<String, String>>>
getLastCompletedTxnInstantAndMetadata(
HoodieTableMetaClient metaClient) {
- Option<HoodieInstant> hoodieInstantOption =
metaClient.getActiveTimeline().getCommitsTimeline()
- .filterCompletedInstants().lastInstant();
- try {
- if (hoodieInstantOption.isPresent()) {
- switch (hoodieInstantOption.get().getAction()) {
- case HoodieTimeline.REPLACE_COMMIT_ACTION:
- HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata
-
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
HoodieReplaceCommitMetadata.class);
- return Option.of(Pair.of(hoodieInstantOption.get(),
replaceCommitMetadata.getExtraMetadata()));
- case HoodieTimeline.DELTA_COMMIT_ACTION:
- case HoodieTimeline.COMMIT_ACTION:
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
HoodieCommitMetadata.class);
- return Option.of(Pair.of(hoodieInstantOption.get(),
commitMetadata.getExtraMetadata()));
- default:
- throw new IllegalArgumentException("Unknown instant action" +
hoodieInstantOption.get().getAction());
+ List<HoodieInstant> hoodieInstants =
metaClient.getActiveTimeline().getCommitsTimeline()
+
.filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
+ if (!hoodieInstants.isEmpty()) {
+ for (HoodieInstant hoodieInstant : hoodieInstants) {
+ try {
+ switch (hoodieInstant.getAction()) {
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata
+
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
HoodieReplaceCommitMetadata.class);
+ return Option.of(Pair.of(hoodieInstant,
replaceCommitMetadata.getExtraMetadata()));
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ case HoodieTimeline.COMMIT_ACTION:
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
HoodieCommitMetadata.class);
+ if (commitMetadata.getOperationType() != null
Review comment:
This is probably for older commits?
##########
File path:
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestTransactionUtils.java
##########
@@ -62,13 +66,28 @@ public void testCheckpointStateMerge() throws IOException {
timeline.createNewInstant(commitInstantWithCheckpointState);
HoodieCommitMetadata metadataWithCheckpoint = new HoodieCommitMetadata();
+ metadataWithCheckpoint.setOperationType(WriteOperationType.INSERT);
String checkpointVal = "00001";
metadataWithCheckpoint.addMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY,
checkpointVal);
timeline.saveAsComplete(
commitInstantWithCheckpointState,
Option.of(metadataWithCheckpoint.toJsonString().getBytes(StandardCharsets.UTF_8))
);
+ if (testCompaction) {
+ HoodieInstant compactionInstant = new HoodieInstant(
+ true,
+ HoodieTimeline.COMPACTION_ACTION,
Review comment:
Not sure I follow this test. If you use COMMIT_ACTION above, then the table
must be COW, right? Conversely, if testCompaction=true, shouldn't we be
generating a delta commit above?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -451,6 +458,22 @@ public void refreshTimeline() throws IOException {
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
}
+ private Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws
IOException {
Review comment:
Can we unit test this?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -451,6 +458,22 @@ public void refreshTimeline() throws IOException {
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
}
+ private Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws
IOException {
+ return (Option<String>) timeline.getReverseOrderedInstants().map(instant
-> {
Review comment:
I think this can be more concise.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -451,6 +458,22 @@ public void refreshTimeline() throws IOException {
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
}
+ private Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws
IOException {
+ return (Option<String>) timeline.getReverseOrderedInstants().map(instant
-> {
Review comment:
Could you write this as a `filter()`, then a `findFirst()`, and then a `map()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]