This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4affdd0  [HUDI-3461] The archived timeline for flink streaming reader should not be reused (#4861)
4affdd0 is described below
commit 4affdd0c8f02ccca4705515dcb6492c199e2cede
Author: Danny Chan <[email protected]>
AuthorDate: Tue Feb 22 15:54:29 2022 +0800
[HUDI-3461] The archived timeline for flink streaming reader should not be reused (#4861)
* Before this patch, the Flink streaming reader cached the meta client and, with it, the archived timeline; when fetching instant details from the reused timeline, an exception was thrown
* Add a method to HoodieTableMetaClient that returns a fresh archived timeline on each call
---
.../hudi/common/table/HoodieTableMetaClient.java | 18 ++++++++++--
.../table/timeline/HoodieArchivedTimeline.java | 34 +++++++++++++++++++---
.../apache/hudi/source/IncrementalInputSplits.java | 16 ++--------
3 files changed, 49 insertions(+), 19 deletions(-)
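
For context, a minimal sketch of the usage pattern this commit enables. Only getArchivedTimeline comes from the diff below; the builder setup, configuration, table path, and timestamp are assumed for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;

    Configuration hadoopConf = new Configuration();   // assumed Hadoop configuration
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf)
        .setBasePath("/tmp/hudi_table")               // assumed table base path
        .build();

    // Before: one lazily created instance, shared by every caller of the meta
    // client, which the streaming reader kept reusing across polls.
    HoodieArchivedTimeline cached = metaClient.getArchivedTimeline();

    // After: a fresh, independently loaded timeline per call, scoped to a start instant.
    HoodieArchivedTimeline fresh = metaClient.getArchivedTimeline("20220222000000");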
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 740d569..4c1eac7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -156,7 +156,7 @@ public class HoodieTableMetaClient implements Serializable {
*/
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
- fs = null; // will be lazily inited
+ fs = null; // will be lazily initialized
}
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
@@ -330,7 +330,7 @@ public class HoodieTableMetaClient implements Serializable {
* Get the archived commits as a timeline. This is a costly operation, as all data from the archived files is read.
* This should not be used except for historical debugging purposes.
*
- * @return Active commit timeline
+ * @return Archived commit timeline
*/
public synchronized HoodieArchivedTimeline getArchivedTimeline() {
if (archivedTimeline == null) {
@@ -340,6 +340,20 @@ public class HoodieTableMetaClient implements Serializable {
}
/**
+ * Returns a fresh archived commit timeline starting from startTs (inclusive).
+ *
+ * <p>This is a costly operation if a very early startTs is specified.
+ * Use it with caution, and only when the time range is short.
+ *
+ * <p>This method is not thread-safe.
+ *
+ * @return Archived commit timeline
+ */
+ public HoodieArchivedTimeline getArchivedTimeline(String startTs) {
+ return new HoodieArchivedTimeline(this, startTs);
+ }
+
+ /**
* Validate table properties.
* @param properties Properties from writeConfig.
* @param operationType operation type to be executed.
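
Condensed, the two getters now differ as follows; the cached getter's body is abbreviated from the hunk above, so treat this as a sketch rather than the full source:

    // Cached: one shared instance per meta client, lazily initialized and reused.
    public synchronized HoodieArchivedTimeline getArchivedTimeline() {
      if (archivedTimeline == null) {
        archivedTimeline = new HoodieArchivedTimeline(this);
      }
      return archivedTimeline;
    }

    // Fresh: a new instance on every call, loading only instants from startTs onward.
    public HoodieArchivedTimeline getArchivedTimeline(String startTs) {
      return new HoodieArchivedTimeline(this, startTs);
    }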
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 5ad3fa7..29f1665 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -79,13 +79,13 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
private static final String ACTION_TYPE_KEY = "actionType";
private static final String ACTION_STATE = "actionState";
private HoodieTableMetaClient metaClient;
- private Map<String, byte[]> readCommits = new HashMap<>();
+ private final Map<String, byte[]> readCommits = new HashMap<>();
private static final Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class);
/**
- * Loads instants between (startTs, endTs].
- * Note that there is no lazy loading, so this may not work if really long time range (endTs-startTs) is specified.
+ * Loads all the archived instants.
+ * Note that there is no lazy loading, so this may not work if the archived timeline range is very long.
* TBD: Should we enforce maximum time range?
*/
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
@@ -97,6 +97,19 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
}
/**
+ * Loads completed instants from startTs (inclusive).
+ * Note that there is no lazy loading, so this may not work if a very early startTs is specified.
+ */
+ public HoodieArchivedTimeline(HoodieTableMetaClient metaClient, String startTs) {
+ this.metaClient = metaClient;
+ setInstants(loadInstants(new StartTsFilter(startTs), true,
+     record -> HoodieInstant.State.COMPLETED.toString().equals(record.get(ACTION_STATE).toString())));
+ // multiple casts will make this lambda serializable -
+ // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
+ this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
+ }
+
+ /**
* For serialization and de-serialization only.
*
* @deprecated
@@ -300,6 +313,19 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
}
}
+ private static class StartTsFilter extends TimeRangeFilter {
+ private final String startTs;
+
+ public StartTsFilter(String startTs) {
+ super(startTs, null); // endTs is never used
+ this.startTs = startTs;
+ }
+
+ public boolean isInRange(HoodieInstant instant) {
+ return HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN_OR_EQUALS, startTs);
+ }
+ }
+
/**
* Sort files by reverse order of version suffix in file name.
*/
@@ -330,7 +356,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
// filter in-memory instants
Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
return new HoodieDefaultTimeline(getInstants().filter(i ->
- readCommits.keySet().contains(i.getTimestamp()))
+ readCommits.containsKey(i.getTimestamp()))
.filter(s -> validActions.contains(s.getAction())), details);
}
}
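
The new StartTsFilter leans on HoodieTimeline.compareTimestamps, which compares the fixed-width instant-time strings. A small illustration, with assumed timestamps:

    import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
    import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;

    // Instant times are yyyyMMddHHmmss strings, so the comparison is
    // effectively lexicographic.
    String startTs = "20220220000000"; // assumed start instant
    boolean kept = compareTimestamps("20220222155429", GREATER_THAN_OR_EQUALS, startTs);    // true: in range
    boolean dropped = compareTimestamps("20220219093000", GREATER_THAN_OR_EQUALS, startTs); // false: filtered out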
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 58c38ef..02e0e25 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -250,22 +250,12 @@ public class IncrementalInputSplits implements Serializable {
InstantRange instantRange,
HoodieTimeline commitTimeline,
String tableName) {
- if (instantRange == null || commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
- // read the archived metadata if:
- // 1. the start commit is 'earliest';
- // 2. the start instant is archived.
- HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
+ if (commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
+ // read the archived metadata if the start instant is archived.
+ HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(instantRange.getStartInstant());
HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
if (!archivedCompleteTimeline.empty()) {
- final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp();
Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
- if (instantRange != null) {
- archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), endTs);
- instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant()));
- } else {
- final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp();
- archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
- }
return maySkipCompaction(instantStream)
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
}
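
Assembled from the added and surviving lines above, the streaming-reader path now reduces to roughly the following; error handling and the enclosing method are elided, and instantStream is inlined:

    if (commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
      // The fresh timeline already loads only instants >= the start instant, so the
      // old loadInstantDetailsInMemory call and the extra filtering step are gone.
      HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(instantRange.getStartInstant());
      HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
      if (!archivedCompleteTimeline.empty()) {
        return maySkipCompaction(archivedCompleteTimeline.getInstants())
            .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline))
            .collect(Collectors.toList());
      }
    }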