This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 58bc859b173 [HUDI-7476] Incremental loading for archived timeline 
(#10807)
58bc859b173 is described below

commit 58bc859b173a3648ff5f7f2042aaadf8281cac2c
Author: Danny Chan <[email protected]>
AuthorDate: Fri Mar 8 17:53:04 2024 +0800

    [HUDI-7476] Incremental loading for archived timeline (#10807)
---
 .../hudi/client/timeline/LSMTimelineWriter.java    |  17 +++-
 .../table/timeline/TestHoodieArchivedTimeline.java | 105 +++++++++++++++++++++
 .../table/timeline/HoodieArchivedTimeline.java     |  29 ++++++
 .../table/timeline/HoodieDefaultTimeline.java      |  69 ++++++++++++--
 4 files changed, 208 insertions(+), 12 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
index 4f487410a8c..08500f06c9a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.client.timeline;
 
 import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
+import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.table.timeline.MetadataConversionUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -72,21 +73,29 @@ public class LSMTimelineWriter {
   public static final long MAX_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1000;
 
   private final HoodieWriteConfig config;
-  private final HoodieTable<?, ?, ?, ?> table;
+  private final TaskContextSupplier taskContextSupplier;
   private final HoodieTableMetaClient metaClient;
 
   private HoodieWriteConfig writeConfig;
 
   private LSMTimelineWriter(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> 
table) {
+    this(config, table.getTaskContextSupplier(), table.getMetaClient());
+  }
+
+  private LSMTimelineWriter(HoodieWriteConfig config, TaskContextSupplier 
taskContextSupplier, HoodieTableMetaClient metaClient) {
     this.config = config;
-    this.table = table;
-    this.metaClient = table.getMetaClient();
+    this.taskContextSupplier = taskContextSupplier;
+    this.metaClient = metaClient;
   }
 
   public static LSMTimelineWriter getInstance(HoodieWriteConfig config, 
HoodieTable<?, ?, ?, ?> table) {
     return new LSMTimelineWriter(config, table);
   }
 
+  public static LSMTimelineWriter getInstance(HoodieWriteConfig config, 
TaskContextSupplier taskContextSupplier, HoodieTableMetaClient metaClient) {
+    return new LSMTimelineWriter(config, taskContextSupplier, metaClient);
+  }
+
   /**
    * Writes the list of active actions into the timeline.
    *
@@ -366,7 +375,7 @@ public class LSMTimelineWriter {
   private HoodieFileWriter openWriter(Path filePath) {
     try {
       return HoodieFileWriterFactory.getFileWriter("", filePath, 
metaClient.getHadoopConf(), getOrCreateWriterConfig(),
-          HoodieLSMTimelineInstant.getClassSchema(), 
table.getTaskContextSupplier(), HoodieRecord.HoodieRecordType.AVRO);
+          HoodieLSMTimelineInstant.getClassSchema(), taskContextSupplier, 
HoodieRecord.HoodieRecordType.AVRO);
     } catch (IOException e) {
       throw new HoodieException("Unable to initialize archiving writer", e);
     }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieArchivedTimeline.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieArchivedTimeline.java
new file mode 100644
index 00000000000..664385f9ae0
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieArchivedTimeline.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.DummyActiveAction;
+import org.apache.hudi.client.timeline.LSMTimelineWriter;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test cases for {@link HoodieArchivedTimeline}.
+ */
+public class TestHoodieArchivedTimeline extends HoodieCommonTestHarness {
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanMetaClient();
+  }
+
+  @Test
+  public void testLoadingInstantsIncrementally() throws Exception {
+    writeArchivedTimeline(10, 10000000);
+    // now we have 50 instants, archived in 5 batches of 10.
+    HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline("10000043");
+    
assertThat(archivedTimeline.firstInstant().map(HoodieInstant::getTimestamp).orElse(""),
 is("10000043"));
+    
assertThat(archivedTimeline.lastInstant().map(HoodieInstant::getTimestamp).orElse(""),
 is("10000050"));
+    // load incrementally
+    archivedTimeline.reload("10000034");
+    
assertThat(archivedTimeline.firstInstant().map(HoodieInstant::getTimestamp).orElse(""),
 is("10000034"));
+    archivedTimeline.reload("10000011");
+    
assertThat(archivedTimeline.firstInstant().map(HoodieInstant::getTimestamp).orElse(""),
 is("10000011"));
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void writeArchivedTimeline(int batchSize, long startTs) throws 
Exception {
+    HoodieTestTable testTable = HoodieTestTable.of(this.metaClient);
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath(this.metaClient.getBasePathV2().toString())
+        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+        .withMarkersType("DIRECT")
+        .build();
+    HoodieEngineContext engineContext = new HoodieLocalEngineContext(new 
Configuration());
+    LSMTimelineWriter writer = LSMTimelineWriter.getInstance(writeConfig, new 
LocalTaskContextSupplier(), metaClient);
+    List<ActiveAction> instantBuffer = new ArrayList<>();
+    for (int i = 1; i <= 50; i++) {
+      long instantTimeTs = startTs + i;
+      String instantTime = String.valueOf(instantTimeTs);
+      String completionTime = String.valueOf(instantTimeTs + 10);
+      HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, 
"commit", instantTime, completionTime);
+      HoodieCommitMetadata metadata  = 
testTable.createCommitMetadata(instantTime, WriteOperationType.INSERT, 
Arrays.asList("par1", "par2"), 10, false);
+      byte[] serializedMetadata = 
TimelineMetadataUtils.serializeCommitMetadata(metadata).get();
+      instantBuffer.add(new DummyActiveAction(instant, serializedMetadata));
+      if (i % batchSize == 0) {
+        // archive 10 instants each time
+        writer.write(instantBuffer, 
org.apache.hudi.common.util.Option.empty(), 
org.apache.hudi.common.util.Option.empty());
+        writer.compactAndClean(engineContext);
+        instantBuffer.clear();
+      }
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index c9aba9ebc0d..d6167ea61e7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -72,6 +72,13 @@ public class HoodieArchivedTimeline extends 
HoodieDefaultTimeline {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieArchivedTimeline.class);
 
+  /**
+   * Used for loading the archived timeline incrementally: the earliest loaded 
instant time is remembered
+   * each time the timeline is loaded, and is then used as the 
end boundary
+   * of the next load.
+   */
+  private String cursorInstant;
+
   /**
    * Loads all the archived instants.
    * Note that there is no lazy loading, so this may not work if the archived 
timeline range is really long.
@@ -80,6 +87,7 @@ public class HoodieArchivedTimeline extends 
HoodieDefaultTimeline {
   public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
     this.metaClient = metaClient;
     setInstants(this.loadInstants());
+    this.cursorInstant = 
firstInstant().map(HoodieInstant::getTimestamp).orElse(null);
     // multiple casts will make this lambda serializable -
     // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
     this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) 
this::getInstantDetails;
@@ -92,6 +100,7 @@ public class HoodieArchivedTimeline extends 
HoodieDefaultTimeline {
   public HoodieArchivedTimeline(HoodieTableMetaClient metaClient, String 
startTs) {
     this.metaClient = metaClient;
     setInstants(loadInstants(new StartTsFilter(startTs), LoadMode.METADATA));
+    this.cursorInstant = startTs;
     // multiple casts will make this lambda serializable -
     // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
     this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) 
this::getInstantDetails;
@@ -152,6 +161,26 @@ public class HoodieArchivedTimeline extends 
HoodieDefaultTimeline {
     return new HoodieArchivedTimeline(metaClient);
   }
 
+  /**
+   * Reloads the archived timeline incrementally with given beginning 
timestamp {@code startTs}.
+   * This method is not thread safe.
+   *
+   * <p>IMPORTANT: this is meant for loading one static snapshot of the 
timeline in several steps; if new instants have been archived in the meantime,
+   * use {@link #reload()} instead.
+   */
+  public HoodieArchivedTimeline reload(String startTs) {
+    if (this.cursorInstant != null) {
+      if (HoodieTimeline.compareTimestamps(startTs, LESSER_THAN, 
this.cursorInstant)) {
+        appendInstants(loadInstants(new ClosedOpenTimeRangeFilter(startTs, 
this.cursorInstant), LoadMode.METADATA));
+        this.cursorInstant = startTs;
+      }
+      return this;
+    } else {
+      // a null cursor instant indicates an empty timeline
+      return new HoodieArchivedTimeline(metaClient, startTs);
+    }
+  }
+
   private HoodieInstant readCommit(String instantTime, GenericRecord record, 
Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer) {
     final String action = record.get(ACTION_ARCHIVED_META_FIELD).toString();
     final String completionTime = 
record.get(COMPLETION_TIME_ARCHIVED_META_FIELD).toString();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index d08194f266c..7af56785906 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 
 import org.slf4j.Logger;
@@ -35,6 +36,7 @@ import java.io.Serializable;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
@@ -75,15 +77,23 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
 
   public void setInstants(List<HoodieInstant> instants) {
     this.instants = instants;
-    final MessageDigest md;
-    try {
-      md = MessageDigest.getInstance(HASHING_ALGORITHM);
-      this.instants.forEach(i -> md
-          .update(getUTF8Bytes(StringUtils.joinUsingDelim("_", 
i.getTimestamp(), i.getAction(), i.getState().name()))));
-    } catch (NoSuchAlgorithmException nse) {
-      throw new HoodieException(nse);
+    this.timelineHash = computeTimelineHash(this.instants);
+    clearState();
+  }
+
+  public void appendInstants(List<HoodieInstant> newInstants) {
+    if (newInstants.isEmpty()) {
+      // there are no new instants, nothing to do.
+      return;
+    }
+    if (this.instants.isEmpty()) {
+      // there are no existing instants, set up the new ones directly.
+      setInstants(newInstants);
+      return;
     }
-    this.timelineHash = StringUtils.toHexString(md.digest());
+    this.instants = mergeInstants(newInstants, this.instants);
+    this.timelineHash = computeTimelineHash(this.instants);
+    clearState();
   }
 
   /**
@@ -567,6 +577,11 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
     return Option.fromJavaOptional(instants.stream().findFirst());
   }
 
+  private void clearState() {
+    instantTimeSet = null;
+    firstNonSavepointCommit = null;
+  }
+
   /**
    * Merge this timeline with the given timeline.
    */
@@ -581,4 +596,42 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
     };
     return new HoodieDefaultTimeline(instantStream, details);
   }
+
+  /**
+   * Computes and returns the hash of the given instants.
+   */
+  private String computeTimelineHash(List<HoodieInstant> instants) {
+    final MessageDigest md;
+    try {
+      md = MessageDigest.getInstance(HASHING_ALGORITHM);
+      instants.forEach(i -> md
+          .update(getUTF8Bytes(StringUtils.joinUsingDelim("_", 
i.getTimestamp(), i.getAction(), i.getState().name()))));
+    } catch (NoSuchAlgorithmException nse) {
+      throw new HoodieException(nse);
+    }
+    return StringUtils.toHexString(md.digest());
+  }
+
+  /**
+   * Merges the two given instant lists into one, keeping the instants in order.
+   */
+  private static List<HoodieInstant> mergeInstants(List<HoodieInstant> 
instants1, List<HoodieInstant> instants2) {
+    ValidationUtils.checkArgument(!instants1.isEmpty() && 
!instants2.isEmpty(), "The instants to merge can not be empty");
+    // the fast paths below assume that both instant lists 
are already sorted:
+    // when one list ends no later than the other begins, plain concatenation keeps the order and the explicit sort is skipped.
+    final List<HoodieInstant> merged;
+    if (HoodieTimeline.compareTimestamps(instants1.get(instants1.size() - 
1).getTimestamp(), LESSER_THAN_OR_EQUALS, instants2.get(0).getTimestamp())) {
+      merged = new ArrayList<>(instants1);
+      merged.addAll(instants2);
+    } else if (HoodieTimeline.compareTimestamps(instants2.get(instants2.size() 
- 1).getTimestamp(), LESSER_THAN_OR_EQUALS, instants1.get(0).getTimestamp())) {
+      merged = new ArrayList<>(instants2);
+      merged.addAll(instants1);
+    } else {
+      merged = new ArrayList<>(instants1);
+      merged.addAll(instants2);
+      // sort the instants explicitly
+      Collections.sort(merged);
+    }
+    return merged;
+  }
 }

Reply via email to