This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a3eea2fdccd [HUDI-6725] Support efficient completion time queries on
the timeline (#9565)
a3eea2fdccd is described below
commit a3eea2fdccd40a6439d6e8d72bf3ae53f5967893
Author: Danny Chan <[email protected]>
AuthorDate: Tue Sep 5 09:38:06 2023 +0800
[HUDI-6725] Support efficient completion time queries on the timeline
(#9565)
---
.../client/timeline/CompletionTimeQueryView.java | 149 +++++++++++++++++++++
.../timeline/TestCompletionTimeQueryView.java | 124 +++++++++++++++++
.../table/timeline/HoodieArchivedTimeline.java | 73 ++++++----
3 files changed, 317 insertions(+), 29 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/CompletionTimeQueryView.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/CompletionTimeQueryView.java
new file mode 100644
index 00000000000..9b47c5a0286
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/CompletionTimeQueryView.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.timeline;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.COMPLETION_TIME_ARCHIVED_META_FIELD;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.EQUALS;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Query view for instant completion time.
+ */
+public class CompletionTimeQueryView implements AutoCloseable {
+ private final HoodieTableMetaClient metaClient;
+
+ /**
+ * Mapping from instant start time -> completion time.
+ * Should be a thread-safe data structure.
+ */
+ private final Map<String, String> startToCompletionInstantTimeMap;
+
+ /**
+ * The start instant time to eagerly load from, by default load last N days
of completed instants.
+ */
+ private final String startInstant;
+
+ /**
+ * The first instant on the active timeline, used for query optimization.
+ */
+ private final String firstInstantOnActiveTimeline;
+
+ /**
+ * The constructor.
+ *
+ * @param metaClient The table meta client.
+ * @param startInstant The earliest instant time to eagerly load from, by
default load last N days of completed instants.
+ */
+ public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String
startInstant) {
+ this.metaClient = metaClient;
+ this.startToCompletionInstantTimeMap = new ConcurrentHashMap<>();
+ this.startInstant = startInstant;
+ this.firstInstantOnActiveTimeline =
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse("");
+ load();
+ }
+
+ /**
+ * Queries the instant completion time with given start time.
+ *
+ * @param startTime The start time.
+ *
+ * @return The completion time if the instant finished or empty if it is
still pending.
+ */
+ public Option<String> getCompletionTime(String startTime) {
+ String completionTime =
this.startToCompletionInstantTimeMap.get(startTime);
+ if (completionTime != null) {
+ return Option.of(completionTime);
+ }
+ if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN,
this.firstInstantOnActiveTimeline)) {
+ // the instant is still pending
+ return Option.empty();
+ }
+ // the 'startTime' should be out of the eager loading range, switch to
lazy loading.
+ // This operation is resource costly.
+ HoodieArchivedTimeline.loadInstants(metaClient,
+ new EqualsTimestampFilter(startTime),
+ HoodieArchivedTimeline.LoadMode.SLIM,
+ r -> true,
+ this::readCompletionTime);
+ return
Option.ofNullable(this.startToCompletionInstantTimeMap.get(startTime));
+ }
+
+ /**
+ * This method reads the instant completion time.
+ * This would also update 'startToCompletionInstantTimeMap' map with start
time/completion time pairs.
+ * Only instants starts from 'startInstant' (inclusive) are considered.
+ */
+ private void load() {
+ // load active instants first.
+ this.metaClient.getActiveTimeline()
+ .filterCompletedInstants().getInstantsAsStream()
+ .forEach(instant -> setCompletionTime(instant.getTimestamp(),
instant.getStateTransitionTime()));
+ // then load the archived instants.
+ HoodieArchivedTimeline.loadInstants(metaClient,
+ new HoodieArchivedTimeline.StartTsFilter(this.startInstant),
+ HoodieArchivedTimeline.LoadMode.SLIM,
+ r -> true,
+ this::readCompletionTime);
+ }
+
+ private void readCompletionTime(String instantTime, GenericRecord record) {
+ final String completionTime =
record.get(COMPLETION_TIME_ARCHIVED_META_FIELD).toString();
+ setCompletionTime(instantTime, completionTime);
+ }
+
+ private void setCompletionTime(String instantTime, String completionTime) {
+ this.startToCompletionInstantTimeMap.putIfAbsent(instantTime,
completionTime);
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.startToCompletionInstantTimeMap.clear();
+ }
+
+ // -------------------------------------------------------------------------
+ // Inner class
+ // -------------------------------------------------------------------------
+
+ /**
+ * A time based filter with equality of specified timestamp.
+ */
+ public static class EqualsTimestampFilter extends
HoodieArchivedTimeline.TimeRangeFilter {
+ private final String ts;
+
+ public EqualsTimestampFilter(String ts) {
+ super(ts, ts); // endTs is never used
+ this.ts = ts;
+ }
+
+ public boolean isInRange(String instantTime) {
+ return HoodieTimeline.compareTimestamps(instantTime, EQUALS, ts);
+ }
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
new file mode 100644
index 00000000000..e0bd6c62b0e
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.timeline;
+
+import org.apache.hudi.DummyActiveAction;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link
org.apache.hudi.client.timeline.CompletionTimeQueryView}.
+ */
+public class TestCompletionTimeQueryView {
+ @TempDir
+ File tempFile;
+
+ @Test
+ void testReadCompletionTime() throws Exception {
+ String tableName = "testTable";
+ String tablePath = tempFile.getAbsolutePath() + Path.SEPARATOR + tableName;
+ HoodieTableMetaClient metaClient = HoodieTestUtils.init(new
Configuration(), tablePath, HoodieTableType.COPY_ON_WRITE, tableName);
+ prepareTimeline(tablePath, metaClient);
+ try (CompletionTimeQueryView view = new
CompletionTimeQueryView(metaClient, String.format("%08d", 3))) {
+ // query completion time from LSM timeline
+ for (int i = 3; i < 7; i++) {
+ assertThat(view.getCompletionTime(String.format("%08d",
i)).orElse(""), is(String.format("%08d", i + 1000)));
+ }
+ // query completion time from active timeline
+ for (int i = 7; i < 11; i++) {
+ assertTrue(view.getCompletionTime(String.format("%08d",
i)).isPresent());
+ }
+ // lazy loading
+ for (int i = 1; i < 3; i++) {
+ assertThat(view.getCompletionTime(String.format("%08d",
i)).orElse(""), is(String.format("%08d", i + 1000)));
+ }
+ // query with inflight start time
+ assertFalse(view.getCompletionTime(String.format("%08d",
11)).isPresent());
+ // query with non-exist start time
+ assertFalse(view.getCompletionTime(String.format("%08d",
12)).isPresent());
+ }
+ }
+
+ private void prepareTimeline(String tablePath, HoodieTableMetaClient
metaClient) throws Exception {
+ HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath(tablePath)
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+ .withMarkersType("DIRECT")
+ .build();
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ List<ActiveAction> activeActions = new ArrayList<>();
+ for (int i = 1; i < 11; i++) {
+ String instantTime = String.format("%08d", i);
+ String completionTime = String.format("%08d", i + 1000);
+ HoodieCommitMetadata metadata =
testTable.createCommitMetadata(instantTime, WriteOperationType.INSERT,
Arrays.asList("par1", "par2"), 10, false);
+ testTable.addCommit(instantTime, Option.of(metadata));
+ activeActions.add(new DummyActiveAction(new
HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime,
completionTime), metadata.toJsonString().getBytes()));
+ }
+ testTable.addRequestedCommit(String.format("%08d", 11));
+ List<HoodieInstant> instants = new HoodieActiveTimeline(metaClient,
false).getInstantsAsStream().sorted().collect(Collectors.toList());
+ LSMTimelineWriter writer = LSMTimelineWriter.getInstance(writeConfig,
getMockHoodieTable(metaClient));
+ // archive [1,2], [3,4], [5,6] separately
+ writer.write(activeActions.subList(0, 2), Option.empty(), Option.empty());
+ writer.write(activeActions.subList(2, 4), Option.empty(), Option.empty());
+ writer.write(activeActions.subList(4, 6), Option.empty(), Option.empty());
+ // reconcile the active timeline
+ instants.subList(0, 3 * 6).forEach(instant ->
HoodieActiveTimeline.deleteInstantFile(metaClient.getFs(),
metaClient.getMetaPath(), instant));
+
ValidationUtils.checkState(metaClient.reloadActiveTimeline().filterCompletedInstants().countInstants()
== 4, "should archive 6 instants with 4 as active");
+ }
+
+ @SuppressWarnings("rawtypes")
+ private HoodieTable getMockHoodieTable(HoodieTableMetaClient metaClient) {
+ HoodieTable hoodieTable = mock(HoodieTable.class);
+ TaskContextSupplier taskContextSupplier = mock(TaskContextSupplier.class);
+ when(taskContextSupplier.getPartitionIdSupplier()).thenReturn(() -> 1);
+ when(hoodieTable.getTaskContextSupplier()).thenReturn(taskContextSupplier);
+ when(hoodieTable.getMetaClient()).thenReturn(metaClient);
+ return hoodieTable;
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index fff8e357829..bdd5750684e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
import java.util.function.Function;
/**
@@ -59,8 +60,8 @@ import java.util.function.Function;
* <p>This class can be serialized and de-serialized and on de-serialization
the FileSystem is re-initialized.
*/
public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
- private static final String INSTANT_TIME_ARCHIVED_META_FIELD = "instantTime";
- private static final String COMPLETION_TIME_ARCHIVED_META_FIELD =
"completionTime";
+ public static final String INSTANT_TIME_ARCHIVED_META_FIELD = "instantTime";
+ public static final String COMPLETION_TIME_ARCHIVED_META_FIELD =
"completionTime";
private static final String ACTION_ARCHIVED_META_FIELD = "action";
private static final String METADATA_ARCHIVED_META_FIELD = "metadata";
private static final String PLAN_ARCHIVED_META_FIELD = "plan";
@@ -149,8 +150,7 @@ public class HoodieArchivedTimeline extends
HoodieDefaultTimeline {
return new HoodieArchivedTimeline(metaClient);
}
- private HoodieInstant readCommit(GenericRecord record, LoadMode loadMode) {
- final String instantTime =
record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
+ private HoodieInstant readCommit(String instantTime, GenericRecord record,
LoadMode loadMode) {
final String action = record.get(ACTION_ARCHIVED_META_FIELD).toString();
final String completionTime =
record.get(COMPLETION_TIME_ARCHIVED_META_FIELD).toString();
loadInstantDetails(record, instantTime, loadMode);
@@ -200,38 +200,53 @@ public class HoodieArchivedTimeline extends
HoodieDefaultTimeline {
@Nullable TimeRangeFilter filter,
LoadMode loadMode,
Function<GenericRecord, Boolean> commitsFilter) {
+ Map<String, HoodieInstant> instantsInRange = new ConcurrentHashMap<>();
+ loadInstants(metaClient, filter, loadMode, commitsFilter, (instantTime,
avroRecord) -> instantsInRange.putIfAbsent(instantTime, readCommit(instantTime,
avroRecord, loadMode)));
+ ArrayList<HoodieInstant> result = new
ArrayList<>(instantsInRange.values());
+ Collections.sort(result);
+ return result;
+ }
+
+ /**
+ * Loads the instants from the timeline.
+ *
+ * @param metaClient The meta client.
+ * @param filter The time range filter where the target instant
belongs to.
+ * @param loadMode The load mode.
+ * @param commitsFilter Filter of the instant type.
+ * @param recordConsumer Consumer of the instant record payload.
+ */
+ public static void loadInstants(
+ HoodieTableMetaClient metaClient,
+ @Nullable TimeRangeFilter filter,
+ LoadMode loadMode,
+ Function<GenericRecord, Boolean> commitsFilter,
+ BiConsumer<String, GenericRecord> recordConsumer) {
try {
// List all files
List<String> fileNames =
LSMTimeline.latestSnapshotManifest(metaClient).getFileNames();
- Map<String, HoodieInstant> instantsInRange = new ConcurrentHashMap<>();
Schema readSchema = LSMTimeline.getReadSchema(loadMode);
fileNames.stream()
- .filter(fileName -> filter == null ||
LSMTimeline.isFileInRange(filter, fileName))
- .parallel().forEach(fileName -> {
- // Read the archived file
- try (HoodieAvroParquetReader reader =
(HoodieAvroParquetReader)
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
- .getFileReader(metaClient.getHadoopConf(), new
Path(metaClient.getArchivePath(), fileName))) {
- try (ClosableIterator<IndexedRecord> iterator =
reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(),
readSchema)) {
- while (iterator.hasNext()) {
- GenericRecord record = (GenericRecord) iterator.next();
- String instantTime =
record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
- if (!instantsInRange.containsKey(instantTime)
- && (filter == null ||
filter.isInRange(instantTime))
- && commitsFilter.apply(record)) {
- HoodieInstant instant = readCommit(record, loadMode);
- instantsInRange.put(instantTime, instant);
- }
- }
+ .filter(fileName -> filter == null ||
LSMTimeline.isFileInRange(filter, fileName))
+ .parallel().forEach(fileName -> {
+ // Read the archived file
+ try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader)
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+ .getFileReader(metaClient.getHadoopConf(), new
Path(metaClient.getArchivePath(), fileName))) {
+ try (ClosableIterator<IndexedRecord> iterator =
reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(),
readSchema)) {
+ while (iterator.hasNext()) {
+ GenericRecord record = (GenericRecord) iterator.next();
+ String instantTime =
record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
+ if ((filter == null || filter.isInRange(instantTime))
+ && commitsFilter.apply(record)) {
+ recordConsumer.accept(instantTime, record);
}
- } catch (IOException ioException) {
- throw new HoodieIOException("Error open file reader for
path: " + new Path(metaClient.getArchivePath(), fileName));
}
- });
-
- ArrayList<HoodieInstant> result = new
ArrayList<>(instantsInRange.values());
- Collections.sort(result);
- return result;
+ }
+ } catch (IOException ioException) {
+ throw new HoodieIOException("Error open file reader for path: "
+ new Path(metaClient.getArchivePath(), fileName));
+ }
+ });
} catch (IOException e) {
throw new HoodieIOException(
"Could not load archived commit timeline from path " +
metaClient.getArchivePath(), e);
@@ -289,7 +304,7 @@ public class HoodieArchivedTimeline extends
HoodieDefaultTimeline {
/**
* A time based filter with range [startTs, +∞).
*/
- private static class StartTsFilter extends TimeRangeFilter {
+ public static class StartTsFilter extends TimeRangeFilter {
private final String startTs;
public StartTsFilter(String startTs) {