This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 58bc859b173 [HUDI-7476] Incremental loading for archived timeline
(#10807)
58bc859b173 is described below
commit 58bc859b173a3648ff5f7f2042aaadf8281cac2c
Author: Danny Chan <[email protected]>
AuthorDate: Fri Mar 8 17:53:04 2024 +0800
[HUDI-7476] Incremental loading for archived timeline (#10807)
---
.../hudi/client/timeline/LSMTimelineWriter.java | 17 +++-
.../table/timeline/TestHoodieArchivedTimeline.java | 105 +++++++++++++++++++++
.../table/timeline/HoodieArchivedTimeline.java | 29 ++++++
.../table/timeline/HoodieDefaultTimeline.java | 69 ++++++++++++--
4 files changed, 208 insertions(+), 12 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
index 4f487410a8c..08500f06c9a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
@@ -20,6 +20,7 @@
package org.apache.hudi.client.timeline;
import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
+import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.table.timeline.MetadataConversionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -72,21 +73,29 @@ public class LSMTimelineWriter {
public static final long MAX_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1000;
private final HoodieWriteConfig config;
- private final HoodieTable<?, ?, ?, ?> table;
+ private final TaskContextSupplier taskContextSupplier;
private final HoodieTableMetaClient metaClient;
private HoodieWriteConfig writeConfig;
private LSMTimelineWriter(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?>
table) {
+ this(config, table.getTaskContextSupplier(), table.getMetaClient());
+ }
+
+ private LSMTimelineWriter(HoodieWriteConfig config, TaskContextSupplier
taskContextSupplier, HoodieTableMetaClient metaClient) {
this.config = config;
- this.table = table;
- this.metaClient = table.getMetaClient();
+ this.taskContextSupplier = taskContextSupplier;
+ this.metaClient = metaClient;
}
public static LSMTimelineWriter getInstance(HoodieWriteConfig config,
HoodieTable<?, ?, ?, ?> table) {
return new LSMTimelineWriter(config, table);
}
+ public static LSMTimelineWriter getInstance(HoodieWriteConfig config,
TaskContextSupplier taskContextSupplier, HoodieTableMetaClient metaClient) {
+ return new LSMTimelineWriter(config, taskContextSupplier, metaClient);
+ }
+
/**
* Writes the list of active actions into the timeline.
*
@@ -366,7 +375,7 @@ public class LSMTimelineWriter {
private HoodieFileWriter openWriter(Path filePath) {
try {
return HoodieFileWriterFactory.getFileWriter("", filePath,
metaClient.getHadoopConf(), getOrCreateWriterConfig(),
- HoodieLSMTimelineInstant.getClassSchema(),
table.getTaskContextSupplier(), HoodieRecord.HoodieRecordType.AVRO);
+ HoodieLSMTimelineInstant.getClassSchema(), taskContextSupplier,
HoodieRecord.HoodieRecordType.AVRO);
} catch (IOException e) {
throw new HoodieException("Unable to initialize archiving writer", e);
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieArchivedTimeline.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieArchivedTimeline.java
new file mode 100644
index 00000000000..664385f9ae0
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieArchivedTimeline.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.DummyActiveAction;
+import org.apache.hudi.client.timeline.LSMTimelineWriter;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test cases for {@link HoodieArchivedTimeline}.
+ */
+public class TestHoodieArchivedTimeline extends HoodieCommonTestHarness {
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ initMetaClient();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ cleanMetaClient();
+ }
+
+ @Test
+ public void testLoadingInstantsIncrementally() throws Exception {
+ writeArchivedTimeline(10, 10000000);
+ // now we have 50 instants spread across 5 parquet files.
+ HoodieArchivedTimeline archivedTimeline =
metaClient.getArchivedTimeline("10000043");
+
assertThat(archivedTimeline.firstInstant().map(HoodieInstant::getTimestamp).orElse(""),
is("10000043"));
+
assertThat(archivedTimeline.lastInstant().map(HoodieInstant::getTimestamp).orElse(""),
is("10000050"));
+ // load incrementally
+ archivedTimeline.reload("10000034");
+
assertThat(archivedTimeline.firstInstant().map(HoodieInstant::getTimestamp).orElse(""),
is("10000034"));
+ archivedTimeline.reload("10000011");
+
assertThat(archivedTimeline.firstInstant().map(HoodieInstant::getTimestamp).orElse(""),
is("10000011"));
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void writeArchivedTimeline(int batchSize, long startTs) throws
Exception {
+ HoodieTestTable testTable = HoodieTestTable.of(this.metaClient);
+ HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath(this.metaClient.getBasePathV2().toString())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+ .withMarkersType("DIRECT")
+ .build();
+ HoodieEngineContext engineContext = new HoodieLocalEngineContext(new
Configuration());
+ LSMTimelineWriter writer = LSMTimelineWriter.getInstance(writeConfig, new
LocalTaskContextSupplier(), metaClient);
+ List<ActiveAction> instantBuffer = new ArrayList<>();
+ for (int i = 1; i <= 50; i++) {
+ long instantTimeTs = startTs + i;
+ String instantTime = String.valueOf(instantTimeTs);
+ String completionTime = String.valueOf(instantTimeTs + 10);
+ HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED,
"commit", instantTime, completionTime);
+ HoodieCommitMetadata metadata =
testTable.createCommitMetadata(instantTime, WriteOperationType.INSERT,
Arrays.asList("par1", "par2"), 10, false);
+ byte[] serializedMetadata =
TimelineMetadataUtils.serializeCommitMetadata(metadata).get();
+ instantBuffer.add(new DummyActiveAction(instant, serializedMetadata));
+ if (i % batchSize == 0) {
+ // archive batchSize instants each time
+ writer.write(instantBuffer,
org.apache.hudi.common.util.Option.empty(),
org.apache.hudi.common.util.Option.empty());
+ writer.compactAndClean(engineContext);
+ instantBuffer.clear();
+ }
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index c9aba9ebc0d..d6167ea61e7 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -72,6 +72,13 @@ public class HoodieArchivedTimeline extends
HoodieDefaultTimeline {
private static final Logger LOG =
LoggerFactory.getLogger(HoodieArchivedTimeline.class);
+ /**
+ * Used for loading the archived timeline incrementally, the earliest loaded
instant time gets memorized
+ * each time the timeline is loaded. The instant time is then used as the
end boundary
+ * of the next loading.
+ */
+ private String cursorInstant;
+
/**
* Loads all the archived instants.
* Note that there is no lazy loading, so this may not work if the archived
timeline range is really long.
@@ -80,6 +87,7 @@ public class HoodieArchivedTimeline extends
HoodieDefaultTimeline {
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
this.metaClient = metaClient;
setInstants(this.loadInstants());
+ this.cursorInstant =
firstInstant().map(HoodieInstant::getTimestamp).orElse(null);
// multiple casts will make this lambda serializable -
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails;
@@ -92,6 +100,7 @@ public class HoodieArchivedTimeline extends
HoodieDefaultTimeline {
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient, String
startTs) {
this.metaClient = metaClient;
setInstants(loadInstants(new StartTsFilter(startTs), LoadMode.METADATA));
+ this.cursorInstant = startTs;
// multiple casts will make this lambda serializable -
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable)
this::getInstantDetails;
@@ -152,6 +161,26 @@ public class HoodieArchivedTimeline extends
HoodieDefaultTimeline {
return new HoodieArchivedTimeline(metaClient);
}
+ /**
+ * Reloads the archived timeline incrementally with given beginning
timestamp {@code startTs}.
+ * This method is not thread safe.
+ *
+ * <p>IMPORTANT: this is for multiple loading of one static snapshot of the
timeline; if there are newly archived instants,
+ * use {@link #reload()} instead.
+ */
+ public HoodieArchivedTimeline reload(String startTs) {
+ if (this.cursorInstant != null) {
+ if (HoodieTimeline.compareTimestamps(startTs, LESSER_THAN,
this.cursorInstant)) {
+ appendInstants(loadInstants(new ClosedOpenTimeRangeFilter(startTs,
this.cursorInstant), LoadMode.METADATA));
+ this.cursorInstant = startTs;
+ }
+ return this;
+ } else {
+ // a null cursor instant indicates an empty timeline
+ return new HoodieArchivedTimeline(metaClient, startTs);
+ }
+ }
+
private HoodieInstant readCommit(String instantTime, GenericRecord record,
Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer) {
final String action = record.get(ACTION_ARCHIVED_META_FIELD).toString();
final String completionTime =
record.get(COMPLETION_TIME_ARCHIVED_META_FIELD).toString();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index d08194f266c..7af56785906 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.slf4j.Logger;
@@ -35,6 +36,7 @@ import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
@@ -75,15 +77,23 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
public void setInstants(List<HoodieInstant> instants) {
this.instants = instants;
- final MessageDigest md;
- try {
- md = MessageDigest.getInstance(HASHING_ALGORITHM);
- this.instants.forEach(i -> md
- .update(getUTF8Bytes(StringUtils.joinUsingDelim("_",
i.getTimestamp(), i.getAction(), i.getState().name()))));
- } catch (NoSuchAlgorithmException nse) {
- throw new HoodieException(nse);
+ this.timelineHash = computeTimelineHash(this.instants);
+ clearState();
+ }
+
+ public void appendInstants(List<HoodieInstant> newInstants) {
+ if (newInstants.isEmpty()) {
+ // the new instants list is empty, nothing to do.
+ return;
+ }
+ if (this.instants.isEmpty()) {
+ // the existing instants list is empty; set up the new ones directly.
+ setInstants(newInstants);
+ return;
}
- this.timelineHash = StringUtils.toHexString(md.digest());
+ this.instants = mergeInstants(newInstants, this.instants);
+ this.timelineHash = computeTimelineHash(this.instants);
+ clearState();
}
/**
@@ -567,6 +577,11 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
return Option.fromJavaOptional(instants.stream().findFirst());
}
+ private void clearState() {
+ instantTimeSet = null;
+ firstNonSavepointCommit = null;
+ }
+
/**
* Merge this timeline with the given timeline.
*/
@@ -581,4 +596,42 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
};
return new HoodieDefaultTimeline(instantStream, details);
}
+
+ /**
+ * Computes the timeline hash and returns.
+ */
+ private String computeTimelineHash(List<HoodieInstant> instants) {
+ final MessageDigest md;
+ try {
+ md = MessageDigest.getInstance(HASHING_ALGORITHM);
+ instants.forEach(i -> md
+ .update(getUTF8Bytes(StringUtils.joinUsingDelim("_",
i.getTimestamp(), i.getAction(), i.getState().name()))));
+ } catch (NoSuchAlgorithmException nse) {
+ throw new HoodieException(nse);
+ }
+ return StringUtils.toHexString(md.digest());
+ }
+
+ /**
+ * Merges the given instant lists into one and keeps the sequence.
+ */
+ private static List<HoodieInstant> mergeInstants(List<HoodieInstant>
instants1, List<HoodieInstant> instants2) {
+ ValidationUtils.checkArgument(!instants1.isEmpty() &&
!instants2.isEmpty(), "The instants to merge can not be empty");
+ // some optimizations are based on the assumption that all the instant lists
are already sorted.
+ // skip when one list contains all the instants of the other one.
+ final List<HoodieInstant> merged;
+ if (HoodieTimeline.compareTimestamps(instants1.get(instants1.size() -
1).getTimestamp(), LESSER_THAN_OR_EQUALS, instants2.get(0).getTimestamp())) {
+ merged = new ArrayList<>(instants1);
+ merged.addAll(instants2);
+ } else if (HoodieTimeline.compareTimestamps(instants2.get(instants2.size()
- 1).getTimestamp(), LESSER_THAN_OR_EQUALS, instants1.get(0).getTimestamp())) {
+ merged = new ArrayList<>(instants2);
+ merged.addAll(instants1);
+ } else {
+ merged = new ArrayList<>(instants1);
+ merged.addAll(instants2);
+ // sort the instants explicitly
+ Collections.sort(merged);
+ }
+ return merged;
+ }
}