This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a3eea2fdccd [HUDI-6725] Support efficient completion time queries on 
the timeline (#9565)
a3eea2fdccd is described below

commit a3eea2fdccd40a6439d6e8d72bf3ae53f5967893
Author: Danny Chan <[email protected]>
AuthorDate: Tue Sep 5 09:38:06 2023 +0800

    [HUDI-6725] Support efficient completion time queries on the timeline 
(#9565)
---
 .../client/timeline/CompletionTimeQueryView.java   | 149 +++++++++++++++++++++
 .../timeline/TestCompletionTimeQueryView.java      | 124 +++++++++++++++++
 .../table/timeline/HoodieArchivedTimeline.java     |  73 ++++++----
 3 files changed, 317 insertions(+), 29 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/CompletionTimeQueryView.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/CompletionTimeQueryView.java
new file mode 100644
index 00000000000..9b47c5a0286
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/CompletionTimeQueryView.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.timeline;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.COMPLETION_TIME_ARCHIVED_META_FIELD;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.EQUALS;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Query view for instant completion time.
+ */
+public class CompletionTimeQueryView implements AutoCloseable {
+  private final HoodieTableMetaClient metaClient;
+
+  /**
+   * Mapping from instant start time -> completion time.
+   * Should be thread-safe data structure.
+   */
+  private final Map<String, String> startToCompletionInstantTimeMap;
+
+  /**
+   * The start instant time to eagerly load from, by default load last N days 
of completed instants.
+   */
+  private final String startInstant;
+
+  /**
+   * The first instant on the active timeline, used for query optimization.
+   */
+  private final String firstInstantOnActiveTimeline;
+
+  /**
+   * The constructor.
+   *
+   * @param metaClient   The table meta client.
+   * @param startInstant The earliest instant time to eagerly load from, by 
default load last N days of completed instants.
+   */
+  public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String 
startInstant) {
+    this.metaClient = metaClient;
+    this.startToCompletionInstantTimeMap = new ConcurrentHashMap<>();
+    this.startInstant = startInstant;
+    this.firstInstantOnActiveTimeline = 
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse("");
+    load();
+  }
+
+  /**
+   * Queries the instant completion time with given start time.
+   *
+   * @param startTime The start time.
+   *
+   * @return The completion time if the instant finished or empty if it is 
still pending.
+   */
+  public Option<String> getCompletionTime(String startTime) {
+    String completionTime = 
this.startToCompletionInstantTimeMap.get(startTime);
+    if (completionTime != null) {
+      return Option.of(completionTime);
+    }
+    if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN, 
this.firstInstantOnActiveTimeline)) {
+      // the instant is still pending
+      return Option.empty();
+    }
+    // the 'startTime' should be out of the eager loading range, switch to a 
lazy loading.
+    // This operation is resource costly.
+    HoodieArchivedTimeline.loadInstants(metaClient,
+        new EqualsTimestampFilter(startTime),
+        HoodieArchivedTimeline.LoadMode.SLIM,
+        r -> true,
+        this::readCompletionTime);
+    return 
Option.ofNullable(this.startToCompletionInstantTimeMap.get(startTime));
+  }
+
+  /**
+   * This method reads the instant completion time.
+   * This would also update 'startToCompletionInstantTimeMap' map with start 
time/completion time pairs.
+   * Only instants starting from 'startInstant' (inclusive) are considered.
+   */
+  private void load() {
+    // load active instants first.
+    this.metaClient.getActiveTimeline()
+        .filterCompletedInstants().getInstantsAsStream()
+        .forEach(instant -> setCompletionTime(instant.getTimestamp(), 
instant.getStateTransitionTime()));
+    // then load the archived instants.
+    HoodieArchivedTimeline.loadInstants(metaClient,
+        new HoodieArchivedTimeline.StartTsFilter(this.startInstant),
+        HoodieArchivedTimeline.LoadMode.SLIM,
+        r -> true,
+        this::readCompletionTime);
+  }
+
+  private void readCompletionTime(String instantTime, GenericRecord record) {
+    final String completionTime = 
record.get(COMPLETION_TIME_ARCHIVED_META_FIELD).toString();
+    setCompletionTime(instantTime, completionTime);
+  }
+
+  private void setCompletionTime(String instantTime, String completionTime) {
+    this.startToCompletionInstantTimeMap.putIfAbsent(instantTime, 
completionTime);
+  }
+
+  @Override
+  public void close() throws Exception {
+    this.startToCompletionInstantTimeMap.clear();
+  }
+
+  // -------------------------------------------------------------------------
+  //  Inner class
+  // -------------------------------------------------------------------------
+
+  /**
+   * A time based filter with equality of specified timestamp.
+   */
+  public static class EqualsTimestampFilter extends 
HoodieArchivedTimeline.TimeRangeFilter {
+    private final String ts;
+
+    public EqualsTimestampFilter(String ts) {
+      super(ts, ts); // endTs is never used
+      this.ts = ts;
+    }
+
+    public boolean isInRange(String instantTime) {
+      return HoodieTimeline.compareTimestamps(instantTime, EQUALS, ts);
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
new file mode 100644
index 00000000000..e0bd6c62b0e
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.timeline;
+
+import org.apache.hudi.DummyActiveAction;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link 
org.apache.hudi.client.timeline.CompletionTimeQueryView}.
+ */
+public class TestCompletionTimeQueryView {
+  @TempDir
+  File tempFile;
+
+  @Test
+  void testReadCompletionTime() throws Exception {
+    String tableName = "testTable";
+    String tablePath = tempFile.getAbsolutePath() + Path.SEPARATOR + tableName;
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(new 
Configuration(), tablePath, HoodieTableType.COPY_ON_WRITE, tableName);
+    prepareTimeline(tablePath, metaClient);
+    try (CompletionTimeQueryView view = new 
CompletionTimeQueryView(metaClient, String.format("%08d", 3))) {
+      // query completion time from LSM timeline
+      for (int i = 3; i < 7; i++) {
+        assertThat(view.getCompletionTime(String.format("%08d", 
i)).orElse(""), is(String.format("%08d", i + 1000)));
+      }
+      // query completion time from active timeline
+      for (int i = 7; i < 11; i++) {
+        assertTrue(view.getCompletionTime(String.format("%08d", 
i)).isPresent());
+      }
+      // lazy loading
+      for (int i = 1; i < 3; i++) {
+        assertThat(view.getCompletionTime(String.format("%08d", 
i)).orElse(""), is(String.format("%08d", i + 1000)));
+      }
+      // query with inflight start time
+      assertFalse(view.getCompletionTime(String.format("%08d", 
11)).isPresent());
+      // query with non-exist start time
+      assertFalse(view.getCompletionTime(String.format("%08d", 
12)).isPresent());
+    }
+  }
+
+  private void prepareTimeline(String tablePath, HoodieTableMetaClient 
metaClient) throws Exception {
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath(tablePath)
+        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+        .withMarkersType("DIRECT")
+        .build();
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    List<ActiveAction> activeActions = new ArrayList<>();
+    for (int i = 1; i < 11; i++) {
+      String instantTime = String.format("%08d", i);
+      String completionTime = String.format("%08d", i + 1000);
+      HoodieCommitMetadata metadata = 
testTable.createCommitMetadata(instantTime, WriteOperationType.INSERT, 
Arrays.asList("par1", "par2"), 10, false);
+      testTable.addCommit(instantTime, Option.of(metadata));
+      activeActions.add(new DummyActiveAction(new 
HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, 
completionTime), metadata.toJsonString().getBytes()));
+    }
+    testTable.addRequestedCommit(String.format("%08d", 11));
+    List<HoodieInstant> instants = new HoodieActiveTimeline(metaClient, 
false).getInstantsAsStream().sorted().collect(Collectors.toList());
+    LSMTimelineWriter writer = LSMTimelineWriter.getInstance(writeConfig, 
getMockHoodieTable(metaClient));
+    // archive [1,2], [3,4], [5,6] separately
+    writer.write(activeActions.subList(0, 2), Option.empty(), Option.empty());
+    writer.write(activeActions.subList(2, 4), Option.empty(), Option.empty());
+    writer.write(activeActions.subList(4, 6), Option.empty(), Option.empty());
+    // reconcile the active timeline
+    instants.subList(0, 3 * 6).forEach(instant -> 
HoodieActiveTimeline.deleteInstantFile(metaClient.getFs(), 
metaClient.getMetaPath(), instant));
+    
ValidationUtils.checkState(metaClient.reloadActiveTimeline().filterCompletedInstants().countInstants()
 == 4, "should archive 6 instants with 4 as active");
+  }
+
+  @SuppressWarnings("rawtypes")
+  private HoodieTable getMockHoodieTable(HoodieTableMetaClient metaClient) {
+    HoodieTable hoodieTable = mock(HoodieTable.class);
+    TaskContextSupplier taskContextSupplier = mock(TaskContextSupplier.class);
+    when(taskContextSupplier.getPartitionIdSupplier()).thenReturn(() -> 1);
+    when(hoodieTable.getTaskContextSupplier()).thenReturn(taskContextSupplier);
+    when(hoodieTable.getMetaClient()).thenReturn(metaClient);
+    return hoodieTable;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index fff8e357829..bdd5750684e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 
 /**
@@ -59,8 +60,8 @@ import java.util.function.Function;
  * <p>This class can be serialized and de-serialized and on de-serialization 
the FileSystem is re-initialized.
  */
 public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
-  private static final String INSTANT_TIME_ARCHIVED_META_FIELD = "instantTime";
-  private static final String COMPLETION_TIME_ARCHIVED_META_FIELD = 
"completionTime";
+  public static final String INSTANT_TIME_ARCHIVED_META_FIELD = "instantTime";
+  public static final String COMPLETION_TIME_ARCHIVED_META_FIELD = 
"completionTime";
   private static final String ACTION_ARCHIVED_META_FIELD = "action";
   private static final String METADATA_ARCHIVED_META_FIELD = "metadata";
   private static final String PLAN_ARCHIVED_META_FIELD = "plan";
@@ -149,8 +150,7 @@ public class HoodieArchivedTimeline extends 
HoodieDefaultTimeline {
     return new HoodieArchivedTimeline(metaClient);
   }
 
-  private HoodieInstant readCommit(GenericRecord record, LoadMode loadMode) {
-    final String instantTime = 
record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
+  private HoodieInstant readCommit(String instantTime, GenericRecord record, 
LoadMode loadMode) {
     final String action = record.get(ACTION_ARCHIVED_META_FIELD).toString();
     final String completionTime = 
record.get(COMPLETION_TIME_ARCHIVED_META_FIELD).toString();
     loadInstantDetails(record, instantTime, loadMode);
@@ -200,38 +200,53 @@ public class HoodieArchivedTimeline extends 
HoodieDefaultTimeline {
       @Nullable TimeRangeFilter filter,
       LoadMode loadMode,
       Function<GenericRecord, Boolean> commitsFilter) {
+    Map<String, HoodieInstant> instantsInRange = new ConcurrentHashMap<>();
+    loadInstants(metaClient, filter, loadMode, commitsFilter, (instantTime, 
avroRecord) -> instantsInRange.putIfAbsent(instantTime, readCommit(instantTime, 
avroRecord, loadMode)));
+    ArrayList<HoodieInstant> result = new 
ArrayList<>(instantsInRange.values());
+    Collections.sort(result);
+    return result;
+  }
+
+  /**
+   * Loads the instants from the timeline.
+   *
+   * @param metaClient     The meta client.
+   * @param filter         The time range filter where the target instant 
belongs to.
+   * @param loadMode       The load mode.
+   * @param commitsFilter  Filter of the instant type.
+   * @param recordConsumer Consumer of the instant record payload.
+   */
+  public static void loadInstants(
+      HoodieTableMetaClient metaClient,
+      @Nullable TimeRangeFilter filter,
+      LoadMode loadMode,
+      Function<GenericRecord, Boolean> commitsFilter,
+      BiConsumer<String, GenericRecord> recordConsumer) {
     try {
       // List all files
       List<String> fileNames = 
LSMTimeline.latestSnapshotManifest(metaClient).getFileNames();
 
-      Map<String, HoodieInstant> instantsInRange = new ConcurrentHashMap<>();
       Schema readSchema = LSMTimeline.getReadSchema(loadMode);
       fileNames.stream()
-              .filter(fileName -> filter == null || 
LSMTimeline.isFileInRange(filter, fileName))
-              .parallel().forEach(fileName -> {
-                // Read the archived file
-                try (HoodieAvroParquetReader reader = 
(HoodieAvroParquetReader) 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
-                        .getFileReader(metaClient.getHadoopConf(), new 
Path(metaClient.getArchivePath(), fileName))) {
-                  try (ClosableIterator<IndexedRecord> iterator = 
reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), 
readSchema)) {
-                    while (iterator.hasNext()) {
-                      GenericRecord record = (GenericRecord) iterator.next();
-                      String instantTime = 
record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
-                      if (!instantsInRange.containsKey(instantTime)
-                              && (filter == null || 
filter.isInRange(instantTime))
-                              && commitsFilter.apply(record)) {
-                        HoodieInstant instant = readCommit(record, loadMode);
-                        instantsInRange.put(instantTime, instant);
-                      }
-                    }
+          .filter(fileName -> filter == null || 
LSMTimeline.isFileInRange(filter, fileName))
+          .parallel().forEach(fileName -> {
+            // Read the archived file
+            try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+                .getFileReader(metaClient.getHadoopConf(), new 
Path(metaClient.getArchivePath(), fileName))) {
+              try (ClosableIterator<IndexedRecord> iterator = 
reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), 
readSchema)) {
+                while (iterator.hasNext()) {
+                  GenericRecord record = (GenericRecord) iterator.next();
+                  String instantTime = 
record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
+                  if ((filter == null || filter.isInRange(instantTime))
+                      && commitsFilter.apply(record)) {
+                    recordConsumer.accept(instantTime, record);
                   }
-                } catch (IOException ioException) {
-                  throw new HoodieIOException("Error open file reader for 
path: " + new Path(metaClient.getArchivePath(), fileName));
                 }
-              });
-
-      ArrayList<HoodieInstant> result = new 
ArrayList<>(instantsInRange.values());
-      Collections.sort(result);
-      return result;
+              }
+            } catch (IOException ioException) {
+              throw new HoodieIOException("Error open file reader for path: " 
+ new Path(metaClient.getArchivePath(), fileName));
+            }
+          });
     } catch (IOException e) {
       throw new HoodieIOException(
           "Could not load archived commit timeline from path " + 
metaClient.getArchivePath(), e);
@@ -289,7 +304,7 @@ public class HoodieArchivedTimeline extends 
HoodieDefaultTimeline {
   /**
    * A time based filter with range [startTs, +&#8734).
    */
-  private static class StartTsFilter extends TimeRangeFilter {
+  public static class StartTsFilter extends TimeRangeFilter {
     private final String startTs;
 
     public StartTsFilter(String startTs) {

Reply via email to