This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fd3c8f35d2c [MINOR] Fixing commit time parsing w/ archival (#9069)
fd3c8f35d2c is described below
commit fd3c8f35d2cb9bfa0c5225369ad165babc189a9d
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Jun 28 10:37:53 2023 -0400
[MINOR] Fixing commit time parsing w/ archival (#9069)
* Fixing commit time parsing w/ archival
* Check for init timestamp before logging warn msg
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../apache/hudi/client/HoodieTimelineArchiver.java | 40 +++++++++++++---------
.../table/timeline/HoodieActiveTimeline.java | 3 +-
2 files changed, 24 insertions(+), 19 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 0a28a69c784..b1a82c1ed03 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -85,6 +85,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.NOT_PARSABLE_TIMESTAMPS;
+import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.parseDateFromInstantTime;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
@@ -118,28 +120,12 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
this.metaClient = table.getMetaClient();
this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
-
- Option<HoodieInstant> earliestCommitToRetain = Option.empty();
HoodieTimeline completedCommitsTimeline = table.getCompletedCommitsTimeline();
Option<HoodieInstant> latestCommit = completedCommitsTimeline.lastInstant();
HoodieCleaningPolicy cleanerPolicy = config.getCleanerPolicy();
int cleanerCommitsRetained = config.getCleanerCommitsRetained();
int cleanerHoursRetained = config.getCleanerHoursRetained();
-
- try {
- earliestCommitToRetain = CleanerUtils.getEarliestCommitToRetain(
- metaClient.getActiveTimeline().getCommitsTimeline(),
- cleanerPolicy,
- cleanerCommitsRetained,
- latestCommit.isPresent()
- ? HoodieActiveTimeline.parseDateFromInstantTime(latestCommit.get().getTimestamp()).toInstant()
- : Instant.now(),
- cleanerHoursRetained,
- metaClient.getTableConfig().getTimelineTimezone());
- } catch (ParseException e) {
- LOG.warn("Error parsing instant time: " + latestCommit.get().getTimestamp());
- }
-
+ Option<HoodieInstant> earliestCommitToRetain = getEarliestCommitToRetain(latestCommit, cleanerPolicy, cleanerCommitsRetained, cleanerHoursRetained);
int configuredMinInstantsToKeep = config.getMinCommitsToKeep();
int configuredMaxInstantsToKeep = config.getMaxCommitsToKeep();
if (earliestCommitToRetain.isPresent()) {
@@ -182,6 +168,26 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
}
}
+ private Option<HoodieInstant> getEarliestCommitToRetain(Option<HoodieInstant> latestCommit, HoodieCleaningPolicy cleanerPolicy, int cleanerCommitsRetained, int cleanerHoursRetained) {
+ Option<HoodieInstant> earliestCommitToRetain = Option.empty();
+ try {
+ earliestCommitToRetain = CleanerUtils.getEarliestCommitToRetain(
+ metaClient.getActiveTimeline().getCommitsTimeline(),
+ cleanerPolicy,
+ cleanerCommitsRetained,
+ latestCommit.isPresent()
+ ? parseDateFromInstantTime(latestCommit.get().getTimestamp()).toInstant()
+ : Instant.now(),
+ cleanerHoursRetained,
+ metaClient.getTableConfig().getTimelineTimezone());
+ } catch (ParseException e) {
+ if (NOT_PARSABLE_TIMESTAMPS.stream().noneMatch(ts -> latestCommit.get().getTimestamp().startsWith(ts))) {
+ LOG.warn("Error parsing instant time: " + latestCommit.get().getTimestamp());
+ }
+ }
+ return earliestCommitToRetain;
+ }
+
private Writer openWriter() {
try {
if (this.writer == null) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index e8c94e474fa..dbfe484531a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -76,7 +76,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
REQUESTED_INDEX_COMMIT_EXTENSION, INFLIGHT_INDEX_COMMIT_EXTENSION, INDEX_COMMIT_EXTENSION,
REQUESTED_SAVE_SCHEMA_ACTION_EXTENSION, INFLIGHT_SAVE_SCHEMA_ACTION_EXTENSION, SAVE_SCHEMA_ACTION_EXTENSION));
- private static final Set<String> NOT_PARSABLE_TIMESTAMPS = new HashSet<String>(3) {{
+ public static final Set<String> NOT_PARSABLE_TIMESTAMPS = new HashSet<String>(3) {{
add(HoodieTimeline.INIT_INSTANT_TS);
add(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
add(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
@@ -481,7 +481,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
return commitInstant;
}
-
//-----------------------------------------------------------------
// END - COMPACTION RELATED META-DATA MANAGEMENT
//-----------------------------------------------------------------