This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d056d285e1 [HUDI-7164] Add start time query API in 
CompletionTimeQueryView (#10218)
3d056d285e1 is described below

commit 3d056d285e138ac34bfd6389db562ebfd09a7d9e
Author: Danny Chan <[email protected]>
AuthorDate: Fri Dec 1 13:05:29 2023 +0800

    [HUDI-7164] Add start time query API in CompletionTimeQueryView (#10218)
---
 .../timeline/TestCompletionTimeQueryView.java      | 28 ++++++++++-
 .../table/timeline/CompletionTimeQueryView.java    | 58 ++++++++++++++++++++--
 .../table/timeline/HoodieArchivedTimeline.java     | 10 ++--
 .../table/timeline/HoodieInstantTimeGenerator.java | 11 ++++
 .../hudi/common/table/timeline/LSMTimeline.java    |  6 ++-
 .../common/util/ArchivedInstantReadSchemas.java    | 20 +++++++-
 .../hudi/common/testutils/FileCreateUtils.java     | 17 +++++--
 .../hudi/common/testutils/HoodieTestTable.java     |  6 ++-
 .../benchmark/LSMTimelineReadBenchmark.scala       | 23 ++++++---
 9 files changed, 159 insertions(+), 20 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
index 9df49d4b9d0..9b65ab225e4 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
@@ -92,6 +92,32 @@ public class TestCompletionTimeQueryView {
     }
   }
 
+  @Test
+  void testReadStartTime() throws Exception {
+    String tableName = "testTable";
+    String tablePath = tempFile.getAbsolutePath() + Path.SEPARATOR + tableName;
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(new 
Configuration(), tablePath, HoodieTableType.COPY_ON_WRITE, tableName);
+    prepareTimeline(tablePath, metaClient);
+    try (CompletionTimeQueryView view = new 
CompletionTimeQueryView(metaClient, String.format("%08d", 3))) {
+      // query start time from LSM timeline
+      assertThat(getInstantTimeSetFormattedString(view, 3 + 1000, 6 + 1000), 
is("00000003,00000004,00000005,00000006"));
+      // query start time from active timeline
+      assertThat(getInstantTimeSetFormattedString(view, 7 + 1000, 10 + 1000), 
is("00000007,00000008,00000009,00000010"));
+      // lazy loading
+      assertThat(getInstantTimeSetFormattedString(view, 1 + 1000, 2 + 1000), 
is("00000001,00000002"));
+      assertThat("The cursor instant should be slided", 
view.getCursorInstant(), is(String.format("%08d", 1)));
+      // query with partial non-existing completion time
+      assertThat(getInstantTimeSetFormattedString(view, 10 + 1000, 11 + 1000), 
is("00000010"));
+      // query with non-existing completion time
+      assertThat(getInstantTimeSetFormattedString(view, 12 + 1000, 15 + 1000), 
is(""));
+    }
+  }
+
+  private String getInstantTimeSetFormattedString(CompletionTimeQueryView 
view, int completionTime1, int completionTime2) {
+    return view.getStartTimeSet(String.format("%08d", completionTime1), 
String.format("%08d", completionTime2), s -> String.format("%08d", 
Integer.parseInt(s) - 1000))
+        .stream().sorted().collect(Collectors.joining(","));
+  }
+
   private void prepareTimeline(String tablePath, HoodieTableMetaClient 
metaClient) throws Exception {
     HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath(tablePath)
         
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
@@ -103,7 +129,7 @@ public class TestCompletionTimeQueryView {
       String instantTime = String.format("%08d", i);
       String completionTime = String.format("%08d", i + 1000);
       HoodieCommitMetadata metadata = 
testTable.createCommitMetadata(instantTime, WriteOperationType.INSERT, 
Arrays.asList("par1", "par2"), 10, false);
-      testTable.addCommit(instantTime, Option.of(metadata));
+      testTable.addCommit(instantTime, Option.of(completionTime), 
Option.of(metadata));
       activeActions.add(
           new DummyActiveAction(
               new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", 
instantTime, completionTime),
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
index e53f185bffd..6bac49b83c9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table.timeline;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.avro.generic.GenericRecord;
@@ -27,9 +28,13 @@ import java.io.Serializable;
 import java.time.Instant;
 import java.util.Date;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.COMPLETION_TIME_ARCHIVED_META_FIELD;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 
@@ -41,6 +46,8 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
 
   private static final long MILLI_SECONDS_IN_THREE_DAYS = 3 * 24 * 3600 * 1000;
 
+  private static final long MILLI_SECONDS_IN_ONE_DAY = 24 * 3600 * 1000;
+
   private final HoodieTableMetaClient metaClient;
 
   /**
@@ -159,20 +166,65 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
       // the instant is still pending
       return Option.empty();
     }
+    loadCompletionTimeIncrementally(startTime);
+    return 
Option.ofNullable(this.startToCompletionInstantTimeMap.get(startTime));
+  }
+
+  /**
+   * Queries the instant start time with given completion time range.
+   *
+   * <p>By default, assumes an instant takes at most 1 day to complete, which 
narrows the lookup range and accelerates the query.
+   *
+   * @param startCompletionTime The start completion time.
+   * @param endCompletionTime   The end completion time.
+   *
+   * @return The instant time set.
+   */
+  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime) {
+    // assumes any instant/transaction lasts at most 1 day, so the lookup can 
start one day before the start completion time.
+    return getStartTimeSet(startCompletionTime, endCompletionTime, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+  }
+
+  /**
+   * Queries the instant start time with given completion time range.
+   *
+   * @param startCompletionTime   The start completion time.
+   * @param endCompletionTime     The end completion time.
+   * @param earliestStartTimeFunc The function to generate the earliest start 
time boundary
+   *                              with the minimum completion time {@code 
startCompletionTime}.
+   *
+   * @return The instant time set.
+   */
+  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime, Function<String, String> earliestStartTimeFunc) {
+    String startInstant = earliestStartTimeFunc.apply(startCompletionTime);
+    final InstantRange instantRange = InstantRange.builder()
+        .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
+        .startInstant(startCompletionTime)
+        .endInstant(endCompletionTime)
+        .nullableBoundary(true)
+        .build();
+    if (HoodieTimeline.compareTimestamps(this.cursorInstant, GREATER_THAN, 
startInstant)) {
+      loadCompletionTimeIncrementally(startInstant);
+    }
+    return this.startToCompletionInstantTimeMap.entrySet().stream()
+        .filter(entry -> instantRange.isInRange(entry.getValue()))
+        .map(Map.Entry::getKey).collect(Collectors.toSet());
+  }
+
+  private void loadCompletionTimeIncrementally(String startTime) {
     // the 'startTime' should be out of the eager loading range, switch to a 
lazy loading.
     // This operation is resource costly.
     synchronized (this) {
       if (HoodieTimeline.compareTimestamps(startTime, LESSER_THAN, 
this.cursorInstant)) {
         HoodieArchivedTimeline.loadInstants(metaClient,
             new HoodieArchivedTimeline.ClosedOpenTimeRangeFilter(startTime, 
this.cursorInstant),
-            HoodieArchivedTimeline.LoadMode.SLIM,
+            HoodieArchivedTimeline.LoadMode.TIME,
             r -> true,
             this::readCompletionTime);
       }
       // refresh the start instant
       this.cursorInstant = startTime;
     }
-    return 
Option.ofNullable(this.startToCompletionInstantTimeMap.get(startTime));
   }
 
   /**
@@ -188,7 +240,7 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
     // then load the archived instants.
     HoodieArchivedTimeline.loadInstants(metaClient,
         new HoodieArchivedTimeline.StartTsFilter(this.cursorInstant),
-        HoodieArchivedTimeline.LoadMode.SLIM,
+        HoodieArchivedTimeline.LoadMode.TIME,
         r -> true,
         this::readCompletionTime);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index c489362ae29..2775dc23e0f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -182,7 +182,7 @@ public class HoodieArchivedTimeline extends 
HoodieDefaultTimeline {
   }
 
   private List<HoodieInstant> loadInstants() {
-    return loadInstants(null, LoadMode.SLIM);
+    return loadInstants(null, LoadMode.ACTION);
   }
 
   private List<HoodieInstant> loadInstants(String startTs, String endTs) {
@@ -274,16 +274,20 @@ public class HoodieArchivedTimeline extends 
HoodieDefaultTimeline {
    * Different mode for loading the archived instant metadata.
    */
   public enum LoadMode {
+    /**
+     * Loads the instantTime, completionTime.
+     */
+    TIME,
     /**
      * Loads the instantTime, completionTime, action.
      */
-    SLIM,
+    ACTION,
     /**
      * Loads the instantTime, completionTime, action, metadata.
      */
     METADATA,
     /**
-     * Loads the instantTime, completionTime, plan.
+     * Loads the instantTime, completionTime, action, plan.
      */
     PLAN
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
index 7c20afb6967..2e48e40820d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
@@ -105,6 +105,17 @@ public class HoodieInstantTimeGenerator {
     }
   }
 
+  public static String instantTimeMinusMillis(String timestamp, long 
milliseconds) {
+    try {
+      String timestampInMillis = fixInstantTimeCompatibility(timestamp);
+      LocalDateTime dt = LocalDateTime.parse(timestampInMillis, 
MILLIS_INSTANT_TIME_FORMATTER);
+      ZoneId zoneId = HoodieTimelineTimeZone.UTC.equals(commitTimeZone) ? 
ZoneId.of("UTC") : ZoneId.systemDefault();
+      return 
MILLIS_INSTANT_TIME_FORMATTER.format(dt.atZone(zoneId).toInstant().minusMillis(milliseconds).atZone(zoneId).toLocalDateTime());
+    } catch (DateTimeParseException e) {
+      throw new HoodieException(e);
+    }
+  }
+
   private static String fixInstantTimeCompatibility(String instantTime) {
     // Enables backwards compatibility with non-millisecond granularity 
instants
     if (isSecondGranularity(instantTime)) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/LSMTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/LSMTimeline.java
index 450c41bc56a..abe275ddd2c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/LSMTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/LSMTimeline.java
@@ -119,8 +119,10 @@ public class LSMTimeline {
   // -------------------------------------------------------------------------
   public static Schema getReadSchema(HoodieArchivedTimeline.LoadMode loadMode) 
{
     switch (loadMode) {
-      case SLIM:
-        return ArchivedInstantReadSchemas.TIMELINE_LSM_SLIM_READ_SCHEMA;
+      case TIME:
+        return ArchivedInstantReadSchemas.TIMELINE_LSM_READ_SCHEMA_WITH_TIME;
+      case ACTION:
+        return ArchivedInstantReadSchemas.TIMELINE_LSM_READ_SCHEMA_WITH_ACTION;
       case METADATA:
         return 
ArchivedInstantReadSchemas.TIMELINE_LSM_READ_SCHEMA_WITH_METADATA;
       case PLAN:
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ArchivedInstantReadSchemas.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ArchivedInstantReadSchemas.java
index 0bcc5b976d4..69783f7f95d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ArchivedInstantReadSchemas.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ArchivedInstantReadSchemas.java
@@ -24,7 +24,25 @@ import org.apache.avro.Schema;
  * Avro schema for different archived instant read cases.
  */
 public abstract class ArchivedInstantReadSchemas {
-  public static final Schema TIMELINE_LSM_SLIM_READ_SCHEMA = new 
Schema.Parser().parse("{\n"
+  public static final Schema TIMELINE_LSM_READ_SCHEMA_WITH_TIME = new 
Schema.Parser().parse("{\n"
+      + "   \"type\":\"record\",\n"
+      + "   \"name\":\"HoodieArchivedMetaEntryV2\",\n"
+      + "   \"namespace\":\"org.apache.hudi.avro.model\",\n"
+      + "   \"fields\":[\n"
+      + "      {\n"
+      + "         \"name\":\"instantTime\",\n"
+      + "         \"type\":[\"null\",\"string\"],\n"
+      + "         \"default\": null\n"
+      + "      },\n"
+      + "      {\n"
+      + "         \"name\":\"completionTime\",\n"
+      + "         \"type\":[\"null\",\"string\"],\n"
+      + "         \"default\": null\n"
+      + "      }\n"
+      + "   ]\n"
+      + "}");
+
+  public static final Schema TIMELINE_LSM_READ_SCHEMA_WITH_ACTION = new 
Schema.Parser().parse("{\n"
       + "   \"type\":\"record\",\n"
       + "   \"name\":\"HoodieArchivedMetaEntryV2\",\n"
       + "   \"namespace\":\"org.apache.hudi.avro.model\",\n"
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index de9b92d90a9..0d1e122c330 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -63,6 +63,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanMetadata;
@@ -138,6 +139,10 @@ public class FileCreateUtils {
   }
 
   private static void createMetaFile(String basePath, String instantTime, 
String suffix, byte[] content) throws IOException {
+    createMetaFile(basePath, instantTime, 
InProcessTimeGenerator::createNewInstantTime, suffix, content);
+  }
+
+  private static void createMetaFile(String basePath, String instantTime, 
Supplier<String> completionTimeSupplier, String suffix, byte[] content) throws 
IOException {
     Path parentPath = Paths.get(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME);
     Files.createDirectories(parentPath);
     if (suffix.contains(HoodieTimeline.INFLIGHT_EXTENSION) || 
suffix.contains(HoodieTimeline.REQUESTED_EXTENSION)) {
@@ -154,7 +159,7 @@ public class FileCreateUtils {
         // The instant file does not exist
         if (!dirStream.iterator().hasNext()) {
           // doesn't contain completion time
-          String instantTimeAndCompletionTime = instantTime + "_" + 
InProcessTimeGenerator.createNewInstantTime();
+          String instantTimeAndCompletionTime = instantTime + "_" + 
completionTimeSupplier.get();
           Path metaFilePath = parentPath.resolve(instantTimeAndCompletionTime 
+ suffix);
           if (content.length == 0) {
             Files.createFile(metaFilePath);
@@ -188,12 +193,18 @@ public class FileCreateUtils {
   }
 
   public static void createCommit(String basePath, String instantTime, 
Option<HoodieCommitMetadata> metadata) throws IOException {
+    createCommit(basePath, instantTime, Option.empty(), metadata);
+  }
+
+  public static void createCommit(String basePath, String instantTime, 
Option<String> completionTime, Option<HoodieCommitMetadata> metadata) throws 
IOException {
+    final Supplier<String> completionTimeSupplier = () -> 
completionTime.isPresent() ? completionTime.get() : 
InProcessTimeGenerator.createNewInstantTime();
     if (metadata.isPresent()) {
       HoodieCommitMetadata commitMetadata = metadata.get();
-      createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION,
+      createMetaFile(basePath, instantTime, completionTimeSupplier, 
HoodieTimeline.COMMIT_EXTENSION,
           serializeCommitMetadata(commitMetadata).get());
     } else {
-      createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
+      createMetaFile(basePath, instantTime, completionTimeSupplier, 
HoodieTimeline.COMMIT_EXTENSION,
+          getUTF8Bytes(""));
     }
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 202827ce0c7..4ff3b2b7e46 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -219,9 +219,13 @@ public class HoodieTestTable {
   }
 
   public HoodieTestTable addCommit(String instantTime, 
Option<HoodieCommitMetadata> metadata) throws Exception {
+    return addCommit(instantTime, Option.empty(), metadata);
+  }
+
+  public HoodieTestTable addCommit(String instantTime, Option<String> 
completionTime, Option<HoodieCommitMetadata> metadata) throws Exception {
     createRequestedCommit(basePath, instantTime);
     createInflightCommit(basePath, instantTime);
-    createCommit(basePath, instantTime, metadata);
+    createCommit(basePath, instantTime, completionTime, metadata);
     currentInstantTime = instantTime;
     return this;
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/LSMTimelineReadBenchmark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/LSMTimelineReadBenchmark.scala
index 76f82891534..57633098b25 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/LSMTimelineReadBenchmark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/LSMTimelineReadBenchmark.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.client.common.HoodieJavaEngineContext
 import org.apache.hudi.client.timeline.LSMTimelineWriter
 import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieCommitMetadata, 
HoodieTableType, WriteOperationType}
 import 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata
-import org.apache.hudi.common.table.timeline.{ActiveAction, 
HoodieArchivedTimeline, HoodieInstant, LSMTimeline}
+import org.apache.hudi.common.table.timeline.{ActiveAction, 
CompletionTimeQueryView, HoodieArchivedTimeline, HoodieInstant, LSMTimeline}
 import org.apache.hudi.common.testutils.{HoodieTestTable, HoodieTestUtils}
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.index.HoodieIndex.IndexType
@@ -42,8 +42,9 @@ object LSMTimelineReadBenchmark extends HoodieBenchmarkBase {
    * Apple M2
    * pref load archived instants:              Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    * 
------------------------------------------------------------------------------------------------------------------------
-   * read shim instants                                   18             32    
      15          0.1       17914.8       1.0X
-   * read instants with commit metadata                   19             25    
       5          0.1       19403.1       0.9X
+   * read slim instants                                  494            521    
      27          0.5        1899.6       1.0X
+   * read instants with commit metadata                 2544           2625    
     116          0.1        9785.9       0.2X
+   * read start time                                     156            177    
      26          1.7         601.1       3.2X
    */
   private def readArchivedInstantsBenchmark(): Unit = {
     withTempDir(f => {
@@ -60,13 +61,14 @@ object LSMTimelineReadBenchmark extends HoodieBenchmarkBase 
{
 
       val startTs = System.currentTimeMillis()
       val startInstant = startTs + 1 + ""
-      val commitsNum = 10000000
-      val batchSize = 2000
+      val commitsNum = 260000
+      val batchSize = 10
       val instantBuffer = new util.ArrayList[ActiveAction]()
       for (i <- 1 to commitsNum) {
         val instantTime = startTs + i + ""
+        val completionTime = startTs + i + 1000 + ""
         val action = if (i % 2 == 0) "delta_commit" else "commit"
-        val instant = new HoodieInstant(HoodieInstant.State.COMPLETED, action, 
instantTime, instantTime + 1000)
+        val instant = new HoodieInstant(HoodieInstant.State.COMPLETED, action, 
instantTime, completionTime)
         val metadata: HoodieCommitMetadata = 
HoodieTestTable.of(metaClient).createCommitMetadata(instantTime, 
WriteOperationType.INSERT, util.Arrays.asList("par1", "par2"), 10, false)
         val serializedMetadata = serializeCommitMetadata(metadata).get()
         instantBuffer.add(new DummyActiveAction(instant, serializedMetadata))
@@ -85,6 +87,15 @@ object LSMTimelineReadBenchmark extends HoodieBenchmarkBase {
       benchmark.addCase("read instants with commit metadata") { _ =>
         new HoodieArchivedTimeline(metaClient, startInstant)
       }
+      // for scala compatibility
+      val earliestStartTimeFunc: java.util.function.Function[String, String] = 
new java.util.function.Function[String, String] {
+        override def apply(s: String): String = {
+          (s.toLong - 1000) + ""
+        }
+      }
+      benchmark.addCase("read start time") { _ =>
+        new CompletionTimeQueryView(metaClient).getStartTimeSet(startTs + 1 + 
1000 + "", startTs + commitsNum + 1000 + "", earliestStartTimeFunc)
+      }
       benchmark.run()
       val totalSize = 
LSMTimeline.latestSnapshotManifest(metaClient).getFiles.asScala
         .map(f => f.getFileLen)

Reply via email to