This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 225b03c5af8 [HUDI-7184] Add IncrementalQueryAnalyzer for completion
time based in… (#10255)
225b03c5af8 is described below
commit 225b03c5af82a50c2f3e5760f3e552cd2594fc0b
Author: Danny Chan <[email protected]>
AuthorDate: Tue Feb 13 05:36:34 2024 +0800
[HUDI-7184] Add IncrementalQueryAnalyzer for completion time based in…
(#10255)
* [HUDI-7184] Add IncrementalQueryAnalyzer for completion time based
incremental queries
* CR Feedback
* More CR Feedback
---------
Co-authored-by: vinoth chandar <[email protected]>
---
.../hudi/table/action/compact/CompactHelpers.java | 2 +-
.../timeline/TestCompletionTimeQueryView.java | 2 +-
.../apache/hudi/common/table/log/InstantRange.java | 157 +++++---
.../table/read/IncrementalQueryAnalyzer.java | 435 +++++++++++++++++++++
.../table/timeline/CompletionTimeQueryView.java | 174 ++++++---
.../table/timeline/HoodieInstantTimeGenerator.java | 4 +
.../apache/hudi/common/util/ClusteringUtils.java | 15 +
.../metadata/HoodieMetadataLogRecordReader.java | 2 +-
.../apache/hudi/configuration/OptionsResolver.java | 14 +-
.../apache/hudi/source/IncrementalInputSplits.java | 326 +++------------
.../hudi/source/StreamReadMonitoringFunction.java | 2 +-
.../java/org/apache/hudi/util/ClusteringUtil.java | 20 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 3 +-
.../hudi/source/TestIncrementalInputSplits.java | 99 +++--
.../source/TestStreamReadMonitoringFunction.java | 4 +-
.../apache/hudi/table/format/TestInputFormat.java | 67 ++--
.../test/java/org/apache/hudi/utils/TestUtils.java | 6 +-
.../java/org/apache/hudi/common/util/Option.java | 4 +
.../scala/org/apache/hudi/cdc/CDCRelation.scala | 2 +-
.../benchmark/LSMTimelineReadBenchmark.scala | 2 +-
20 files changed, 860 insertions(+), 480 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
index 78ea56fd9ad..c4ef6444d2d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
@@ -117,7 +117,7 @@ public class CompactHelpers<T, I, K, O> {
.build();
Set<String> validInstants =
HoodieTableMetadataUtil.getValidInstantTimestamps(dataMetaClient,
metadataMetaClient);
return InstantRange.builder()
- .rangeType(InstantRange.RangeType.EXPLICIT_MATCH)
+ .rangeType(InstantRange.RangeType.EXACT_MATCH)
.explicitInstants(validInstants).build();
}
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
index 9dbc9c8656f..3122473565e 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
@@ -114,7 +114,7 @@ public class TestCompletionTimeQueryView {
}
private String getInstantTimeSetFormattedString(CompletionTimeQueryView
view, int completionTime1, int completionTime2) {
- return view.getStartTimeSet(String.format("%08d", completionTime1),
String.format("%08d", completionTime2), s -> String.format("%08d",
Integer.parseInt(s) - 1000))
+ return view.getStartTimes(String.format("%08d", completionTime1),
String.format("%08d", completionTime2), s -> String.format("%08d",
Integer.parseInt(s) - 1000))
.stream().sorted().collect(Collectors.joining(","));
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
index 96c7b0c0ddf..6dfc09dbf4f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
@@ -19,25 +19,29 @@
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
/**
- * A instant commits range used for incremental reader filtering.
+ * An instant range used for incremental reader filtering.
*/
public abstract class InstantRange implements Serializable {
private static final long serialVersionUID = 1L;
- protected final String startInstant;
- protected final String endInstant;
+ protected final Option<String> startInstant;
+ protected final Option<String> endInstant;
public InstantRange(String startInstant, String endInstant) {
- this.startInstant = startInstant;
- this.endInstant = endInstant;
+ this.startInstant = Option.ofNullable(startInstant);
+ this.endInstant = Option.ofNullable(endInstant);
}
/**
@@ -47,11 +51,11 @@ public abstract class InstantRange implements Serializable {
return new Builder();
}
- public String getStartInstant() {
+ public Option<String> getStartInstant() {
return startInstant;
}
- public String getEndInstant() {
+ public Option<String> getEndInstant() {
return endInstant;
}
@@ -60,8 +64,8 @@ public abstract class InstantRange implements Serializable {
@Override
public String toString() {
return "InstantRange{"
- + "startInstant='" + startInstant == null ? "null" : startInstant +
'\''
- + ", endInstant='" + endInstant == null ? "null" : endInstant + '\''
+ + "startInstant='" + (startInstant.isEmpty() ? "-INF" :
startInstant.get()) + '\''
+ + ", endInstant='" + (endInstant.isEmpty() ? "+INF" :
endInstant.get()) + '\''
+ ", rangeType='" + this.getClass().getSimpleName() + '\''
+ '}';
}
@@ -73,89 +77,98 @@ public abstract class InstantRange implements Serializable {
/**
* Represents a range type.
*/
- public static enum RangeType {
- OPEN_CLOSE, CLOSE_CLOSE, EXPLICIT_MATCH
+ public enum RangeType {
+ /** Start instant is not included (>) and end instant is included (<=). */
+ OPEN_CLOSED,
+ /** Both start and end instants are included (>=, <=). */
+ CLOSED_CLOSED,
+ /** Exact match of instants. */
+ EXACT_MATCH,
+ /** Composition of multiple ranges. */
+ COMPOSITION
}
- private static class OpenCloseRange extends InstantRange {
+ private static class OpenClosedRange extends InstantRange {
- public OpenCloseRange(String startInstant, String endInstant) {
+ public OpenClosedRange(String startInstant, String endInstant) {
super(Objects.requireNonNull(startInstant), endInstant);
}
@Override
public boolean isInRange(String instant) {
- // No need to do comparison:
- // HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant)
- // because the logic is ensured by the log scanner
- return HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN, startInstant);
+ boolean validAgainstStart = HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN, startInstant.get());
+ // if there is an end instant, check against it, otherwise assume +INF
and it's always valid.
+ boolean validAgainstEnd = endInstant
+ .map(e -> HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.LESSER_THAN_OR_EQUALS, e))
+ .orElse(true);
+ return validAgainstStart && validAgainstEnd;
}
}
- private static class OpenCloseRangeNullableBoundary extends InstantRange {
+ private static class OpenClosedRangeNullableBoundary extends InstantRange {
- public OpenCloseRangeNullableBoundary(String startInstant, String
endInstant) {
+ public OpenClosedRangeNullableBoundary(String startInstant, String
endInstant) {
super(startInstant, endInstant);
- ValidationUtils.checkArgument(startInstant != null || endInstant != null,
- "Start and end instants can not both be null");
+ ValidationUtils.checkArgument(!startInstant.isEmpty() ||
!endInstant.isEmpty(),
+ "At least one of start and end instants should be specified.");
}
@Override
public boolean isInRange(String instant) {
- if (startInstant == null) {
- return HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
- } else if (endInstant == null) {
- return HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN, startInstant);
- } else {
- return HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN, startInstant)
- && HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
- }
+ boolean validAgainstStart = startInstant
+ .map(s -> HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN, s))
+ .orElse(true);
+ boolean validAgainstEnd = endInstant
+ .map(e -> HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.LESSER_THAN_OR_EQUALS, e))
+ .orElse(true);
+
+ return validAgainstStart && validAgainstEnd;
}
}
- private static class CloseCloseRange extends InstantRange {
+ private static class ClosedClosedRange extends InstantRange {
- public CloseCloseRange(String startInstant, String endInstant) {
+ public ClosedClosedRange(String startInstant, String endInstant) {
super(Objects.requireNonNull(startInstant), endInstant);
}
@Override
public boolean isInRange(String instant) {
- // No need to do comparison:
- // HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant)
- // because the logic is ensured by the log scanner
- return HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
+ boolean validAgainstStart = HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant.get());
+ boolean validAgainstEnd = endInstant
+ .map(e -> HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.LESSER_THAN_OR_EQUALS, e))
+ .orElse(true);
+ return validAgainstStart && validAgainstEnd;
}
}
- private static class CloseCloseRangeNullableBoundary extends InstantRange {
+ private static class ClosedClosedRangeNullableBoundary extends InstantRange {
- public CloseCloseRangeNullableBoundary(String startInstant, String
endInstant) {
+ public ClosedClosedRangeNullableBoundary(String startInstant, String
endInstant) {
super(startInstant, endInstant);
- ValidationUtils.checkArgument(startInstant != null || endInstant != null,
- "Start and end instants can not both be null");
+ ValidationUtils.checkArgument(!startInstant.isEmpty() ||
!endInstant.isEmpty(),
+ "At least one of start and end instants should be specified.");
}
@Override
public boolean isInRange(String instant) {
- if (startInstant == null) {
- return HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
- } else if (endInstant == null) {
- return HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
- } else {
- return HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant)
- && HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
- }
+ boolean validAgainstStart = startInstant
+ .map(s -> HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN_OR_EQUALS, s))
+ .orElse(true);
+ boolean validAgainstEnd = endInstant
+ .map(e -> HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.LESSER_THAN_OR_EQUALS, e))
+ .orElse(true);
+ return validAgainstStart && validAgainstEnd;
}
}
/**
* Class to assist in checking if an instant is part of a set of instants.
*/
- private static class ExplicitMatchRange extends InstantRange {
+ private static class ExactMatchRange extends InstantRange {
Set<String> instants;
- public ExplicitMatchRange(Set<String> instants) {
+ public ExactMatchRange(Set<String> instants) {
super(Collections.min(instants), Collections.max(instants));
this.instants = instants;
}
@@ -166,6 +179,28 @@ public abstract class InstantRange implements Serializable
{
}
}
+ /**
+ * Composition of multiple instant ranges in disjunctive form.
+ */
+ private static class CompositionRange extends InstantRange {
+ List<InstantRange> instantRanges;
+
+ public CompositionRange(List<InstantRange> instantRanges) {
+ super(null, null);
+ this.instantRanges = Objects.requireNonNull(instantRanges, "Instant
ranges should not be null");
+ }
+
+ @Override
+ public boolean isInRange(String instant) {
+ for (InstantRange range : instantRanges) {
+ if (range.isInRange(instant)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
@@ -179,6 +214,7 @@ public abstract class InstantRange implements Serializable {
private RangeType rangeType;
private boolean nullableBoundary = false;
private Set<String> explicitInstants;
+ private List<InstantRange> instantRanges;
private Builder() {
}
@@ -208,19 +244,26 @@ public abstract class InstantRange implements
Serializable {
return this;
}
+ public Builder instantRanges(InstantRange... instantRanges) {
+ this.instantRanges =
Arrays.stream(instantRanges).collect(Collectors.toList());
+ return this;
+ }
+
public InstantRange build() {
ValidationUtils.checkState(this.rangeType != null, "Range type is
required");
switch (rangeType) {
- case OPEN_CLOSE:
+ case OPEN_CLOSED:
return nullableBoundary
- ? new OpenCloseRangeNullableBoundary(startInstant, endInstant)
- : new OpenCloseRange(startInstant, endInstant);
- case CLOSE_CLOSE:
+ ? new OpenClosedRangeNullableBoundary(startInstant, endInstant)
+ : new OpenClosedRange(startInstant, endInstant);
+ case CLOSED_CLOSED:
return nullableBoundary
- ? new CloseCloseRangeNullableBoundary(startInstant, endInstant)
- : new CloseCloseRange(startInstant, endInstant);
- case EXPLICIT_MATCH:
- return new ExplicitMatchRange(this.explicitInstants);
+ ? new ClosedClosedRangeNullableBoundary(startInstant, endInstant)
+ : new ClosedClosedRange(startInstant, endInstant);
+ case EXACT_MATCH:
+ return new ExactMatchRange(this.explicitInstants);
+ case COMPOSITION:
+ return new CompositionRange(this.instantRanges);
default:
throw new AssertionError();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
new file mode 100644
index 00000000000..03596354bb1
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries on the timeline, to filter instants based
on specified ranges.
+ *
+ * <p>The analyzer is supplied the following information:
+ * <ul>
+ * <li>The archived instants;</li>
+ * <li>The active instants;</li>
+ * <li>The instant filtering predicate, e.g the instant range with a
"startTime" and "endTime"</li>
+ * <li>Whether the query starts from the "earliest" available instant;</li>
+ * <li>Whether the query ends to the "latest" available instant;</li>
+ * <li>The max completion time used for fs view file slice version
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ * <tr>
+ * <th>Query Range</th>
+ * <th>File selection criteria</th>
+ * <th>Instant filtering predicate applied to selected files</th>
+ * </tr>
+ * <tr>
+ * <td>[earliest, +INF]</td>
+ * <td>The latest snapshot files from table metadata</td>
+ * <td>_</td>
+ * </tr>
+ * <tr>
+ * <td>[earliest, endTime]</td>
+ * <td>The latest snapshot files from table metadata</td>
+ * <td>'_hoodie_commit_time' in setA, setA contains the begin instant
times for actions completed before or on 'endTime'</td>
+ * </tr>
+ * <tr>
+ * <td>[-INF, +INF]</td>
+ * <td>The latest completed instant metadata</td>
+ * <td>'_hoodie_commit_time' = i_n, i_n is the latest completed
instant</td>
+ * </tr>
+ * <tr>
+ * <td>[-INF, endTime]</td>
+ * <td>I) find the last completed instant i_n before or on 'endTime;
+ * II) read the latest snapshot from table metadata if i_n is archived
or the commit metadata if it is still active</td>
+ * <td>'_hoodie_commit_time' = i_n</td>
+ * </tr>
+ * <tr>
+ * <td>[startTime, +INF]</td>
+ * <td>i).find the instant set setA, setA is a collection of all the
instants completed after or on 'startTime';
+ * ii). read the latest snapshot from table metadata if setA has archived
instants or the commit metadata if all the instants are still active</td>
+ * <td>'_hoodie_commit_time' in setA</td>
+ * </tr>
+ * <tr>
+ * <td>[earliest, endTime]</td>
+ * <td>i).find the instant set setA, setA is a collection of all the
instants completed in the given time range;
+ * ii). read the latest snapshot from table metadata if setA has archived
instants or the commit metadata if all the instants are still active</td>
+ * <td>'_hoodie_commit_time' in setA</td>
+ * </tr>
+ * </table>
+ *
+ * <p> A {@code RangeType} is required for analyzing the query so that the
query range boundary inclusiveness has clear semantics.
+ *
+ * <p>IMPORTANT: the reader may optionally choose to fall back to reading the
latest snapshot if the files for decoding the commit metadata are already
cleaned.
+ */
+public class IncrementalQueryAnalyzer {
+ public static final String START_COMMIT_EARLIEST = "earliest";
+
+ private final HoodieTableMetaClient metaClient;
+ private final Option<String> startTime;
+ private final Option<String> endTime;
+ private final InstantRange.RangeType rangeType;
+ private final boolean skipCompaction;
+ private final boolean skipClustering;
+ private final int limit;
+
+ private IncrementalQueryAnalyzer(
+ HoodieTableMetaClient metaClient,
+ String startTime,
+ String endTime,
+ InstantRange.RangeType rangeType,
+ boolean skipCompaction,
+ boolean skipClustering,
+ int limit) {
+ this.metaClient = metaClient;
+ this.startTime = Option.ofNullable(startTime);
+ this.endTime = Option.ofNullable(endTime);
+ this.rangeType = rangeType;
+ this.skipCompaction = skipCompaction;
+ this.skipClustering = skipClustering;
+ this.limit = limit;
+ }
+
+ /**
+ * Returns a builder.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Analyzes the incremental query context with given completion time range.
+ *
+ * @return An incremental query context including the instant time range
info.
+ */
+ public QueryContext analyze() {
+ try (CompletionTimeQueryView completionTimeQueryView = new
CompletionTimeQueryView(this.metaClient)) {
+ if (completionTimeQueryView.isEmptyTable()) {
+ // no dataset committed in the table
+ return QueryContext.EMPTY;
+ }
+ HoodieTimeline filteredTimeline = getFilteredTimeline(this.metaClient);
+ List<String> instantTimeList =
completionTimeQueryView.getStartTimes(filteredTimeline, startTime, endTime,
rangeType);
+ if (instantTimeList.isEmpty()) {
+ // no instants completed within the given time range, return early.
+ return QueryContext.EMPTY;
+ }
+ // get hoodie instants
+ Pair<List<String>, List<String>> splitInstantTime =
splitInstantByActiveness(instantTimeList, completionTimeQueryView);
+ Set<String> instantTimeSet = new HashSet<>(instantTimeList);
+ List<String> archivedInstantTime = splitInstantTime.getLeft();
+ List<String> activeInstantTime = splitInstantTime.getRight();
+ List<HoodieInstant> archivedInstants = new ArrayList<>();
+ List<HoodieInstant> activeInstants = new ArrayList<>();
+ HoodieTimeline archivedReadTimeline = null;
+ if (!activeInstantTime.isEmpty()) {
+ activeInstants = filteredTimeline.getInstantsAsStream().filter(instant
->
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+ if (limit > 0 && limit < activeInstants.size()) {
+ // streaming read speed limit, limits the maximum number of commits
allowed to read for each run
+ activeInstants = activeInstants.subList(0, limit);
+ }
+ }
+ if (!archivedInstantTime.isEmpty()) {
+ archivedReadTimeline = getArchivedReadTimeline(metaClient,
archivedInstantTime.get(0));
+ archivedInstants =
archivedReadTimeline.getInstantsAsStream().filter(instant ->
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+ }
+ List<String> instants = Stream.concat(archivedInstants.stream(),
activeInstants.stream()).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ if (instants.isEmpty()) {
+ // no instants completed within the given time range, return early.
+ return QueryContext.EMPTY;
+ }
+ if (startTime.isEmpty() && endTime.isPresent()) {
+ instants = Collections.singletonList(instants.get(instants.size() -
1));
+ }
+ String lastInstant = instants.get(instants.size() - 1);
+ // null => the query starts from "earliest"; if no start time is specified,
+ // start from the latest instant, matching the usual streaming read semantics;
+ // otherwise, use the earliest instant in the range as the start instant.
+ String startInstant =
START_COMMIT_EARLIEST.equalsIgnoreCase(startTime.orElse(null)) ? null :
startTime.isEmpty() ? lastInstant : instants.get(0);
+ String endInstant = endTime.isEmpty() ? null : lastInstant;
+ return QueryContext.create(startInstant, endInstant, instants,
archivedInstants, activeInstants, filteredTimeline, archivedReadTimeline);
+ }
+ }
+
+ /**
+ * Splits the given instant time list into a pair of archived instant list
and active instant list.
+ */
+ private static Pair<List<String>, List<String>>
splitInstantByActiveness(List<String> instantTimeList, CompletionTimeQueryView
completionTimeQueryView) {
+ int firstActiveIdx = IntStream.range(0, instantTimeList.size()).filter(i
->
!completionTimeQueryView.isArchived(instantTimeList.get(i))).findFirst().orElse(-1);
+ if (firstActiveIdx == -1) {
+ return Pair.of(instantTimeList, Collections.emptyList());
+ } else if (firstActiveIdx == 0) {
+ return Pair.of(Collections.emptyList(), instantTimeList);
+ } else {
+ return Pair.of(instantTimeList.subList(0, firstActiveIdx),
instantTimeList.subList(firstActiveIdx, instantTimeList.size()));
+ }
+ }
+
+ private HoodieTimeline getFilteredTimeline(HoodieTableMetaClient metaClient)
{
+ HoodieTimeline timeline =
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+ return filterInstantsAsPerUserConfigs(metaClient, timeline,
this.skipCompaction, this.skipClustering);
+ }
+
+ private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient
metaClient, String startInstant) {
+ HoodieArchivedTimeline archivedTimeline =
metaClient.getArchivedTimeline(startInstant, false);
+ HoodieTimeline archivedCompleteTimeline =
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
+ return filterInstantsAsPerUserConfigs(metaClient,
archivedCompleteTimeline, this.skipCompaction, this.skipClustering);
+ }
+
+ /**
+ * Filters out the unnecessary instants as per user specified configs.
+ *
+ * @param timeline The timeline.
+ *
+ * @return the filtered timeline
+ */
+ @VisibleForTesting
+ public static HoodieTimeline
filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline
timeline, boolean skipCompaction, boolean skipClustering) {
+ final HoodieTimeline oriTimeline = timeline;
+ if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ &&
skipCompaction) {
+ // the compaction commit uses 'commit' as action which is tricky
+ timeline = timeline.filter(instant ->
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+ }
+ if (skipClustering) {
+ timeline = timeline.filter(instant ->
!ClusteringUtils.isClusteringInstant(instant, oriTimeline));
+ }
+ return timeline;
+ }
+
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+ /**
+ * Builder for {@link IncrementalQueryAnalyzer}.
+ */
+ public static class Builder {
+ /**
+ * Start completion time.
+ */
+ private String startTime;
+ /**
+ * End completion time.
+ */
+ private String endTime;
+ private InstantRange.RangeType rangeType;
+ private HoodieTableMetaClient metaClient;
+ private boolean skipCompaction = false;
+ private boolean skipClustering = false;
+ /**
+ * Maximum number of instants to read per run.
+ */
+ private int limit = -1;
+
+ public Builder() {
+ }
+
+ public Builder startTime(String startTime) {
+ this.startTime = startTime;
+ return this;
+ }
+
+ public Builder endTime(String endTime) {
+ this.endTime = endTime;
+ return this;
+ }
+
+ public Builder rangeType(InstantRange.RangeType rangeType) {
+ this.rangeType = rangeType;
+ return this;
+ }
+
+ public Builder metaClient(HoodieTableMetaClient metaClient) {
+ this.metaClient = metaClient;
+ return this;
+ }
+
+ public Builder skipCompaction(boolean skipCompaction) {
+ this.skipCompaction = skipCompaction;
+ return this;
+ }
+
+ public Builder skipClustering(boolean skipClustering) {
+ this.skipClustering = skipClustering;
+ return this;
+ }
+
+ public Builder limit(int limit) {
+ this.limit = limit;
+ return this;
+ }
+
+ public IncrementalQueryAnalyzer build() {
+ return new
IncrementalQueryAnalyzer(Objects.requireNonNull(this.metaClient),
this.startTime, this.endTime,
+ Objects.requireNonNull(this.rangeType), this.skipCompaction,
this.skipClustering, this.limit);
+ }
+ }
+
+ /**
+ * Represents the analyzed query context.
+ */
+ public static class QueryContext {
+ public static final QueryContext EMPTY = new QueryContext(null, null,
Collections.emptyList(), Collections.emptyList(), Collections.emptyList(),
null, null);
+
+ /**
+ * An empty option indicates consumption from the earliest instant.
+ */
+ private final Option<String> startInstant;
+ /**
+ * An empty option indicates consumption to the latest instant.
+ */
+ private final Option<String> endInstant;
+ private final List<HoodieInstant> archivedInstants;
+ private final List<HoodieInstant> activeInstants;
+ /**
+ * The active timeline to read filtered by given configurations.
+ */
+ private final HoodieTimeline activeTimeline;
+ /**
+ * The archived timeline to read filtered by given configurations.
+ */
+ private final HoodieTimeline archivedTimeline;
+ private final List<String> instants;
+
+ private QueryContext(
+ @Nullable String startInstant,
+ @Nullable String endInstant,
+ List<String> instants,
+ List<HoodieInstant> archivedInstants,
+ List<HoodieInstant> activeInstants,
+ HoodieTimeline activeTimeline,
+ @Nullable HoodieTimeline archivedTimeline) {
+ this.startInstant = Option.ofNullable(startInstant);
+ this.endInstant = Option.ofNullable(endInstant);
+ this.archivedInstants = archivedInstants;
+ this.activeInstants = activeInstants;
+ this.activeTimeline = activeTimeline;
+ this.archivedTimeline = archivedTimeline;
+ this.instants = instants;
+ }
+
+ public static QueryContext create(
+ @Nullable String startInstant,
+ @Nullable String endInstant,
+ List<String> instants,
+ List<HoodieInstant> archivedInstants,
+ List<HoodieInstant> activeInstants,
+ HoodieTimeline activeTimeline,
+ @Nullable HoodieTimeline archivedTimeline) {
+ return new QueryContext(startInstant, endInstant, instants,
archivedInstants, activeInstants, activeTimeline, archivedTimeline);
+ }
+
+ public boolean isEmpty() {
+ return this.instants.isEmpty();
+ }
+
+ public Option<String> getStartInstant() {
+ return startInstant;
+ }
+
+ public Option<String> getEndInstant() {
+ return endInstant;
+ }
+
+ /**
+ * Returns the latest instant time which should be included physically in
reading.
+ */
+ public String getLastInstant() {
+ return this.instants.get(this.instants.size() - 1);
+ }
+
+ public List<HoodieInstant> getArchivedInstants() {
+ return archivedInstants;
+ }
+
+ public List<HoodieInstant> getActiveInstants() {
+ return activeInstants;
+ }
+
+ public boolean isConsumingFromEarliest() {
+ return startInstant.isEmpty();
+ }
+
+ public boolean isConsumingToLatest() {
+ return endInstant.isEmpty();
+ }
+
+ public String getMaxCompletionTime() {
+ if (this.activeInstants.size() > 0) {
+ return
this.activeInstants.stream().map(HoodieInstant::getCompletionTime).filter(Objects::nonNull).max(String::compareTo).get();
+ } else {
+ // all the query instants are archived, use the latest active instant
completion time as
+ // the file slice version upper threshold, because very probably these
files already got cleaned,
+ // use the max completion time of the archived instants could yield
empty file slices.
+ return
this.activeTimeline.getInstantsAsStream().map(HoodieInstant::getCompletionTime).filter(Objects::nonNull).max(String::compareTo).get();
+ }
+ }
+
+ public Option<InstantRange> getInstantRange() {
+ if (isConsumingFromEarliest()) {
+ if (isConsumingToLatest()) {
+ // A null instant range indicates no filtering.
+ // short-cut for snapshot read
+ return Option.empty();
+ }
+ return Option.of(InstantRange.builder()
+ .startInstant(startInstant.orElse(null))
+ .endInstant(endInstant.orElse(null))
+ .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
+ .nullableBoundary(true)
+ .build());
+ } else {
+ return Option.of(InstantRange.builder()
+ .rangeType(InstantRange.RangeType.EXACT_MATCH)
+ .explicitInstants(new HashSet<>(instants))
+ .build());
+ }
+ }
+
+ public HoodieTimeline getActiveTimeline() {
+ return this.activeTimeline;
+ }
+
+ public @Nullable HoodieTimeline getArchivedTimeline() {
+ return archivedTimeline;
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
index 6bac49b83c9..e14381bd373 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
@@ -21,22 +21,28 @@ package org.apache.hudi.common.table.timeline;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.avro.generic.GenericRecord;
import java.io.Serializable;
import java.time.Instant;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Date;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.START_COMMIT_EARLIEST;
import static
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.COMPLETION_TIME_ARCHIVED_META_FIELD;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
/**
* Query view for instant completion time.
@@ -54,13 +60,13 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
* Mapping from instant start time -> completion time.
* Should be thread-safe data structure.
*/
- private final Map<String, String> startToCompletionInstantTimeMap;
+ private final ConcurrentMap<String, String> beginToCompletionInstantTimeMap;
/**
* The cursor instant time to eagerly load from, by default load last N days
of completed instants.
- * It is tuned dynamically with lazy loading occurs, assumes an initial
cursor instant as t10,
- * a completion query for t5 would trigger a lazy loading with this cursor
instant been updated as t5.
- * The sliding of the cursor instant economizes redundant loading from
different queries.
+ * It can grow dynamically with lazy loading. e.g. assuming an initial
cursor instant as t10,
+ * a completion query for t5 would trigger lazy loading with this cursor
instant updated to t5.
+ * This sliding window model amortizes redundant loading from different
queries.
*/
private volatile String cursorInstant;
@@ -82,12 +88,12 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
* The constructor.
*
* @param metaClient The table meta client.
- * @param cursorInstant The earliest instant time to eagerly load from, by
default load last N days of completed instants.
+ * @param eagerLoadInstant The earliest instant time to eagerly load from,
by default load last N days of completed instants.
*/
- public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String
cursorInstant) {
+ public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String
eagerLoadInstant) {
this.metaClient = metaClient;
- this.startToCompletionInstantTimeMap = new ConcurrentHashMap<>();
- this.cursorInstant = HoodieTimeline.minInstant(cursorInstant,
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
+ this.beginToCompletionInstantTimeMap = new ConcurrentHashMap<>();
+ this.cursorInstant = HoodieTimeline.minInstant(eagerLoadInstant,
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
// Note: use getWriteTimeline() to keep sync with the fs view
visibleCommitsAndCompactionTimeline, see
AbstractTableFileSystemView.refreshTimeline.
this.firstNonSavepointCommit =
metaClient.getActiveTimeline().getWriteTimeline().getFirstNonSavepointCommit().map(HoodieInstant::getTimestamp).orElse("");
load();
@@ -96,9 +102,16 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
/**
* Returns whether the instant is completed.
*/
- public boolean isCompleted(String instantTime) {
- return this.startToCompletionInstantTimeMap.containsKey(instantTime)
- || HoodieTimeline.compareTimestamps(instantTime, LESSER_THAN,
this.firstNonSavepointCommit);
+ public boolean isCompleted(String beginInstantTime) {
+ // archival does not proceed beyond the first savepoint, so any instant
before that is completed.
+ return this.beginToCompletionInstantTimeMap.containsKey(beginInstantTime)
|| isArchived(beginInstantTime);
+ }
+
+ /**
+ * Returns whether the instant is archived.
+ */
+ public boolean isArchived(String instantTime) {
+ return HoodieTimeline.compareTimestamps(instantTime, LESSER_THAN,
this.firstNonSavepointCommit);
}
/**
@@ -113,7 +126,7 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
}
/**
- * Returns whether the give instant time {@code instantTime} is sliced after
or on the base instant {@code baseInstant}.
+ * Returns whether the given instant time {@code instantTime} is sliced
after or on the base instant {@code baseInstant}.
*/
public boolean isSlicedAfterOrOn(String baseInstant, String instantTime) {
Option<String> completionTimeOpt = getCompletionTime(baseInstant,
instantTime);
@@ -153,21 +166,21 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
/**
* Queries the instant completion time with given start time.
*
- * @param startTime The start time.
+ * @param beginTime The start time.
*
* @return The completion time if the instant finished or empty if it is
still pending.
*/
- public Option<String> getCompletionTime(String startTime) {
- String completionTime =
this.startToCompletionInstantTimeMap.get(startTime);
+ public Option<String> getCompletionTime(String beginTime) {
+ String completionTime =
this.beginToCompletionInstantTimeMap.get(beginTime);
if (completionTime != null) {
return Option.of(completionTime);
}
- if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN_OR_EQUALS,
this.cursorInstant)) {
+ if (HoodieTimeline.compareTimestamps(beginTime, GREATER_THAN_OR_EQUALS,
this.cursorInstant)) {
// the instant is still pending
return Option.empty();
}
- loadCompletionTimeIncrementally(startTime);
- return
Option.ofNullable(this.startToCompletionInstantTimeMap.get(startTime));
+ loadCompletionTimeIncrementally(beginTime);
+ return
Option.ofNullable(this.beginToCompletionInstantTimeMap.get(beginTime));
}
/**
@@ -175,42 +188,111 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
*
* <p>By default, assumes there is at most 1 day time of duration for an
instant to accelerate the queries.
*
- * @param startCompletionTime The start completion time.
- * @param endCompletionTime The end completion time.
+ * @param timeline The timeline.
+ * @param rangeStart The query range start completion time.
+ * @param rangeEnd The query range end completion time.
+ * @param rangeType The range type.
*
- * @return The instant time set.
+ * @return The sorted instant time list.
*/
- public Set<String> getStartTimeSet(String startCompletionTime, String
endCompletionTime) {
+ public List<String> getStartTimes(
+ HoodieTimeline timeline,
+ Option<String> rangeStart,
+ Option<String> rangeEnd,
+ InstantRange.RangeType rangeType) {
// assumes any instant/transaction lasts at most 1 day to optimize the
query efficiency.
- return getStartTimeSet(startCompletionTime, endCompletionTime, s ->
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+ return getStartTimes(timeline, rangeStart, rangeEnd, rangeType, s ->
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
}
/**
* Queries the instant start time with given completion time range.
*
- * @param startCompletionTime The start completion time.
- * @param endCompletionTime The end completion time.
- * @param earliestStartTimeFunc The function to generate the earliest start
time boundary
- * with the minimum completion time {@code
startCompletionTime}.
+ * @param rangeStart The query range start completion time.
+ * @param rangeEnd The query range end completion time.
+ * @param earliestInstantTimeFunc The function to generate the earliest
start time boundary
+ * with the minimum completion time.
*
- * @return The instant time set.
+ * @return The sorted instant time list.
*/
- public Set<String> getStartTimeSet(String startCompletionTime, String
endCompletionTime, Function<String, String> earliestStartTimeFunc) {
- String startInstant = earliestStartTimeFunc.apply(startCompletionTime);
+ @VisibleForTesting
+ public List<String> getStartTimes(
+ String rangeStart,
+ String rangeEnd,
+ Function<String, String> earliestInstantTimeFunc) {
+ return
getStartTimes(metaClient.getCommitsTimeline().filterCompletedInstants(),
Option.ofNullable(rangeStart), Option.ofNullable(rangeEnd),
+ InstantRange.RangeType.CLOSED_CLOSED, earliestInstantTimeFunc);
+ }
+
+ /**
+ * Queries the instant start time with given completion time range.
+ *
+ * @param timeline The timeline.
+ * @param rangeStart The query range start completion time.
+ * @param rangeEnd The query range end completion time.
+ * @param rangeType The range type.
+ * @param earliestInstantTimeFunc The function to generate the earliest
start time boundary
+ * with the minimum completion time.
+ *
+ * @return The sorted instant time list.
+ */
+ public List<String> getStartTimes(
+ HoodieTimeline timeline,
+ Option<String> rangeStart,
+ Option<String> rangeEnd,
+ InstantRange.RangeType rangeType,
+ Function<String, String> earliestInstantTimeFunc) {
+ final boolean startFromEarliest =
START_COMMIT_EARLIEST.equalsIgnoreCase(rangeStart.orElse(null));
+ String earliestInstantToLoad = null;
+ if (rangeStart.isPresent() && !startFromEarliest) {
+ earliestInstantToLoad = earliestInstantTimeFunc.apply(rangeStart.get());
+ } else if (rangeEnd.isPresent()) {
+ earliestInstantToLoad = earliestInstantTimeFunc.apply(rangeEnd.get());
+ }
+
+ // ensure the earliest instant boundary is loaded.
+ if (earliestInstantToLoad != null &&
HoodieTimeline.compareTimestamps(this.cursorInstant, GREATER_THAN,
earliestInstantToLoad)) {
+ loadCompletionTimeIncrementally(earliestInstantToLoad);
+ }
+
+ if (rangeStart.isEmpty() && rangeEnd.isPresent()) {
+ // returns the last instant that finished at or before the given
completion time 'rangeEnd'.
+ String maxInstantTime = timeline.getInstantsAsStream()
+ .filter(instant -> instant.isCompleted() &&
HoodieTimeline.compareTimestamps(instant.getCompletionTime(),
LESSER_THAN_OR_EQUALS, rangeEnd.get()))
+
.max(Comparator.comparing(HoodieInstant::getCompletionTime)).map(HoodieInstant::getTimestamp).orElse(null);
+ if (maxInstantTime != null) {
+ return Collections.singletonList(maxInstantTime);
+ }
+ // fallback to archived timeline
+ return this.beginToCompletionInstantTimeMap.entrySet().stream()
+ .filter(entry -> HoodieTimeline.compareTimestamps(entry.getValue(),
LESSER_THAN_OR_EQUALS, rangeEnd.get()))
+ .map(Map.Entry::getKey).collect(Collectors.toList());
+ }
+
+ if (startFromEarliest) {
+ // expedience for snapshot read: ['earliest', _) to avoid loading
unnecessary instants.
+ rangeStart = Option.empty();
+ }
+
+ if (rangeStart.isEmpty() && rangeEnd.isEmpty()) {
+ // (_, _): read the latest snapshot.
+ return timeline.filterCompletedInstants().lastInstant().map(instant ->
Collections.singletonList(instant.getTimestamp())).orElse(Collections.emptyList());
+ }
+
final InstantRange instantRange = InstantRange.builder()
- .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
- .startInstant(startCompletionTime)
- .endInstant(endCompletionTime)
+ .rangeType(rangeType)
+ .startInstant(rangeStart.orElse(null))
+ .endInstant(rangeEnd.orElse(null))
.nullableBoundary(true)
.build();
- if (HoodieTimeline.compareTimestamps(this.cursorInstant, GREATER_THAN,
startInstant)) {
- loadCompletionTimeIncrementally(startInstant);
- }
- return this.startToCompletionInstantTimeMap.entrySet().stream()
+ return this.beginToCompletionInstantTimeMap.entrySet().stream()
.filter(entry -> instantRange.isInRange(entry.getValue()))
- .map(Map.Entry::getKey).collect(Collectors.toSet());
+ .map(Map.Entry::getKey).sorted().collect(Collectors.toList());
}
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
private void loadCompletionTimeIncrementally(String startTime) {
// the 'startTime' should be out of the eager loading range, switch to a
lazy loading.
// This operation is resource costly.
@@ -250,20 +332,24 @@ public class CompletionTimeQueryView implements
AutoCloseable, Serializable {
setCompletionTime(instantTime, completionTime);
}
- private void setCompletionTime(String instantTime, String completionTime) {
+ private void setCompletionTime(String beginInstantTime, String
completionTime) {
if (completionTime == null) {
// the meta-server instant does not have completion time
- completionTime = instantTime;
+ completionTime = beginInstantTime;
}
- this.startToCompletionInstantTimeMap.putIfAbsent(instantTime,
completionTime);
+ this.beginToCompletionInstantTimeMap.putIfAbsent(beginInstantTime,
completionTime);
}
public String getCursorInstant() {
return cursorInstant;
}
+ public boolean isEmptyTable() {
+ return this.beginToCompletionInstantTimeMap.isEmpty();
+ }
+
@Override
- public void close() throws Exception {
- this.startToCompletionInstantTimeMap.clear();
+ public void close() {
+ this.beginToCompletionInstantTimeMap.clear();
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
index 2e48e40820d..a4f4b2cdf24 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
@@ -108,6 +108,10 @@ public class HoodieInstantTimeGenerator {
public static String instantTimeMinusMillis(String timestamp, long
milliseconds) {
try {
String timestampInMillis = fixInstantTimeCompatibility(timestamp);
+ // To work with tests that generate arbitrary timestamps, we need to
pad the timestamp with 0s.
+ if (timestampInMillis.length() < MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH)
{
+ return String.format("%0" + timestampInMillis.length() + "d", 0);
+ }
LocalDateTime dt = LocalDateTime.parse(timestampInMillis,
MILLIS_INSTANT_TIME_FORMATTER);
ZoneId zoneId = HoodieTimelineTimeZone.UTC.equals(commitTimeZone) ?
ZoneId.of("UTC") : ZoneId.systemDefault();
return
MILLIS_INSTANT_TIME_FORMATTER.format(dt.atZone(zoneId).toInstant().minusMillis(milliseconds).atZone(zoneId).toLocalDateTime());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index b7ed15a07af..e9ae9cfbdfb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -35,6 +35,7 @@ import
org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -298,4 +299,18 @@ public class ClusteringUtils {
}
return oldestInstantToRetain;
}
+
+ /**
+ * Returns whether the given instant {@code instant} pertains to a
clustering operation.
+ */
+ public static boolean isClusteringInstant(HoodieInstant instant,
HoodieTimeline timeline) {
+ if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+ return false;
+ }
+ try {
+ return TimelineUtils.getCommitMetadata(instant,
timeline).getOperationType().equals(WriteOperationType.CLUSTER);
+ } catch (IOException e) {
+ throw new HoodieException("Resolve replace commit metadata error for
instant: " + instant, e);
+ }
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index a4a6da2e29b..49f067b8442 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -217,7 +217,7 @@ public class HoodieMetadataLogRecordReader implements
Closeable {
public Builder withLogBlockTimestamps(Set<String> validLogBlockTimestamps)
{
InstantRange instantRange = InstantRange.builder()
- .rangeType(InstantRange.RangeType.EXPLICIT_MATCH)
+ .rangeType(InstantRange.RangeType.EXACT_MATCH)
.explicitInstants(validLogBlockTimestamps).build();
scannerBuilder.withInstantRange(Option.of(instantRange));
return this;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index fbe8fcdd62b..ae1a86f36cc 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -273,10 +273,10 @@ public class OptionsResolver {
}
/**
- * Returns whether the read commits limit is specified.
+ * Returns the read commits limit or -1 if not specified.
*/
- public static boolean hasReadCommitsLimit(Configuration conf) {
- return conf.contains(FlinkOptions.READ_COMMITS_LIMIT);
+ public static int getReadCommitsLimit(Configuration conf) {
+ return conf.getInteger(FlinkOptions.READ_COMMITS_LIMIT, -1);
}
/**
@@ -407,4 +407,12 @@ public class OptionsResolver {
}
return options;
}
+
+ /**
+ * Whether the reader only consumes new commit instants.
+ */
+ public static boolean isOnlyConsumingNewCommits(Configuration conf) {
+ return isMorTable(conf) &&
conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT) // this is only true
for flink.
+ || isAppendMode(conf) &&
conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING);
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 9586e9ce96c..c1cd5874d96 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.cdc.HoodieCDCExtractor;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.InstantRange;
-import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -40,10 +40,7 @@ import
org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.table.format.cdc.CdcInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
-import org.apache.hudi.util.ClusteringUtil;
-import org.apache.hudi.util.StreamerUtil;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
@@ -67,11 +64,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
-import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
-
/**
* Utilities to generate incremental input splits {@link
MergeOnReadInputSplit}.
* The input splits are used for streaming and incremental read.
@@ -135,44 +127,37 @@ public class IncrementalInputSplits implements
Serializable {
public Result inputSplits(
HoodieTableMetaClient metaClient,
boolean cdcEnabled) {
- HoodieTimeline commitTimeline = getReadTimeline(metaClient);
- if (commitTimeline.empty()) {
- LOG.warn("No splits found for the table under path " + path);
+
+ IncrementalQueryAnalyzer analyzer = IncrementalQueryAnalyzer.builder()
+ .metaClient(metaClient)
+ .startTime(this.conf.getString(FlinkOptions.READ_START_COMMIT))
+ .endTime(this.conf.getString(FlinkOptions.READ_END_COMMIT))
+ .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
+ .skipCompaction(skipCompaction)
+ .skipClustering(skipClustering)
+ .build();
+
+ IncrementalQueryAnalyzer.QueryContext analyzingResult = analyzer.analyze();
+
+ if (analyzingResult.isEmpty()) {
+ LOG.info("No new instant found for the table under path " + path + ",
skip reading");
return Result.EMPTY;
}
-
- final String startCommit =
this.conf.getString(FlinkOptions.READ_START_COMMIT);
- final String endCommit = this.conf.getString(FlinkOptions.READ_END_COMMIT);
- final boolean startFromEarliest =
FlinkOptions.START_COMMIT_EARLIEST.equalsIgnoreCase(startCommit);
- final boolean startOutOfRange = startCommit != null &&
commitTimeline.isBeforeTimelineStarts(startCommit);
- final boolean endOutOfRange = endCommit != null &&
commitTimeline.isBeforeTimelineStarts(endCommit);
+ final HoodieTimeline commitTimeline = analyzingResult.getActiveTimeline();
+ final boolean startFromEarliest =
analyzingResult.isConsumingFromEarliest();
+ final boolean hasArchivedInstants =
!analyzingResult.getArchivedInstants().isEmpty();
// We better add another premise: whether the endCommit is cleaned.
- boolean fullTableScan = startFromEarliest || startOutOfRange ||
endOutOfRange;
-
- List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline,
null);
+ boolean fullTableScan = startFromEarliest || hasArchivedInstants;
// Step1: generates the instant range
// if the specified end commit is archived, still uses the specified
timestamp,
// else uses the latest filtered instant time
// (would be the latest instant time if the specified end commit is
greater than the latest instant time)
- final String rangeEnd = endOutOfRange || instants.isEmpty() ? endCommit :
instants.get(instants.size() - 1).getTimestamp();
- // keep the same semantics with streaming read, default start from the
latest commit
- final String rangeStart = startFromEarliest ? null : (startCommit == null
? rangeEnd : startCommit);
- final InstantRange instantRange;
- if (!fullTableScan) {
- instantRange =
InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
-
.rangeType(InstantRange.RangeType.CLOSE_CLOSE).nullableBoundary(cdcEnabled).build();
- } else if (startFromEarliest && endCommit == null) {
- // short-cut for snapshot read
- instantRange = null;
- } else {
- instantRange =
InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
-
.rangeType(InstantRange.RangeType.CLOSE_CLOSE).nullableBoundary(true).build();
- }
+
+ final InstantRange instantRange =
analyzingResult.getInstantRange().orElse(null);
+
// Step2: decides the read end commit
- final String endInstant = endOutOfRange || endCommit == null
- ? commitTimeline.lastInstant().get().getTimestamp()
- : rangeEnd;
+ final String endInstant = analyzingResult.getLastInstant();
// Step3: find out the files to read, tries to read the files from the
commit metadata first,
// fallback to full table scan if any of the following conditions matches:
@@ -192,10 +177,6 @@ public class IncrementalInputSplits implements
Serializable {
}
fileStatuses = fileIndex.getFilesInPartitions();
} else {
- if (instants.size() == 0) {
- LOG.info("No new instant found for the table under path " + path + ",
skip reading");
- return Result.EMPTY;
- }
if (cdcEnabled) {
// case1: cdc change log enabled
List<MergeOnReadInputSplit> inputSplits =
getCdcInputSplits(metaClient, instantRange);
@@ -203,6 +184,7 @@ public class IncrementalInputSplits implements Serializable
{
}
// case2: normal incremental read
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
+ List<HoodieInstant> instants = analyzingResult.getActiveInstants();
List<HoodieCommitMetadata> metadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path,
instant, commitTimeline)).collect(Collectors.toList());
readPartitions = getReadPartitions(metadataList);
@@ -233,7 +215,7 @@ public class IncrementalInputSplits implements Serializable
{
}
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient,
commitTimeline,
- fileStatuses, readPartitions, endInstant, instantRange, false);
+ fileStatuses, readPartitions, endInstant,
analyzingResult.getMaxCompletionTime(), instantRange, false);
return Result.instance(inputSplits, endInstant);
}
@@ -242,7 +224,6 @@ public class IncrementalInputSplits implements Serializable
{
* Returns the incremental input splits.
*
* @param metaClient The meta client
- * @param issuedInstant The last issued instant, only valid in streaming read
* @param issuedOffset The last issued offset, only valid in streaming read
* @param cdcEnabled Whether cdc is enabled
*
@@ -250,51 +231,36 @@ public class IncrementalInputSplits implements
Serializable {
*/
public Result inputSplits(
HoodieTableMetaClient metaClient,
- @Nullable String issuedInstant,
@Nullable String issuedOffset,
boolean cdcEnabled) {
metaClient.reloadActiveTimeline();
- HoodieTimeline commitTimeline = getReadTimeline(metaClient);
- if (commitTimeline.empty()) {
- LOG.warn("No splits found for the table under path " + path);
- return Result.EMPTY;
- }
+ IncrementalQueryAnalyzer analyzer = IncrementalQueryAnalyzer.builder()
+ .metaClient(metaClient)
+ .startTime(issuedOffset != null ? issuedOffset :
this.conf.getString(FlinkOptions.READ_START_COMMIT))
+ .endTime(this.conf.getString(FlinkOptions.READ_END_COMMIT))
+ .rangeType(issuedOffset != null ? InstantRange.RangeType.OPEN_CLOSED :
InstantRange.RangeType.CLOSED_CLOSED)
+ .skipCompaction(skipCompaction)
+ .skipClustering(skipClustering)
+ .limit(OptionsResolver.getReadCommitsLimit(conf))
+ .build();
- // Assumes a timeline:
- // c1.inflight, c2(issued instant), c3, c4
- // -> c1, c2(issued instant), c3, c4
- // c1, c3 and c4 are the candidate instants,
- // we call c1 a 'hollow' instant which has lower version number but
greater completion time,
- // filtering the timeline using just c2 could cause data loss,
- // check these hollow instants first.
- Result hollowSplits = getHollowInputSplits(metaClient,
metaClient.getHadoopConf(), issuedInstant, issuedOffset, commitTimeline,
cdcEnabled);
-
- List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline,
issuedInstant);
- // streaming read speed limit, limits the maximum number of commits
allowed to read in each instant check
- if (OptionsResolver.hasReadCommitsLimit(conf)) {
- int readLimit = this.conf.getInteger(FlinkOptions.READ_COMMITS_LIMIT);
- instants = instants.subList(0, Math.min(readLimit, instants.size()));
- }
+ IncrementalQueryAnalyzer.QueryContext queryContext = analyzer.analyze();
- // get the latest instant that satisfies condition
- final String endInstant = instants.size() == 0 ? null :
instants.get(instants.size() - 1).getTimestamp();
- final InstantRange instantRange;
- if (endInstant != null) {
- // when cdc is enabled, returns instant range with nullable boundary
- // to filter the reading instants on the timeline
- instantRange = getInstantRange(issuedInstant, endInstant, cdcEnabled);
- } else if (hollowSplits.isEmpty()) {
+ if (queryContext.isEmpty()) {
LOG.info("No new instant found for the table under path " + path + ",
skip reading");
return Result.EMPTY;
- } else {
- return hollowSplits;
}
+ HoodieTimeline commitTimeline = queryContext.getActiveTimeline();
+ // get the latest instant that satisfies condition
+ final String endInstant = queryContext.getLastInstant();
+ final Option<InstantRange> instantRange = queryContext.getInstantRange();
+
// version number should be monotonically increasing
// fetch the instant offset by completion time
- String offsetToIssue =
instants.stream().map(HoodieInstant::getCompletionTime).max(String::compareTo).orElse(endInstant);
+ String offsetToIssue = queryContext.getMaxCompletionTime();
- if (instantRange == null) {
+ if (instantRange.isEmpty()) {
// reading from the earliest, scans the partitions and files directly.
FileIndex fileIndex = getFileIndex();
@@ -311,12 +277,12 @@ public class IncrementalInputSplits implements
Serializable {
}
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient,
commitTimeline,
- fileStatuses, readPartitions, endInstant, null, false);
+ fileStatuses, readPartitions, endInstant, offsetToIssue, null,
false);
return Result.instance(inputSplits, endInstant, offsetToIssue);
} else {
- List<MergeOnReadInputSplit> inputSplits = getIncInputSplits(metaClient,
metaClient.getHadoopConf(), commitTimeline, instants, instantRange, endInstant,
cdcEnabled);
- return Result.instance(mergeList(hollowSplits.getInputSplits(),
inputSplits), endInstant, offsetToIssue);
+ List<MergeOnReadInputSplit> inputSplits = getIncInputSplits(metaClient,
metaClient.getHadoopConf(), commitTimeline, queryContext, instantRange.get(),
endInstant, cdcEnabled);
+ return Result.instance(inputSplits, endInstant, offsetToIssue);
}
}
@@ -327,7 +293,7 @@ public class IncrementalInputSplits implements Serializable
{
HoodieTableMetaClient metaClient,
org.apache.hadoop.conf.Configuration hadoopConf,
HoodieTimeline commitTimeline,
- List<HoodieInstant> instants,
+ IncrementalQueryAnalyzer.QueryContext queryContext,
InstantRange instantRange,
String endInstant,
boolean cdcEnabled) {
@@ -338,9 +304,10 @@ public class IncrementalInputSplits implements
Serializable {
}
// case2: normal streaming read
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
- List<HoodieCommitMetadata> activeMetadataList = instants.stream()
+ List<HoodieCommitMetadata> activeMetadataList =
queryContext.getActiveInstants().stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path,
instant, commitTimeline)).collect(Collectors.toList());
- List<HoodieCommitMetadata> archivedMetadataList =
getArchivedMetadata(metaClient, instantRange, tableName);
+ List<HoodieCommitMetadata> archivedMetadataList =
queryContext.getArchivedInstants().stream()
+ .map(instant -> WriteProfiles.getCommitMetadata(tableName, path,
instant, queryContext.getArchivedTimeline())).collect(Collectors.toList());
if (archivedMetadataList.size() > 0) {
LOG.warn("\n"
+
"--------------------------------------------------------------------------------\n"
@@ -364,74 +331,7 @@ public class IncrementalInputSplits implements
Serializable {
}
return getInputSplits(metaClient, commitTimeline,
- fileStatuses, readPartitions, endInstant, instantRange,
skipCompaction);
- }
-
- /**
- * Returns the input splits for 'hollow' instants.
- */
- private Result getHollowInputSplits(
- HoodieTableMetaClient metaClient,
- org.apache.hadoop.conf.Configuration hadoopConf,
- @Nullable String issuedInstant,
- @Nullable String issuedOffset,
- HoodieTimeline commitTimeline,
- boolean cdcEnabled) {
- if (issuedInstant == null || issuedOffset == null) {
- return Result.EMPTY;
- }
- // find the write commit instant that finishes later than the issued
instant
- // while with smaller txn start time.
- List<HoodieInstant> instants = commitTimeline.getInstantsAsStream()
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(),
LESSER_THAN, issuedInstant))
- .filter(s -> HoodieTimeline.compareTimestamps(s.getCompletionTime(),
GREATER_THAN, issuedOffset))
- .filter(s -> StreamerUtil.isWriteCommit(metaClient.getTableType(), s,
commitTimeline)).collect(Collectors.toList());
- if (instants.isEmpty()) {
- return Result.EMPTY;
- }
- String offsetToIssue =
instants.stream().map(HoodieInstant::getCompletionTime).max(String::compareTo).orElse(issuedOffset);
- List<MergeOnReadInputSplit> inputSplits = instants.stream().map(instant ->
{
- String instantTs = instant.getTimestamp();
-
- // Assumes we consume from timeline:
- // c0, c1.inflight, c2(issued instant), c3, c4
- // -> c0, c1, c2(issued instant), c3, c4
- // c1, c3 and c4 are the candidate instants,
-
- // c4 data file could include overlapping records from c2,
- // use (c2, c4] instant range for c3 and c4,
-
- // c1 data file could include overlapping records from c0,
- // use the [c1, c1] instant range for c1.
- InstantRange instantRange = InstantRange.builder()
- .startInstant(instantTs)
- .endInstant(instantTs)
- .nullableBoundary(cdcEnabled)
- .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
- return getIncInputSplits(metaClient, hadoopConf, commitTimeline,
Collections.singletonList(instant), instantRange, instantTs, cdcEnabled);
- }).flatMap(Collection::stream).collect(Collectors.toList());
- return Result.instance(inputSplits, issuedInstant, offsetToIssue);
- }
-
- @Nullable
- private InstantRange getInstantRange(String issuedInstant, String
instantToIssue, boolean nullableBoundary) {
- if (issuedInstant != null) {
- // the streaming reader may record the last issued instant, if the
issued instant is present,
- // the instant range should be: (issued instant, the latest instant].
- return
InstantRange.builder().startInstant(issuedInstant).endInstant(instantToIssue)
-
.nullableBoundary(nullableBoundary).rangeType(InstantRange.RangeType.OPEN_CLOSE).build();
- } else if
(this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
- // first time consume and has a start commit
- final String startCommit =
this.conf.getString(FlinkOptions.READ_START_COMMIT);
- return startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
- ? null
- :
InstantRange.builder().startInstant(startCommit).endInstant(instantToIssue)
-
.nullableBoundary(nullableBoundary).rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
- } else {
- // first time consume and no start commit, consumes the latest
incremental data set.
- return
InstantRange.builder().startInstant(instantToIssue).endInstant(instantToIssue)
-
.nullableBoundary(nullableBoundary).rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
- }
+ fileStatuses, readPartitions, endInstant,
queryContext.getMaxCompletionTime(), instantRange, skipCompaction);
}
private List<MergeOnReadInputSplit> getInputSplits(
@@ -440,14 +340,14 @@ public class IncrementalInputSplits implements
Serializable {
FileStatus[] fileStatuses,
Set<String> readPartitions,
String endInstant,
+ String maxCompletionTime,
InstantRange instantRange,
boolean skipBaseFiles) {
final HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
- final String maxQueryInstantTime = getMaxQueryInstantTime(fsView,
endInstant);
final AtomicInteger cnt = new AtomicInteger(0);
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
return readPartitions.stream()
- .map(relPartitionPath -> getFileSlices(fsView, relPartitionPath,
maxQueryInstantTime, skipBaseFiles)
+ .map(relPartitionPath -> getFileSlices(fsView, relPartitionPath,
maxCompletionTime, skipBaseFiles)
.map(fileSlice -> {
Option<List<String>> logPaths =
Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
@@ -467,29 +367,6 @@ public class IncrementalInputSplits implements
Serializable {
.collect(Collectors.toList());
}
- /**
- * Returns the upper threshold of query instant time, when NB-CC is enabled,
imagine the table has actions as follows:
- *
- * <pre>
- * t1_t5.delta_commit, t2.compaction.inflight, t3_t4.delta_commit
- * </pre>
- *
- * <p>If the user specified query range end instant is t1, when the query is
executed at t3, the API #getXXXFileSlicesBeforeOrOn
- * will just ignore delta commit t1_t5 because its end instant t1 is less
than the latest file slice base instant t2.
- * We should refactor this out when all the fs view incremental APIs migrate
to completion time based semantics.
- *
- * <p>CAUTION: The query is costly when endInstant is archived.
- */
- private String getMaxQueryInstantTime(HoodieTableFileSystemView fsView,
String endInstant) {
- if (OptionsResolver.isMorTable(conf)) {
- Option<String> completionTime = fsView.getCompletionTime(endInstant);
- if (completionTime.isPresent()) {
- return completionTime.get();
- }
- }
- return endInstant;
- }
-
private List<MergeOnReadInputSplit> getCdcInputSplits(
HoodieTableMetaClient metaClient,
InstantRange instantRange) {
@@ -549,103 +426,6 @@ public class IncrementalInputSplits implements
Serializable {
return partitions;
}
- /**
- * Returns the archived metadata in case the reader consumes untimely or it
wants
- * to read from the earliest.
- *
- * <p>Note: should improve it with metadata table when the metadata table is
stable enough.
- *
- * @param metaClient The meta client
- * @param instantRange The instant range to filter the timeline instants
- * @param tableName The table name
- * @return the list of archived metadata, or empty if there is no need to
read the archived timeline
- */
- private List<HoodieCommitMetadata> getArchivedMetadata(
- HoodieTableMetaClient metaClient,
- InstantRange instantRange,
- String tableName) {
- if
(metaClient.getActiveTimeline().isBeforeTimelineStarts(instantRange.getStartInstant()))
{
- // read the archived metadata if the start instant is archived.
- HoodieTimeline archivedTimeline = getArchivedReadTimeline(metaClient,
instantRange.getStartInstant());
- if (!archivedTimeline.empty()) {
- return archivedTimeline.getInstantsAsStream()
- .map(instant -> WriteProfiles.getCommitMetadata(tableName, path,
instant, archivedTimeline)).collect(Collectors.toList());
- }
- }
- return Collections.emptyList();
- }
-
- private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) {
- HoodieTimeline timeline =
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
- return filterInstantsAsPerUserConfigs(timeline);
- }
-
- private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient
metaClient, String startInstant) {
- HoodieArchivedTimeline archivedTimeline =
metaClient.getArchivedTimeline(startInstant, false);
- HoodieTimeline archivedCompleteTimeline =
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
- return filterInstantsAsPerUserConfigs(archivedCompleteTimeline);
- }
-
- /**
- * Returns the instants with a given issuedInstant to start from.
- *
- * @param commitTimeline The completed commits timeline
- * @param issuedInstant The last issued instant that has already been
delivered to downstream
- *
- * @return the filtered hoodie instants
- */
- @VisibleForTesting
- public List<HoodieInstant> filterInstantsWithRange(
- HoodieTimeline commitTimeline,
- @Nullable final String issuedInstant) {
- HoodieTimeline completedTimeline =
commitTimeline.filterCompletedInstants();
- if (issuedInstant != null) {
- // returns early for streaming mode
- return completedTimeline
- .getInstantsAsStream()
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(),
GREATER_THAN, issuedInstant))
- .collect(Collectors.toList());
- }
-
- Stream<HoodieInstant> instantStream =
completedTimeline.getInstantsAsStream();
-
- if (OptionsResolver.hasNoSpecificReadCommits(this.conf)) {
- // by default read from the latest commit
- return
completedTimeline.lastInstant().map(Collections::singletonList).orElseGet(Collections::emptyList);
- }
-
- if (OptionsResolver.isSpecificStartCommit(this.conf)) {
- final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
- instantStream = instantStream
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(),
GREATER_THAN_OR_EQUALS, startCommit));
- }
- if (this.conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent()) {
- final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
- instantStream = instantStream.filter(s ->
HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS,
endCommit));
- }
- return instantStream.collect(Collectors.toList());
- }
-
- /**
- * Filters out the unnecessary instants as per user specified configs.
- *
- * @param timeline The timeline
- *
- * @return the filtered timeline
- */
- @VisibleForTesting
- public HoodieTimeline filterInstantsAsPerUserConfigs(HoodieTimeline
timeline) {
- final HoodieTimeline oriTimeline = timeline;
- if (OptionsResolver.isMorTable(this.conf) & this.skipCompaction) {
- // the compaction commit uses 'commit' as action which is tricky
- timeline = timeline.filter(instant ->
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
- }
- if (this.skipClustering) {
- timeline = timeline.filter(instant ->
!ClusteringUtil.isClusteringInstant(instant, oriTimeline));
- }
- return timeline;
- }
-
private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
if (list1.isEmpty()) {
return list2;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 86e32fe5a0a..fa911cadb0e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -212,7 +212,7 @@ public class StreamReadMonitoringFunction
return;
}
IncrementalInputSplits.Result result =
- incrementalInputSplits.inputSplits(metaClient, this.issuedInstant,
this.issuedOffset, this.cdcEnabled);
+ incrementalInputSplits.inputSplits(metaClient, this.issuedOffset,
this.cdcEnabled);
if (result.isEmpty()) {
// no new instants, returns early
return;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index ac81b4e7af4..e7ebf44b7d3 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -19,25 +19,21 @@
package org.apache.hudi.util;
import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.common.model.WriteOperationType;
+import
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieFlinkTable;
-import
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
@@ -115,18 +111,4 @@ public class ClusteringUtil {
commitToRollback ->
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
commitToRollback, false));
}
}
-
- /**
- * Returns whether the given instant {@code instant} is with clustering
operation.
- */
- public static boolean isClusteringInstant(HoodieInstant instant,
HoodieTimeline timeline) {
- if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
- return false;
- }
- try {
- return TimelineUtils.getCommitMetadata(instant,
timeline).getOperationType().equals(WriteOperationType.CLUSTER);
- } catch (IOException e) {
- throw new HoodieException("Resolve replace commit metadata error for
instant: " + instant, e);
- }
- }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 4632b6ecb6e..064e59cc751 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -517,7 +518,7 @@ public class StreamerUtil {
public static boolean isWriteCommit(HoodieTableType tableType, HoodieInstant
instant, HoodieTimeline timeline) {
return tableType == HoodieTableType.MERGE_ON_READ
? !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) // not a
compaction
- : !ClusteringUtil.isClusteringInstant(instant, timeline); // not a
clustering
+ : !ClusteringUtils.isClusteringInstant(instant, timeline); // not a
clustering
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index 29a0326ea41..64211608e05 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -20,12 +20,16 @@ package org.apache.hudi.source;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
@@ -72,67 +76,77 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
@BeforeEach
- void init() throws IOException {
+ void init() {
initPath();
- initMetaClient();
}
@Test
- void testFilterInstantsWithRange() {
- HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient, true);
+ void testFilterInstantsWithRange() throws IOException {
Configuration conf = TestConfigurations.getDefaultConf(basePath);
conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
- IncrementalInputSplits iis = IncrementalInputSplits.builder()
- .conf(conf)
- .path(new Path(basePath))
- .rowType(TestConfigurations.ROW_TYPE)
-
.skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
- .build();
+ conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "1");
HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "2");
HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "3");
timeline.createCompleteInstant(commit1);
timeline.createCompleteInstant(commit2);
timeline.createCompleteInstant(commit3);
- timeline = timeline.reload();
+ timeline = metaClient.reloadActiveTimeline();
+
+ Map<String, String> completionTimeMap =
timeline.filterCompletedInstants().getInstantsAsStream()
+ .collect(Collectors.toMap(HoodieInstant::getTimestamp,
HoodieInstant::getCompletionTime));
+ IncrementalQueryAnalyzer analyzer1 = IncrementalQueryAnalyzer.builder()
+ .metaClient(metaClient)
+ .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+ .startTime(completionTimeMap.get("1"))
+ .skipClustering(true)
+ .build();
// previous read iteration read till instant time "1", next read iteration
should return ["2", "3"]
- List<HoodieInstant> instantRange2 = iis.filterInstantsWithRange(timeline,
"1");
- assertEquals(2, instantRange2.size());
- assertIterableEquals(Arrays.asList(commit2, commit3), instantRange2);
+ List<HoodieInstant> activeInstants1 =
analyzer1.analyze().getActiveInstants();
+ assertEquals(2, activeInstants1.size());
+ assertIterableEquals(Arrays.asList(commit2, commit3), activeInstants1);
// simulate first iteration cycle with read from the LATEST commit
- List<HoodieInstant> instantRange1 = iis.filterInstantsWithRange(timeline,
null);
- assertEquals(1, instantRange1.size());
- assertIterableEquals(Collections.singletonList(commit3), instantRange1);
+ IncrementalQueryAnalyzer analyzer2 = IncrementalQueryAnalyzer.builder()
+ .metaClient(metaClient)
+ .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
+ .skipClustering(true)
+ .build();
+ List<HoodieInstant> activeInstants2 =
analyzer2.analyze().getActiveInstants();
+ assertEquals(1, activeInstants2.size());
+ assertIterableEquals(Collections.singletonList(commit3), activeInstants2);
// specifying a start and end commit
- conf.set(FlinkOptions.READ_START_COMMIT, "1");
- conf.set(FlinkOptions.READ_END_COMMIT, "3");
- List<HoodieInstant> instantRange3 = iis.filterInstantsWithRange(timeline,
null);
- assertEquals(3, instantRange3.size());
- assertIterableEquals(Arrays.asList(commit1, commit2, commit3),
instantRange3);
+ IncrementalQueryAnalyzer analyzer3 = IncrementalQueryAnalyzer.builder()
+ .metaClient(metaClient)
+ .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
+ .startTime(completionTimeMap.get("1"))
+ .endTime(completionTimeMap.get("3"))
+ .skipClustering(true)
+ .build();
+ List<HoodieInstant> activeInstants3 =
analyzer3.analyze().getActiveInstants();
+ assertEquals(3, activeInstants3.size());
+ assertIterableEquals(Arrays.asList(commit1, commit2, commit3),
activeInstants3);
// add an inflight instant which should be excluded
HoodieInstant commit4 = new HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMPACTION_ACTION, "4");
timeline.createNewInstant(commit4);
- timeline = timeline.reload();
+ timeline = metaClient.reloadActiveTimeline();
assertEquals(4, timeline.getInstants().size());
- List<HoodieInstant> instantRange4 = iis.filterInstantsWithRange(timeline,
null);
- assertEquals(3, instantRange4.size());
+ List<HoodieInstant> activeInstants4 =
analyzer3.analyze().getActiveInstants();
+ assertEquals(3, activeInstants4.size());
}
@Test
void testFilterInstantsByCondition() throws IOException {
- HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient, true);
Configuration conf = TestConfigurations.getDefaultConf(basePath);
- IncrementalInputSplits iis = IncrementalInputSplits.builder()
- .conf(conf)
- .path(new Path(basePath))
- .rowType(TestConfigurations.ROW_TYPE)
- .build();
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "1");
HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "2");
HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION, "3");
@@ -153,15 +167,16 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
timeline = timeline.reload();
conf.set(FlinkOptions.READ_END_COMMIT, "3");
- HoodieTimeline resTimeline = iis.filterInstantsAsPerUserConfigs(timeline);
+ HoodieTimeline resTimeline =
IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timeline,
false, false);
// will not filter cluster commit by default
assertEquals(3, resTimeline.getInstants().size());
}
@Test
void testInputSplitsSortedByPartition() throws Exception {
- HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient, true);
Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+
// To enable a full table scan
conf.set(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST);
TestData.writeData(TestData.DATA_SET_INSERT, conf);
@@ -171,7 +186,7 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
.path(new Path(basePath))
.rowType(TestConfigurations.ROW_TYPE)
.build();
- IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null,
null, false);
+ IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null,
false);
List<String> partitions = getFilteredPartitions(result);
assertEquals(Arrays.asList("par1", "par2", "par3", "par4", "par5",
"par6"), partitions);
}
@@ -183,6 +198,8 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
List<String> expectedPartitions) throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(basePath);
conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+
List<RowData> testData = new ArrayList<>();
testData.addAll(TestData.DATA_SET_INSERT.stream().collect(Collectors.toList()));
testData.addAll(TestData.DATA_SET_INSERT_PARTITION_IS_NULL.stream().collect(Collectors.toList()));
@@ -199,13 +216,14 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
.rowType(TestConfigurations.ROW_TYPE)
.partitionPruner(partitionPruner)
.build();
- IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null,
null, false);
+ IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null,
false);
List<String> partitions = getFilteredPartitions(result);
assertEquals(expectedPartitions, partitions);
}
@Test
void testInputSplitsWithSpeedLimit() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
Configuration conf = TestConfigurations.getDefaultConf(basePath);
conf.set(FlinkOptions.READ_AS_STREAMING, true);
conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
@@ -226,21 +244,22 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
.rowType(TestConfigurations.ROW_TYPE)
.partitionPruner(null)
.build();
- IncrementalInputSplits.Result result = iis.inputSplits(metaClient,
firstInstant.getTimestamp(), firstInstant.getCompletionTime(), false);
+ IncrementalInputSplits.Result result = iis.inputSplits(metaClient,
firstInstant.getCompletionTime(), false);
String minStartCommit = result.getInputSplits().stream()
- .map(split -> split.getInstantRange().get().getStartInstant())
+ .map(split ->
split.getInstantRange().get().getStartInstant().get())
.min((commit1,commit2) ->
HoodieTimeline.compareTimestamps(commit1, LESSER_THAN, commit2) ? 1 : 0)
.orElse(null);
String maxEndCommit = result.getInputSplits().stream()
- .map(split -> split.getInstantRange().get().getEndInstant())
+ .map(split -> split.getInstantRange().get().getEndInstant().get())
.max((commit1,commit2) ->
HoodieTimeline.compareTimestamps(commit1, GREATER_THAN, commit2) ? 1 : 0)
.orElse(null);
- assertEquals(1, intervalBetween2Instants(commitsTimeline, minStartCommit,
maxEndCommit));
+ assertEquals(0, intervalBetween2Instants(commitsTimeline, minStartCommit,
maxEndCommit), "Should read 1 instant");
}
@Test
void testInputSplitsForSplitLastCommit() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
Configuration conf = TestConfigurations.getDefaultConf(basePath);
conf.set(FlinkOptions.READ_AS_STREAMING, true);
conf.set(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST);
@@ -271,7 +290,7 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
.rowType(TestConfigurations.ROW_TYPE)
.partitionPruner(null)
.build();
- IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null,
null, false);
+ IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null,
false);
result.getInputSplits().stream().filter(split ->
fileIdToBaseInstant.containsKey(split.getFileId()))
.forEach(split ->
assertEquals(fileIdToBaseInstant.get(split.getFileId()),
split.getLatestCommit()));
assertTrue(result.getInputSplits().stream().anyMatch(split ->
split.getLatestCommit().equals(lastInstant)),
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 6edeceae0d8..5602ddaa8e3 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -415,8 +415,8 @@ public class TestStreamReadMonitoringFunction {
private static boolean isPointInstantRange(InstantRange instantRange, String
timestamp) {
return instantRange != null
- && Objects.equals(timestamp, instantRange.getStartInstant())
- && Objects.equals(timestamp, instantRange.getEndInstant());
+ && Objects.equals(timestamp, instantRange.getStartInstant().get())
+ && Objects.equals(timestamp, instantRange.getEndInstant().get());
}
private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit>
createHarness(
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 5186d68c2cb..b8d0ea6f29e 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -356,7 +356,7 @@ public class TestInputFormat {
.build();
// default read the latest commit
- IncrementalInputSplits.Result splits =
incrementalInputSplits.inputSplits(metaClient, null, null, true);
+ IncrementalInputSplits.Result splits =
incrementalInputSplits.inputSplits(metaClient, null, true);
List<RowData> result = readData(inputFormat,
splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
@@ -398,7 +398,7 @@ public class TestInputFormat {
.build();
// default read the latest commit
- IncrementalInputSplits.Result splits =
incrementalInputSplits.inputSplits(metaClient, null, null, true);
+ IncrementalInputSplits.Result splits =
incrementalInputSplits.inputSplits(metaClient, null, true);
List<RowData> result = readData(inputFormat,
splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
@@ -438,7 +438,7 @@ public class TestInputFormat {
// default read the latest commit
// the compaction base files are skipped
- IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, null, false);
assertFalse(splits1.isEmpty());
List<RowData> result1 = readData(inputFormat,
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
@@ -454,7 +454,7 @@ public class TestInputFormat {
String secondCommit =
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0,
HoodieTimeline.COMMIT_ACTION);
conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
- IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, false);
assertFalse(splits2.isEmpty());
List<RowData> result2 = readData(inputFormat,
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
String actual2 = TestData.rowDataToString(result2);
@@ -470,7 +470,7 @@ public class TestInputFormat {
inputFormat = this.tableSource.getInputFormat(true);
// filter out the last commit by partition pruning
- IncrementalInputSplits.Result splits3 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ IncrementalInputSplits.Result splits3 =
incrementalInputSplits.inputSplits(metaClient, null, false);
assertFalse(splits3.isEmpty());
List<RowData> result3 = readData(inputFormat,
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
String actual3 = TestData.rowDataToString(result3);
@@ -503,7 +503,7 @@ public class TestInputFormat {
// default read the latest commit
// the clustering files are skipped
- IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, null, false);
assertFalse(splits1.isEmpty());
List<RowData> result1 = readData(inputFormat,
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
@@ -519,7 +519,7 @@ public class TestInputFormat {
String secondCommit =
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0,
HoodieTimeline.REPLACE_COMMIT_ACTION);
conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
- IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, false);
assertFalse(splits2.isEmpty());
List<RowData> result2 = readData(inputFormat,
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
String actual2 = TestData.rowDataToString(result2);
@@ -536,7 +536,7 @@ public class TestInputFormat {
inputFormat = this.tableSource.getInputFormat(true);
// filter out the last commit by partition pruning
- IncrementalInputSplits.Result splits3 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ IncrementalInputSplits.Result splits3 =
incrementalInputSplits.inputSplits(metaClient, null, false);
assertFalse(splits3.isEmpty());
List<RowData> result3 = readData(inputFormat,
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
String actual3 = TestData.rowDataToString(result3);
@@ -574,7 +574,9 @@ public class TestInputFormat {
List<HoodieInstant> instants =
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants();
assertThat(instants.size(), is(2));
- String c4 = instants.get(1).getTimestamp();
+ String c2 = oriInstants.get(1).getTimestamp();
+ String c3 = oriInstants.get(2).getTimestamp();
+ String c4 = oriInstants.get(3).getTimestamp();
InputFormat<RowData, ?> inputFormat =
this.tableSource.getInputFormat(true);
assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
@@ -587,7 +589,7 @@ public class TestInputFormat {
// timeline: c1, c2.inflight, c3.inflight, c4
// default read the latest commit
- IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, null, false);
assertFalse(splits1.isEmpty());
List<RowData> result1 = readData(inputFormat,
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
TestData.assertRowDataEquals(result1, TestData.dataSetInsert(7, 8));
@@ -595,7 +597,7 @@ public class TestInputFormat {
// timeline: c1, c2.inflight, c3.inflight, c4
// -> c1
conf.setString(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST);
- IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, false);
assertFalse(splits2.isEmpty());
List<RowData> result2 = readData(inputFormat,
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
TestData.assertRowDataEquals(result2, TestData.dataSetInsert(1, 2, 7, 8));
@@ -604,13 +606,13 @@ public class TestInputFormat {
// c4 -> c2
TestUtils.saveInstantAsComplete(metaClient, oriInstants.get(1),
metadataList.get(0)); // complete c2
assertThat(splits2.getEndInstant(), is(c4));
- IncrementalInputSplits.Result splits3 =
incrementalInputSplits.inputSplits(metaClient, splits2.getEndInstant(),
splits2.getOffset(), false);
+ IncrementalInputSplits.Result splits3 =
incrementalInputSplits.inputSplits(metaClient, splits2.getOffset(), false);
assertFalse(splits3.isEmpty());
List<RowData> result3 = readData(inputFormat,
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
TestData.assertRowDataEquals(result3, TestData.dataSetInsert(3, 4));
// test c2 and c4, c2 completion time > c1, so it is not a hollow instant
- IncrementalInputSplits.Result splits4 =
incrementalInputSplits.inputSplits(metaClient,
oriInstants.get(0).getTimestamp(), oriInstants.get(0).getCompletionTime(),
false);
+ IncrementalInputSplits.Result splits4 =
incrementalInputSplits.inputSplits(metaClient,
oriInstants.get(0).getCompletionTime(), false);
assertFalse(splits4.isEmpty());
List<RowData> result4 = readData(inputFormat,
splits4.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
TestData.assertRowDataEquals(result4, TestData.dataSetInsert(3, 4, 7, 8));
@@ -618,23 +620,23 @@ public class TestInputFormat {
// timeline: c1, c2, c3, c4
// c4 -> c3
TestUtils.saveInstantAsComplete(metaClient, oriInstants.get(2),
metadataList.get(1)); // complete c3
- assertThat(splits3.getEndInstant(), is(c4));
- IncrementalInputSplits.Result splits5 =
incrementalInputSplits.inputSplits(metaClient, splits3.getEndInstant(),
splits3.getOffset(), false);
+ assertThat(splits3.getEndInstant(), is(c2));
+ IncrementalInputSplits.Result splits5 =
incrementalInputSplits.inputSplits(metaClient, splits3.getOffset(), false);
assertFalse(splits5.isEmpty());
List<RowData> result5 = readData(inputFormat,
splits5.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
TestData.assertRowDataEquals(result5, TestData.dataSetInsert(5, 6));
// c4 ->
- assertThat(splits5.getEndInstant(), is(c4));
- IncrementalInputSplits.Result splits6 =
incrementalInputSplits.inputSplits(metaClient, splits5.getEndInstant(),
splits5.getOffset(), false);
+ assertThat(splits5.getEndInstant(), is(c3));
+ IncrementalInputSplits.Result splits6 =
incrementalInputSplits.inputSplits(metaClient, splits5.getOffset(), false);
assertTrue(splits6.isEmpty());
- // test c2 and c4, c2 is recognized as a hollow instant
+ // test c2, c3 and c4, c2 and c3 are recognized as hollow instants
// the (version_number, completion_time) pair is not consistent, just for
test purposes
- IncrementalInputSplits.Result splits7 =
incrementalInputSplits.inputSplits(metaClient,
oriInstants.get(2).getTimestamp(), oriInstants.get(3).getCompletionTime(),
false);
+ IncrementalInputSplits.Result splits7 =
incrementalInputSplits.inputSplits(metaClient,
oriInstants.get(3).getCompletionTime(), false);
assertFalse(splits7.isEmpty());
List<RowData> result7 = readData(inputFormat,
splits7.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
- TestData.assertRowDataEquals(result7, TestData.dataSetInsert(3, 4, 7, 8));
+ TestData.assertRowDataEquals(result7, TestData.dataSetInsert(3, 4, 5, 6));
}
@Test
@@ -656,7 +658,7 @@ public class TestInputFormat {
.build();
// default read the latest commit
- IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, null, false);
assertFalse(splits1.isEmpty());
List<RowData> result1 = readData(inputFormat,
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
@@ -671,7 +673,7 @@ public class TestInputFormat {
String secondCommit =
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 1,
HoodieTimeline.COMMIT_ACTION);
conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
- IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, false);
assertFalse(splits2.isEmpty());
List<RowData> result2 = readData(inputFormat,
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
String actual2 = TestData.rowDataToString(result2);
@@ -780,7 +782,7 @@ public class TestInputFormat {
HoodieTableMetaClient metaClient =
StreamerUtil.createMetaClient(tempFile.getAbsolutePath(),
HadoopConfigurations.getHadoopConf(conf));
List<String> commits =
metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream()
- .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ .map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
assertThat(commits.size(), is(3));
@@ -864,7 +866,7 @@ public class TestInputFormat {
HoodieTableMetaClient metaClient =
StreamerUtil.createMetaClient(tempFile.getAbsolutePath(),
HadoopConfigurations.getHadoopConf(conf));
List<String> commits =
metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream()
- .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ .map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
assertThat(commits.size(), is(3));
@@ -921,8 +923,9 @@ public class TestInputFormat {
assertThat(inputFormat5, instanceOf(CdcInputFormat.class));
List<RowData> actual5 = readData(inputFormat5);
- final List<RowData> expected5 = TestData.dataSetInsert(1, 2);
- TestData.assertRowDataEquals(actual5, expected5);
+ // even though the start commit "000" is out of range, after instants
filtering,
+ // it detects that all the instants are still active, so the intermediate
changes get returned.
+ TestData.assertRowDataEquals(actual5, expected3);
// start and end commit: both are out of range
conf.setString(FlinkOptions.READ_START_COMMIT, "001");
@@ -1013,12 +1016,12 @@ public class TestInputFormat {
HoodieTableMetaClient metaClient =
StreamerUtil.createMetaClient(tempFile.getAbsolutePath(),
HadoopConfigurations.getHadoopConf(conf));
List<String> commits =
metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream()
- .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ .map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
assertThat(commits.size(), is(4));
List<String> archivedCommits =
metaClient.getArchivedTimeline().getCommitsTimeline().filterCompletedInstants()
-
.getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
.getInstantsAsStream().map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
assertThat(archivedCommits.size(), is(6));
@@ -1137,9 +1140,7 @@ public class TestInputFormat {
assertThat(firstCommit.get().getAction(),
is(HoodieTimeline.DELTA_COMMIT_ACTION));
java.nio.file.Path metaFilePath = Paths.get(metaClient.getMetaPath(),
firstCommit.get().getFileName());
- TestUtils.amendCompletionTimeToLatest(metaClient, metaFilePath,
firstCommit.get().getTimestamp());
-
- String compactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().map(HoodieInstant::getTimestamp).get();
+ String newCompletionTime =
TestUtils.amendCompletionTimeToLatest(metaClient, metaFilePath,
firstCommit.get().getTimestamp());
InputFormat<RowData, ?> inputFormat =
this.tableSource.getInputFormat(true);
assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
@@ -1150,8 +1151,8 @@ public class TestInputFormat {
.path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
.skipCompaction(skipCompaction)
.build();
- conf.setString(FlinkOptions.READ_END_COMMIT, compactionInstant);
- IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ conf.setString(FlinkOptions.READ_END_COMMIT, newCompletionTime);
+ IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, false);
assertFalse(splits2.isEmpty());
List<RowData> result2 = readData(inputFormat,
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
TestData.assertRowDataEquals(result2, TestData.dataSetInsert(1, 2));
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index 611eb889f8d..f4549eb669d 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -138,11 +138,13 @@ public class TestUtils {
serializeCommitMetadata(metadata));
}
- public static void amendCompletionTimeToLatest(HoodieTableMetaClient
metaClient, java.nio.file.Path sourcePath, String instantTime) throws
IOException {
+ public static String amendCompletionTimeToLatest(HoodieTableMetaClient
metaClient, java.nio.file.Path sourcePath, String instantTime) throws
IOException {
String fileExt = sourcePath.getFileName().toString().split("\\.")[1];
- String newFileName = instantTime + "_" + metaClient.createNewInstantTime()
+ "." + fileExt;
+ String newCompletionTime = metaClient.createNewInstantTime();
+ String newFileName = instantTime + "_" + newCompletionTime + "." + fileExt;
java.nio.file.Path newFilePath =
sourcePath.getParent().resolve(newFileName);
Files.move(sourcePath, newFilePath);
+ return newCompletionTime;
}
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java
b/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java
index 957dab28e2c..42fd98bdd01 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java
@@ -84,6 +84,10 @@ public final class Option<T> implements Serializable {
return null != val;
}
+ public boolean isEmpty() {
+ return null == val;
+ }
+
public T get() {
if (null == val) {
throw new NoSuchElementException("No value present in Option");
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
index afccd43ca3e..311383a9c32 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
@@ -77,7 +77,7 @@ class CDCRelation(
.startInstant(startInstant)
.endInstant(endInstant)
.nullableBoundary(true)
- .rangeType(InstantRange.RangeType.OPEN_CLOSE).build())
+ .rangeType(InstantRange.RangeType.OPEN_CLOSED).build())
override final def needConversion: Boolean = false
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/LSMTimelineReadBenchmark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/LSMTimelineReadBenchmark.scala
index 57633098b25..87d136029e5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/LSMTimelineReadBenchmark.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/LSMTimelineReadBenchmark.scala
@@ -94,7 +94,7 @@ object LSMTimelineReadBenchmark extends HoodieBenchmarkBase {
}
}
benchmark.addCase("read start time") { _ =>
- new CompletionTimeQueryView(metaClient).getStartTimeSet(startTs + 1 +
1000 + "", startTs + commitsNum + 1000 + "", earliestStartTimeFunc)
+ new CompletionTimeQueryView(metaClient).getStartTimes(startTs + 1 +
1000 + "", startTs + commitsNum + 1000 + "", earliestStartTimeFunc)
}
benchmark.run()
val totalSize =
LSMTimeline.latestSnapshotManifest(metaClient).getFiles.asScala