This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4affdd0  [HUDI-3461] The archived timeline for flink streaming reader should not be reused (#4861)
4affdd0 is described below

commit 4affdd0c8f02ccca4705515dcb6492c199e2cede
Author: Danny Chan <[email protected]>
AuthorDate: Tue Feb 22 15:54:29 2022 +0800

    [HUDI-3461] The archived timeline for flink streaming reader should not be reused (#4861)
    
    * Before the patch, the flink streaming reader cached the meta client and thus
      the archived timeline; when fetching instant details from the reused timeline,
      an exception was thrown
    * Add a method in HoodieTableMetaClient to return a fresh archived timeline on each call
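
    A rough sketch of the intended call pattern after this change (handleDetails(...)
    is a hypothetical callback, and metaClient/startTs are assumed in scope; only
    HoodieTableMetaClient#getArchivedTimeline(String) comes from this patch):

        // Build a fresh archived timeline per fetch instead of reusing the
        // cached one, so instant details can always be read.
        HoodieArchivedTimeline archived = metaClient.getArchivedTimeline(startTs);
        archived.getCommitsTimeline()
            .filterCompletedInstants()
            .getInstants()
            .forEach(instant -> handleDetails(archived.getInstantDetails(instant)));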
---
 .../hudi/common/table/HoodieTableMetaClient.java   | 18 ++++++++++--
 .../table/timeline/HoodieArchivedTimeline.java     | 34 +++++++++++++++++++---
 .../apache/hudi/source/IncrementalInputSplits.java | 16 ++--------
 3 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 740d569..4c1eac7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -156,7 +156,7 @@ public class HoodieTableMetaClient implements Serializable {
    */
   private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
     in.defaultReadObject();
-    fs = null; // will be lazily inited
+    fs = null; // will be lazily initialized
   }
 
   private void writeObject(java.io.ObjectOutputStream out) throws IOException {
@@ -330,7 +330,7 @@ public class HoodieTableMetaClient implements Serializable {
   * Get the archived commits as a timeline. This is costly operation, as all data from the archived files are read.
    * This should not be used, unless for historical debugging purposes.
    *
-   * @return Active commit timeline
+   * @return Archived commit timeline
    */
   public synchronized HoodieArchivedTimeline getArchivedTimeline() {
     if (archivedTimeline == null) {
@@ -340,6 +340,20 @@ public class HoodieTableMetaClient implements Serializable {
   }
 
   /**
+   * Returns a fresh archived timeline of commits from startTs (inclusive).
+   *
+   * <p>This is a costly operation if a really early startTs is specified.
+   * Be cautious and use this only when the time range is short.
+   *
+   * <p>This method is not thread-safe.
+   *
+   * @return Archived commit timeline
+   */
+  public HoodieArchivedTimeline getArchivedTimeline(String startTs) {
+    return new HoodieArchivedTimeline(this, startTs);
+  }
+
+  /**
    * Validate table properties.
    * @param properties Properties from writeConfig.
    * @param operationType operation type to be executed.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 5ad3fa7..29f1665 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -79,13 +79,13 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
   private static final String ACTION_TYPE_KEY = "actionType";
   private static final String ACTION_STATE = "actionState";
   private HoodieTableMetaClient metaClient;
-  private Map<String, byte[]> readCommits = new HashMap<>();
+  private final Map<String, byte[]> readCommits = new HashMap<>();
 
   private static final Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class);
 
   /**
-   * Loads instants between (startTs, endTs].
-   * Note that there is no lazy loading, so this may not work if really long time range (endTs-startTs) is specified.
+   * Loads all the archived instants.
+   * Note that there is no lazy loading, so this may not work if the archived timeline range is really long.
    * TBD: Should we enforce maximum time range?
    */
   public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
@@ -97,6 +97,19 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
   }
 
   /**
+   * Loads completed instants from startTs (inclusive).
+   * Note that there is no lazy loading, so this may not work if a really early startTs is specified.
+   */
+  public HoodieArchivedTimeline(HoodieTableMetaClient metaClient, String startTs) {
+    this.metaClient = metaClient;
+    setInstants(loadInstants(new StartTsFilter(startTs), true,
+        record -> HoodieInstant.State.COMPLETED.toString().equals(record.get(ACTION_STATE).toString())));
+    // the cast to an intersection type makes this lambda serializable -
+    // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
+    this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
+  }
+
+  /**
    * For serialization and de-serialization only.
    *
    * @deprecated
@@ -300,6 +313,19 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
     }
   }
 
+  private static class StartTsFilter extends TimeRangeFilter {
+    private final String startTs;
+
+    public StartTsFilter(String startTs) {
+      super(startTs, null); // endTs is never used
+      this.startTs = startTs;
+    }
+
+    public boolean isInRange(HoodieInstant instant) {
+      return HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN_OR_EQUALS, startTs);
+    }
+  }
+
   /**
    * Sort files by reverse order of version suffix in file name.
    */
@@ -330,7 +356,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
     // filter in-memory instants
     Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
     return new HoodieDefaultTimeline(getInstants().filter(i ->
-        readCommits.keySet().contains(i.getTimestamp()))
+        readCommits.containsKey(i.getTimestamp()))
         .filter(s -> validActions.contains(s.getAction())), details);
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 58c38ef..02e0e25 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -250,22 +250,12 @@ public class IncrementalInputSplits implements Serializable {
       InstantRange instantRange,
       HoodieTimeline commitTimeline,
       String tableName) {
-    if (instantRange == null || commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
-      // read the archived metadata if:
-      // 1. the start commit is 'earliest';
-      // 2. the start instant is archived.
-      HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
+    if (commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
+      // read the archived metadata if the start instant is archived.
+      HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(instantRange.getStartInstant());
       HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
       if (!archivedCompleteTimeline.empty()) {
-        final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp();
         Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
-        if (instantRange != null) {
-          archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), endTs);
-          instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant()));
-        } else {
-          final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp();
-          archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
-        }
         return maySkipCompaction(instantStream)
             .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
       }
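
The intersection-type cast in the new constructor is the standard trick for making a
method-reference lambda serializable. A self-contained sketch of the same idiom (the
Demo class below is illustrative, not part of Hudi):

    import java.io.Serializable;
    import java.util.function.Function;

    class Demo {
      static Function<String, Integer> serializableLength() {
        // Casting to an intersection type makes the compiler emit a lambda
        // class that also implements Serializable (JLS 15.16).
        return (Function<String, Integer> & Serializable) String::length;
      }
    }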
