codope commented on code in PR #10255: URL: https://github.com/apache/hudi/pull/10255#discussion_r1439195459
########## hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java: ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.InstantRange; +import org.apache.hudi.common.table.timeline.CompletionTimeQueryView; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Analyzer for incremental queries. 
+ * + * <p>The analyzer can supply info about the incremental queries including: + * <ul> + * <li>The archived instant candidates;</li> + * <li>The active instant candidates;</li> + * <li>The instant filtering predicate, e.g the instant range;</li> + * <li>Whether the query starts from the earliest;</li> + * <li>Whether the query ends to the latest;</li> + * <li>The max completion time used for fs view file slice version filtering.</li> + * </ul> + * + * <p><h2>Criteria for different query ranges:</h2> + * + * <table> + * <tr> + * <th>Query Range</th> + * <th>File Handles Decoding</th> + * <th>Instant Filtering Predicate</th> + * </tr> + * <tr> + * <td>[earliest, _]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>_</td> Review Comment: What is `earliest` here? Is it the earliest instant in the active, or is it including archived? Also, from the doc it is not clear why the instant filtering predicate is empty? ########## hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java: ########## @@ -72,67 +76,77 @@ public class TestIncrementalInputSplits extends HoodieCommonTestHarness { @BeforeEach - void init() throws IOException { + void init() { initPath(); - initMetaClient(); } @Test - void testFilterInstantsWithRange() { - HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient, true); + void testFilterInstantsWithRange() throws IOException { Configuration conf = TestConfigurations.getDefaultConf(basePath); conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true); - IncrementalInputSplits iis = IncrementalInputSplits.builder() - .conf(conf) - .path(new Path(basePath)) - .rowType(TestConfigurations.ROW_TYPE) - .skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING)) - .build(); + conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ); + HoodieActiveTimeline timeline = 
metaClient.getActiveTimeline(); HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "1"); HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "2"); HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "3"); timeline.createCompleteInstant(commit1); timeline.createCompleteInstant(commit2); timeline.createCompleteInstant(commit3); - timeline = timeline.reload(); + timeline = metaClient.reloadActiveTimeline(); + + Map<String, String> completionTimeMap = timeline.filterCompletedInstants().getInstantsAsStream() + .collect(Collectors.toMap(HoodieInstant::getTimestamp, HoodieInstant::getCompletionTime)); + IncrementalQueryAnalyzer analyzer1 = IncrementalQueryAnalyzer.builder() + .metaClient(metaClient) + .rangeType(InstantRange.RangeType.OPEN_CLOSE) + .startTime(completionTimeMap.get("1")) + .skipClustering(true) + .build(); // previous read iteration read till instant time "1", next read iteration should return ["2", "3"] - List<HoodieInstant> instantRange2 = iis.filterInstantsWithRange(timeline, "1"); - assertEquals(2, instantRange2.size()); - assertIterableEquals(Arrays.asList(commit2, commit3), instantRange2); + List<HoodieInstant> activeInstants1 = analyzer1.analyze().getActiveInstants(); + assertEquals(2, activeInstants1.size()); + assertIterableEquals(Arrays.asList(commit2, commit3), activeInstants1); // simulate first iteration cycle with read from the LATEST commit - List<HoodieInstant> instantRange1 = iis.filterInstantsWithRange(timeline, null); - assertEquals(1, instantRange1.size()); - assertIterableEquals(Collections.singletonList(commit3), instantRange1); + IncrementalQueryAnalyzer analyzer2 = IncrementalQueryAnalyzer.builder() + .metaClient(metaClient) + .rangeType(InstantRange.RangeType.CLOSE_CLOSE) + .skipClustering(true) + .build(); + List<HoodieInstant> activeInstants2 = 
analyzer2.analyze().getActiveInstants(); + assertEquals(1, activeInstants2.size()); + assertIterableEquals(Collections.singletonList(commit3), activeInstants2); // specifying a start and end commit - conf.set(FlinkOptions.READ_START_COMMIT, "1"); - conf.set(FlinkOptions.READ_END_COMMIT, "3"); - List<HoodieInstant> instantRange3 = iis.filterInstantsWithRange(timeline, null); - assertEquals(3, instantRange3.size()); - assertIterableEquals(Arrays.asList(commit1, commit2, commit3), instantRange3); + IncrementalQueryAnalyzer analyzer3 = IncrementalQueryAnalyzer.builder() Review Comment: let's also test for null/empty start and end time. ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java: ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.InstantRange; +import org.apache.hudi.common.table.timeline.CompletionTimeQueryView; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Analyzer for incremental queries. 
+ * + * <p>The analyzer can supply info about the incremental queries including: + * <ul> + * <li>The archived instant candidates;</li> + * <li>The active instant candidates;</li> + * <li>The instant filtering predicate, e.g the instant range;</li> + * <li>Whether the query starts from the earliest;</li> + * <li>Whether the query ends to the latest;</li> + * <li>The max completion time used for fs view file slice version filtering.</li> + * </ul> + * + * <p><h2>Criteria for different query ranges:</h2> + * + * <table> + * <tr> + * <th>Query Range</th> + * <th>File Handles Decoding</th> + * <th>Instant Filtering Predicate</th> + * </tr> + * <tr> + * <td>[earliest, _]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>_</td> + * </tr> + * <tr> + * <td>[earliest, endTime]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>'_hoodie_commit_time' in setA, setA is a collection of all the instants completed before or on 'endTime'</td> + * </tr> + * <tr> + * <td>[_, _]</td> + * <td>The latest completed instant metadata</td> + * <td>'_hoodie_commit_time' = i_n, i_n is the latest completed instant</td> + * </tr> + * <tr> + * <td>[_, endTime]</td> + * <td>i).find the last completed instant i_n before or on 'endTim; + * ii). read the latest snapshot from table metadata if i_n is archived or the commit metadata if it is still active</td> + * <td>'_hoodie_commit_time' = i_n</td> + * </tr> + * <tr> + * <td>[startTime, _]</td> + * <td>i).find the instant set setA, setA is a collection of all the instants completed before or on 'endTime'; + * ii). read the latest snapshot from table metadata if setA has archived instants or the commit metadata if all the instants are still active</td> + * <td>'_hoodie_commit_time' in setA</td> + * </tr> + * <tr> + * <td>[earliest, endTime]</td> + * <td>i).find the instant set setA, setA is a collection of all the instants completed in the given time range; + * ii). 
read the latest snapshot from table metadata if setA has archived instants or the commit metadata if all the instants are still active</td> + * <td>'_hoodie_commit_time' in setA</td> + * </tr> + * </table> + * + * <p> A range type is required for analyzing the query so that the query range boundary inclusiveness have clear semantics. + * + * <p>IMPORTANT: the reader may optionally choose to fall back to reading the latest snapshot if there are files missing from decoding the commit metadata. + */ +public class IncrementalQueryAnalyzer { + public static final String START_COMMIT_EARLIEST = "earliest"; + + private final HoodieTableMetaClient metaClient; + private final Option<String> startTime; + private final Option<String> endTime; + private final InstantRange.RangeType rangeType; + private final boolean skipCompaction; + private final boolean skipClustering; + private final int limit; + + private IncrementalQueryAnalyzer( + HoodieTableMetaClient metaClient, + String startTime, + String endTime, + InstantRange.RangeType rangeType, + boolean skipCompaction, + boolean skipClustering, + int limit) { + this.metaClient = metaClient; + this.startTime = Option.ofNullable(startTime); + this.endTime = Option.ofNullable(endTime); + this.rangeType = rangeType; + this.skipCompaction = skipCompaction; + this.skipClustering = skipClustering; + this.limit = limit; + } + + /** + * Returns a builder. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Analyzes the incremental query context with given completion time range. + * + * @return An incremental query context including the instant time range info. 
+ */ + public QueryContext analyze() { + try (CompletionTimeQueryView completionTimeQueryView = new CompletionTimeQueryView(this.metaClient)) { + if (completionTimeQueryView.isEmptyTable()) { + // no dataset committed in the table + return QueryContext.EMPTY; + } + HoodieTimeline readTimeline = getReadTimeline(this.metaClient); + List<String> instantTimeList = completionTimeQueryView.getStartTimes(readTimeline, startTime, endTime, rangeType); + if (instantTimeList.isEmpty()) { + // no instants completed within the give time range, returns early. + return QueryContext.EMPTY; + } + // get hoodie instants + Pair<List<String>, List<String>> splitInstantTime = splitInstantByActiveness(instantTimeList, completionTimeQueryView); + Set<String> instantTimeSet = new HashSet<>(instantTimeList); + List<String> archivedInstantTime = splitInstantTime.getKey(); + List<String> activeInstantTime = splitInstantTime.getValue(); + List<HoodieInstant> archivedInstants = new ArrayList<>(); + List<HoodieInstant> activeInstants = new ArrayList<>(); + HoodieTimeline archivedReadTimeline = null; + if (!activeInstantTime.isEmpty()) { + activeInstants = readTimeline.getInstantsAsStream().filter(instant -> instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList()); + if (limit > 0 && limit < activeInstants.size()) { + // streaming read speed limit, limits the maximum number of commits allowed to read for each run + activeInstants = activeInstants.subList(0, limit); + } + } + if (!archivedInstantTime.isEmpty()) { + archivedReadTimeline = getArchivedReadTimeline(metaClient, archivedInstantTime.get(0)); + archivedInstants = archivedReadTimeline.getInstantsAsStream().filter(instant -> instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList()); + } + List<String> instants = Stream.concat(archivedInstants.stream(), activeInstants.stream()).map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + if (instants.isEmpty()) { + // no instants completed within 
the give time range, returns early. + return QueryContext.EMPTY; + } + if (startTime.isEmpty() && endTime.isPresent()) { + instants = Collections.singletonList(instants.get(instants.size() - 1)); + } + String lastInstant = instants.get(instants.size() - 1); + // keep the same semantics with streaming read, default start from the latest commit + String startInstant = START_COMMIT_EARLIEST.equalsIgnoreCase(startTime.orElse(null)) ? null : startTime.isEmpty() ? lastInstant : instants.get(0); + String endInstant = endTime.isEmpty() ? null : lastInstant; + return QueryContext.create(startInstant, endInstant, instants, archivedInstants, activeInstants, readTimeline, archivedReadTimeline); + } + } + + /** + * Splits the given instant time list into a pair of archived instant list and active instant list. + */ + private static Pair<List<String>, List<String>> splitInstantByActiveness(List<String> instantTimeList, CompletionTimeQueryView completionTimeQueryView) { + int firstActiveIdx = IntStream.range(0, instantTimeList.size()).filter(i -> !completionTimeQueryView.isArchived(instantTimeList.get(i))).findFirst().orElse(-1); + if (firstActiveIdx == -1) { + return Pair.of(instantTimeList, Collections.emptyList()); + } else if (firstActiveIdx == 0) { + return Pair.of(Collections.emptyList(), instantTimeList); + } else { + return Pair.of(instantTimeList.subList(0, firstActiveIdx), instantTimeList.subList(firstActiveIdx, instantTimeList.size())); + } + } + + private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) { + HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(); + return filterInstantsAsPerUserConfigs(metaClient, timeline, this.skipCompaction, this.skipClustering); + } + + private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, String startInstant) { + HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(startInstant, false); + HoodieTimeline 
archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants(); + return filterInstantsAsPerUserConfigs(metaClient, archivedCompleteTimeline, this.skipCompaction, this.skipClustering); + } + + /** + * Filters out the unnecessary instants as per user specified configs. + * + * @param timeline The timeline. + * + * @return the filtered timeline + */ + @VisibleForTesting + public static HoodieTimeline filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline timeline, boolean skipCompaction, boolean skipClustering) { + final HoodieTimeline oriTimeline = timeline; + if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ & skipCompaction) { + // the compaction commit uses 'commit' as action which is tricky + timeline = timeline.filter(instant -> !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)); + } + if (skipClustering) { + timeline = timeline.filter(instant -> !ClusteringUtils.isClusteringInstant(instant, oriTimeline)); + } + return timeline; + } + + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + /** + * Builder for {@link IncrementalQueryAnalyzer}. + */ + public static class Builder { + /** + * Start completion time. + */ + private String startTime; + /** + * End completion time. + */ + private String endTime; + private InstantRange.RangeType rangeType; + private HoodieTableMetaClient metaClient; + private boolean skipCompaction = false; + private boolean skipClustering = false; + /** + * Maximum number of instants to read per run. 
+ */ + private int limit = -1; + + public Builder() { + } + + public Builder startTime(String startTime) { + this.startTime = startTime; + return this; + } + + public Builder endTime(String endTime) { + this.endTime = endTime; + return this; + } + + public Builder rangeType(InstantRange.RangeType rangeType) { + this.rangeType = rangeType; + return this; + } + + public Builder metaClient(HoodieTableMetaClient metaClient) { + this.metaClient = metaClient; + return this; + } + + public Builder skipCompaction(boolean skipCompaction) { + this.skipCompaction = skipCompaction; + return this; + } + + public Builder skipClustering(boolean skipClustering) { + this.skipClustering = skipClustering; + return this; + } + + public Builder limit(int limit) { + this.limit = limit; + return this; + } + + public IncrementalQueryAnalyzer build() { + return new IncrementalQueryAnalyzer(Objects.requireNonNull(this.metaClient), this.startTime, this.endTime, + Objects.requireNonNull(this.rangeType), this.skipCompaction, this.skipClustering, this.limit); + } + } + + /** + * Represents the analyzed query context. + */ + public static class QueryContext { + public static final QueryContext EMPTY = new QueryContext(null, null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), null, null); + + /** + * An empty option indicates consumption from the earliest instant. + */ + private final Option<String> startInstant; + /** + * An empty option indicates consumption to the latest instant. + */ + private final Option<String> endInstant; + private final List<HoodieInstant> archivedInstants; + private final List<HoodieInstant> activeInstants; + /** + * The active timeline to read filtered by given configurations. + */ + private final HoodieTimeline readTimeline; + /** + * The archived timeline to read filtered by given configurations. 
+ */ + private final HoodieTimeline archivedReadTimeline; Review Comment: `activeTimelineFiltered` and `archivedTimelineFiltered` sounds more intuitive. ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java: ########## @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.InstantRange; +import org.apache.hudi.common.table.timeline.CompletionTimeQueryView; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Analyzer for incremental queries. + * + * <p>The analyzer can supply info about the incremental queries including: + * <ul> + * <li>The archived instant candidates;</li> + * <li>The active instant candidates;</li> + * <li>The instant filtering predicate, e.g the instant range;</li> + * <li>Whether the query starts from the earliest;</li> + * <li>Whether the query ends to the latest;</li> + * <li>The max completion time used for fs view file slice version filtering.</li> + * </ul> + * + * <p><h2>Criteria for different query ranges:</h2> + * + * <table> + * <tr> + * <th>Query Range</th> + * <th>File Handles Decoding</th> + * <th>Instant Filtering Predicate</th> + * </tr> + * <tr> + * <td>[earliest, _]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>_</td> + * </tr> + * <tr> + * <td>[earliest, endTime]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>'_hoodie_commit_time' in setA, setA is a collection of all the instants completed before or on 'endTime'</td> + * </tr> + * <tr> 
+ * <td>[_, _]</td> + * <td>The latest completed instant metadata</td> + * <td>'_hoodie_commit_time' = i_n, i_n is the latest completed instant</td> + * </tr> + * <tr> + * <td>[_, endTime]</td> + * <td>i).find the last completed instant i_n before or on 'endTim; + * ii). read the latest snapshot from table metadata if i_n is archived or the commit metadata if it is still active</td> Review Comment: +1. Please file a followup and link it here. ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java: ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.InstantRange; +import org.apache.hudi.common.table.timeline.CompletionTimeQueryView; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Analyzer for incremental queries. 
+ * + * <p>The analyzer can supply info about the incremental queries including: + * <ul> + * <li>The archived instant candidates;</li> + * <li>The active instant candidates;</li> + * <li>The instant filtering predicate, e.g the instant range;</li> + * <li>Whether the query starts from the earliest;</li> + * <li>Whether the query ends to the latest;</li> + * <li>The max completion time used for fs view file slice version filtering.</li> + * </ul> + * + * <p><h2>Criteria for different query ranges:</h2> + * + * <table> + * <tr> + * <th>Query Range</th> + * <th>File Handles Decoding</th> + * <th>Instant Filtering Predicate</th> + * </tr> + * <tr> + * <td>[earliest, _]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>_</td> + * </tr> + * <tr> + * <td>[earliest, endTime]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>'_hoodie_commit_time' in setA, setA is a collection of all the instants completed before or on 'endTime'</td> + * </tr> + * <tr> + * <td>[_, _]</td> + * <td>The latest completed instant metadata</td> + * <td>'_hoodie_commit_time' = i_n, i_n is the latest completed instant</td> + * </tr> + * <tr> + * <td>[_, endTime]</td> + * <td>i).find the last completed instant i_n before or on 'endTim; + * ii). read the latest snapshot from table metadata if i_n is archived or the commit metadata if it is still active</td> + * <td>'_hoodie_commit_time' = i_n</td> + * </tr> + * <tr> + * <td>[startTime, _]</td> + * <td>i).find the instant set setA, setA is a collection of all the instants completed before or on 'endTime'; + * ii). read the latest snapshot from table metadata if setA has archived instants or the commit metadata if all the instants are still active</td> + * <td>'_hoodie_commit_time' in setA</td> + * </tr> + * <tr> + * <td>[earliest, endTime]</td> + * <td>i).find the instant set setA, setA is a collection of all the instants completed in the given time range; + * ii). 
read the latest snapshot from table metadata if setA has archived instants or the commit metadata if all the instants are still active</td> + * <td>'_hoodie_commit_time' in setA</td> + * </tr> + * </table> + * + * <p> A range type is required for analyzing the query so that the query range boundary inclusiveness have clear semantics. + * + * <p>IMPORTANT: the reader may optionally choose to fall back to reading the latest snapshot if there are files missing from decoding the commit metadata. + */ +public class IncrementalQueryAnalyzer { + public static final String START_COMMIT_EARLIEST = "earliest"; + + private final HoodieTableMetaClient metaClient; + private final Option<String> startTime; + private final Option<String> endTime; + private final InstantRange.RangeType rangeType; + private final boolean skipCompaction; + private final boolean skipClustering; + private final int limit; + + private IncrementalQueryAnalyzer( + HoodieTableMetaClient metaClient, + String startTime, + String endTime, + InstantRange.RangeType rangeType, + boolean skipCompaction, + boolean skipClustering, + int limit) { + this.metaClient = metaClient; + this.startTime = Option.ofNullable(startTime); + this.endTime = Option.ofNullable(endTime); + this.rangeType = rangeType; + this.skipCompaction = skipCompaction; + this.skipClustering = skipClustering; + this.limit = limit; + } + + /** + * Returns a builder. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Analyzes the incremental query context with given completion time range. + * + * @return An incremental query context including the instant time range info. 
+ */ + public QueryContext analyze() { + try (CompletionTimeQueryView completionTimeQueryView = new CompletionTimeQueryView(this.metaClient)) { + if (completionTimeQueryView.isEmptyTable()) { + // no dataset committed in the table + return QueryContext.EMPTY; + } + HoodieTimeline readTimeline = getReadTimeline(this.metaClient); + List<String> instantTimeList = completionTimeQueryView.getStartTimes(readTimeline, startTime, endTime, rangeType); + if (instantTimeList.isEmpty()) { + // no instants completed within the give time range, returns early. + return QueryContext.EMPTY; + } + // get hoodie instants + Pair<List<String>, List<String>> splitInstantTime = splitInstantByActiveness(instantTimeList, completionTimeQueryView); + Set<String> instantTimeSet = new HashSet<>(instantTimeList); + List<String> archivedInstantTime = splitInstantTime.getKey(); + List<String> activeInstantTime = splitInstantTime.getValue(); + List<HoodieInstant> archivedInstants = new ArrayList<>(); + List<HoodieInstant> activeInstants = new ArrayList<>(); + HoodieTimeline archivedReadTimeline = null; + if (!activeInstantTime.isEmpty()) { + activeInstants = readTimeline.getInstantsAsStream().filter(instant -> instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList()); + if (limit > 0 && limit < activeInstants.size()) { + // streaming read speed limit, limits the maximum number of commits allowed to read for each run + activeInstants = activeInstants.subList(0, limit); + } + } + if (!archivedInstantTime.isEmpty()) { + archivedReadTimeline = getArchivedReadTimeline(metaClient, archivedInstantTime.get(0)); + archivedInstants = archivedReadTimeline.getInstantsAsStream().filter(instant -> instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList()); + } + List<String> instants = Stream.concat(archivedInstants.stream(), activeInstants.stream()).map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + if (instants.isEmpty()) { + // no instants completed within 
the give time range, returns early. + return QueryContext.EMPTY; + } + if (startTime.isEmpty() && endTime.isPresent()) { + instants = Collections.singletonList(instants.get(instants.size() - 1)); + } + String lastInstant = instants.get(instants.size() - 1); + // keep the same semantics with streaming read, default start from the latest commit + String startInstant = START_COMMIT_EARLIEST.equalsIgnoreCase(startTime.orElse(null)) ? null : startTime.isEmpty() ? lastInstant : instants.get(0); + String endInstant = endTime.isEmpty() ? null : lastInstant; + return QueryContext.create(startInstant, endInstant, instants, archivedInstants, activeInstants, readTimeline, archivedReadTimeline); + } + } + + /** + * Splits the given instant time list into a pair of archived instant list and active instant list. + */ + private static Pair<List<String>, List<String>> splitInstantByActiveness(List<String> instantTimeList, CompletionTimeQueryView completionTimeQueryView) { Review Comment: `Pair` implements `Map`. I think for this use case, a simple POJO would suffice. Wdyt? ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java: ########## @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.InstantRange; +import org.apache.hudi.common.table.timeline.CompletionTimeQueryView; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Analyzer for incremental queries. 
+ * + * <p>The analyzer can supply info about the incremental queries including: + * <ul> + * <li>The archived instant candidates;</li> + * <li>The active instant candidates;</li> + * <li>The instant filtering predicate, e.g the instant range;</li> + * <li>Whether the query starts from the earliest;</li> + * <li>Whether the query ends to the latest;</li> + * <li>The max completion time used for fs view file slice version filtering.</li> + * </ul> + * + * <p><h2>Criteria for different query ranges:</h2> + * + * <table> + * <tr> + * <th>Query Range</th> + * <th>File Handles Decoding</th> + * <th>Instant Filtering Predicate</th> + * </tr> + * <tr> + * <td>[earliest, _]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>_</td> + * </tr> + * <tr> + * <td>[earliest, endTime]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>'_hoodie_commit_time' in setA, setA is a collection of all the instants completed before or on 'endTime'</td> + * </tr> + * <tr> + * <td>[_, _]</td> Review Comment: When can it happen that both startTime and endTime are null/empty? Also, since this is a range, it looks slightly confusing to a user. I mean if startTime is null/empty then should we not treat the earliest instant as the start of the range? Just like if endTime is null/empty then we pick the latest instant as the end of the range. ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java: ########## @@ -175,42 +190,109 @@ public Option<String> getCompletionTime(String startTime) { * * <p>By default, assumes there is at most 1 day time of duration for an instant to accelerate the queries. * - * @param startCompletionTime The start completion time. - * @param endCompletionTime The end completion time. + * @param readTimeline The read timeline. Review Comment: let's rename to `activeTimelineFiltered`? `readTimeline` introduces ambiguity such as there is a writeTimeline or not. 
########## hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java: ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.InstantRange; +import org.apache.hudi.common.table.timeline.CompletionTimeQueryView; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Analyzer for incremental queries. 
+ * + * <p>The analyzer can supply info about the incremental queries including: + * <ul> + * <li>The archived instant candidates;</li> + * <li>The active instant candidates;</li> + * <li>The instant filtering predicate, e.g the instant range;</li> + * <li>Whether the query starts from the earliest;</li> + * <li>Whether the query ends to the latest;</li> + * <li>The max completion time used for fs view file slice version filtering.</li> + * </ul> + * + * <p><h2>Criteria for different query ranges:</h2> + * + * <table> + * <tr> + * <th>Query Range</th> + * <th>File Handles Decoding</th> + * <th>Instant Filtering Predicate</th> + * </tr> + * <tr> + * <td>[earliest, _]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>_</td> + * </tr> + * <tr> + * <td>[earliest, endTime]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>'_hoodie_commit_time' in setA, setA is a collection of all the instants completed before or on 'endTime'</td> + * </tr> + * <tr> + * <td>[_, _]</td> + * <td>The latest completed instant metadata</td> + * <td>'_hoodie_commit_time' = i_n, i_n is the latest completed instant</td> + * </tr> + * <tr> + * <td>[_, endTime]</td> + * <td>i).find the last completed instant i_n before or on 'endTim; + * ii). read the latest snapshot from table metadata if i_n is archived or the commit metadata if it is still active</td> + * <td>'_hoodie_commit_time' = i_n</td> + * </tr> + * <tr> + * <td>[startTime, _]</td> + * <td>i).find the instant set setA, setA is a collection of all the instants completed before or on 'endTime'; Review Comment: Not following this doc. I think setA should be a collection of all instants that completed on or after `startTime`, right? In `[startTime, _]` endTime is not defined so it's basically the latest instant. 
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java: ########## @@ -516,7 +517,7 @@ public static boolean fileExists(FileSystem fs, Path path) { public static boolean isWriteCommit(HoodieTableType tableType, HoodieInstant instant, HoodieTimeline timeline) { return tableType == HoodieTableType.MERGE_ON_READ ? !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) // not a compaction - : !ClusteringUtil.isClusteringInstant(instant, timeline); // not a clustering + : !ClusteringUtils.isClusteringInstant(instant, timeline); // not a clustering Review Comment: let's merge the methods of `ClusteringUtil` in hudi-flink to `ClusteringUtils` in hudi-common. Alternatively, rename `ClusteringUtil` to `FlinkClusteringUtil` ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java: ########## @@ -135,44 +127,37 @@ public static Builder builder() { public Result inputSplits( HoodieTableMetaClient metaClient, boolean cdcEnabled) { - HoodieTimeline commitTimeline = getReadTimeline(metaClient); - if (commitTimeline.empty()) { - LOG.warn("No splits found for the table under path " + path); + + IncrementalQueryAnalyzer analyzer = IncrementalQueryAnalyzer.builder() + .metaClient(metaClient) + .startTime(this.conf.getString(FlinkOptions.READ_START_COMMIT)) + .endTime(this.conf.getString(FlinkOptions.READ_END_COMMIT)) Review Comment: When is it possible that both READ_START_COMMIT and READ_END_COMMIT are null/empty? ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java: ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.InstantRange; +import org.apache.hudi.common.table.timeline.CompletionTimeQueryView; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Analyzer for incremental queries. 
+ * + * <p>The analyzer can supply info about the incremental queries including: + * <ul> + * <li>The archived instant candidates;</li> + * <li>The active instant candidates;</li> + * <li>The instant filtering predicate, e.g the instant range;</li> + * <li>Whether the query starts from the earliest;</li> + * <li>Whether the query ends to the latest;</li> + * <li>The max completion time used for fs view file slice version filtering.</li> + * </ul> + * + * <p><h2>Criteria for different query ranges:</h2> + * + * <table> + * <tr> + * <th>Query Range</th> + * <th>File Handles Decoding</th> + * <th>Instant Filtering Predicate</th> + * </tr> + * <tr> + * <td>[earliest, _]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>_</td> + * </tr> + * <tr> + * <td>[earliest, endTime]</td> + * <td>The latest snapshot files from table metadata</td> + * <td>'_hoodie_commit_time' in setA, setA is a collection of all the instants completed before or on 'endTime'</td> + * </tr> + * <tr> + * <td>[_, _]</td> + * <td>The latest completed instant metadata</td> + * <td>'_hoodie_commit_time' = i_n, i_n is the latest completed instant</td> + * </tr> + * <tr> + * <td>[_, endTime]</td> + * <td>i).find the last completed instant i_n before or on 'endTim; + * ii). read the latest snapshot from table metadata if i_n is archived or the commit metadata if it is still active</td> + * <td>'_hoodie_commit_time' = i_n</td> + * </tr> + * <tr> + * <td>[startTime, _]</td> + * <td>i).find the instant set setA, setA is a collection of all the instants completed before or on 'endTime'; + * ii). read the latest snapshot from table metadata if setA has archived instants or the commit metadata if all the instants are still active</td> + * <td>'_hoodie_commit_time' in setA</td> + * </tr> + * <tr> + * <td>[earliest, endTime]</td> + * <td>i).find the instant set setA, setA is a collection of all the instants completed in the given time range; + * ii). 
read the latest snapshot from table metadata if setA has archived instants or the commit metadata if all the instants are still active</td> + * <td>'_hoodie_commit_time' in setA</td> + * </tr> + * </table> + * + * <p> A range type is required for analyzing the query so that the query range boundary inclusiveness have clear semantics. + * + * <p>IMPORTANT: the reader may optionally choose to fall back to reading the latest snapshot if there are files missing from decoding the commit metadata. + */ +public class IncrementalQueryAnalyzer { + public static final String START_COMMIT_EARLIEST = "earliest"; + + private final HoodieTableMetaClient metaClient; + private final Option<String> startTime; + private final Option<String> endTime; + private final InstantRange.RangeType rangeType; + private final boolean skipCompaction; + private final boolean skipClustering; + private final int limit; + + private IncrementalQueryAnalyzer( + HoodieTableMetaClient metaClient, + String startTime, + String endTime, + InstantRange.RangeType rangeType, + boolean skipCompaction, + boolean skipClustering, + int limit) { + this.metaClient = metaClient; + this.startTime = Option.ofNullable(startTime); + this.endTime = Option.ofNullable(endTime); + this.rangeType = rangeType; + this.skipCompaction = skipCompaction; + this.skipClustering = skipClustering; + this.limit = limit; + } + + /** + * Returns a builder. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Analyzes the incremental query context with given completion time range. + * + * @return An incremental query context including the instant time range info. 
+ */ + public QueryContext analyze() { + try (CompletionTimeQueryView completionTimeQueryView = new CompletionTimeQueryView(this.metaClient)) { + if (completionTimeQueryView.isEmptyTable()) { + // no dataset committed in the table + return QueryContext.EMPTY; + } + HoodieTimeline readTimeline = getReadTimeline(this.metaClient); + List<String> instantTimeList = completionTimeQueryView.getStartTimes(readTimeline, startTime, endTime, rangeType); + if (instantTimeList.isEmpty()) { + // no instants completed within the give time range, returns early. + return QueryContext.EMPTY; + } + // get hoodie instants + Pair<List<String>, List<String>> splitInstantTime = splitInstantByActiveness(instantTimeList, completionTimeQueryView); + Set<String> instantTimeSet = new HashSet<>(instantTimeList); + List<String> archivedInstantTime = splitInstantTime.getKey(); + List<String> activeInstantTime = splitInstantTime.getValue(); + List<HoodieInstant> archivedInstants = new ArrayList<>(); + List<HoodieInstant> activeInstants = new ArrayList<>(); + HoodieTimeline archivedReadTimeline = null; + if (!activeInstantTime.isEmpty()) { + activeInstants = readTimeline.getInstantsAsStream().filter(instant -> instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList()); + if (limit > 0 && limit < activeInstants.size()) { + // streaming read speed limit, limits the maximum number of commits allowed to read for each run + activeInstants = activeInstants.subList(0, limit); + } + } + if (!archivedInstantTime.isEmpty()) { + archivedReadTimeline = getArchivedReadTimeline(metaClient, archivedInstantTime.get(0)); + archivedInstants = archivedReadTimeline.getInstantsAsStream().filter(instant -> instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList()); + } + List<String> instants = Stream.concat(archivedInstants.stream(), activeInstants.stream()).map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + if (instants.isEmpty()) { + // no instants completed within 
the give time range, returns early. + return QueryContext.EMPTY; + } + if (startTime.isEmpty() && endTime.isPresent()) { Review Comment: Where do we handle both startTime and endTime being empty? ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java: ########## @@ -175,42 +187,111 @@ public Option<String> getCompletionTime(String startTime) { * * <p>By default, assumes there is at most 1 day time of duration for an instant to accelerate the queries. * - * @param startCompletionTime The start completion time. - * @param endCompletionTime The end completion time. + * @param readTimeline The read timeline. + * @param rangeStart The query range start completion time. + * @param rangeEnd The query range end completion time. + * @param rangeType The range type. * - * @return The instant time set. + * @return The sorted instant time list. */ - public Set<String> getStartTimeSet(String startCompletionTime, String endCompletionTime) { + public List<String> getStartTimes( + HoodieTimeline readTimeline, + Option<String> rangeStart, + Option<String> rangeEnd, + InstantRange.RangeType rangeType) { // assumes any instant/transaction lasts at most 1 day to optimize the query efficiency. - return getStartTimeSet(startCompletionTime, endCompletionTime, s -> HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY)); + return getStartTimes(readTimeline, rangeStart, rangeEnd, rangeType, s -> HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY)); } /** * Queries the instant start time with given completion time range. * - * @param startCompletionTime The start completion time. - * @param endCompletionTime The end completion time. - * @param earliestStartTimeFunc The function to generate the earliest start time boundary - * with the minimum completion time {@code startCompletionTime}. + * @param rangeStart The query range start completion time. 
+ * @param rangeEnd The query range end completion time. + * @param earliestInstantTimeFunc The function to generate the earliest start time boundary + * with the minimum completion time. * - * @return The instant time set. + * @return The sorted instant time list. */ - public Set<String> getStartTimeSet(String startCompletionTime, String endCompletionTime, Function<String, String> earliestStartTimeFunc) { - String startInstant = earliestStartTimeFunc.apply(startCompletionTime); + @VisibleForTesting + public List<String> getStartTimes( + String rangeStart, + String rangeEnd, + Function<String, String> earliestInstantTimeFunc) { + return getStartTimes(metaClient.getCommitsTimeline().filterCompletedInstants(), Option.ofNullable(rangeStart), Option.ofNullable(rangeEnd), + InstantRange.RangeType.CLOSE_CLOSE, earliestInstantTimeFunc); + } + + /** + * Queries the instant start time with given completion time range. + * + * @param readTimeline The read timeline. + * @param rangeStart The query range start completion time. + * @param rangeEnd The query range end completion time. + * @param rangeType The range type. + * @param earliestInstantTimeFunc The function to generate the earliest start time boundary + * with the minimum completion time. + * + * @return The sorted instant time list. + */ + public List<String> getStartTimes( Review Comment: This is confusing with `startTime` and `endTime` in `IncrementalQueryAnalyzer`. Let's assume each instant has start timestamp and completion timestamp. Then, we can rename this method to `getInstantStartTimestamps`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
