This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6f117f22c23 [HUDI-7230] Flink stream read supports skipping insert overwrite instant (#10328)
6f117f22c23 is described below
commit 6f117f22c233e7768b63d7e08bd223f8d9cf80d7
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Mon Apr 8 17:22:22 2024 +0800
    [HUDI-7230] Flink stream read supports skipping insert overwrite instant (#10328)
---
.../table/read/IncrementalQueryAnalyzer.java | 20 ++-
.../apache/hudi/common/util/ClusteringUtils.java | 15 ++
.../apache/hudi/configuration/FlinkOptions.java | 8 ++
.../apache/hudi/source/IncrementalInputSplits.java | 17 ++-
.../hudi/source/StreamReadMonitoringFunction.java | 1 +
.../hudi/source/TestIncrementalInputSplits.java | 151 +++++++++++++++++++--
6 files changed, 194 insertions(+), 18 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
index 3f0eb32c7e5..ca8ae575898 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
@@ -111,6 +111,7 @@ public class IncrementalQueryAnalyzer {
private final InstantRange.RangeType rangeType;
private final boolean skipCompaction;
private final boolean skipClustering;
+ private final boolean skipInsertOverwrite;
private final int limit;
private IncrementalQueryAnalyzer(
@@ -120,6 +121,7 @@ public class IncrementalQueryAnalyzer {
InstantRange.RangeType rangeType,
boolean skipCompaction,
boolean skipClustering,
+ boolean skipInsertOverwrite,
int limit) {
this.metaClient = metaClient;
this.startTime = Option.ofNullable(startTime);
@@ -127,6 +129,7 @@ public class IncrementalQueryAnalyzer {
this.rangeType = rangeType;
this.skipCompaction = skipCompaction;
this.skipClustering = skipClustering;
+ this.skipInsertOverwrite = skipInsertOverwrite;
this.limit = limit;
}
@@ -206,13 +209,13 @@ public class IncrementalQueryAnalyzer {
   private HoodieTimeline getFilteredTimeline(HoodieTableMetaClient metaClient) {
     HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
-    return filterInstantsAsPerUserConfigs(metaClient, timeline, this.skipCompaction, this.skipClustering);
+    return filterInstantsAsPerUserConfigs(metaClient, timeline, this.skipCompaction, this.skipClustering, this.skipInsertOverwrite);
}
   private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, String startInstant) {
     HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(startInstant, false);
     HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
-    return filterInstantsAsPerUserConfigs(metaClient, archivedCompleteTimeline, this.skipCompaction, this.skipClustering);
+    return filterInstantsAsPerUserConfigs(metaClient, archivedCompleteTimeline, this.skipCompaction, this.skipClustering, this.skipInsertOverwrite);
}
/**
@@ -223,7 +226,7 @@ public class IncrementalQueryAnalyzer {
* @return the filtered timeline
*/
@VisibleForTesting
-  public static HoodieTimeline filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline timeline, boolean skipCompaction, boolean skipClustering) {
+  public static HoodieTimeline filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline timeline, boolean skipCompaction, boolean skipClustering, boolean skipInsertOverwrite) {
final HoodieTimeline oriTimeline = timeline;
     if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ && skipCompaction) {
// the compaction commit uses 'commit' as action which is tricky
@@ -232,6 +235,9 @@ public class IncrementalQueryAnalyzer {
if (skipClustering) {
       timeline = timeline.filter(instant -> !ClusteringUtils.isCompletedClusteringInstant(instant, oriTimeline));
}
+ if (skipInsertOverwrite) {
+      timeline = timeline.filter(instant -> !ClusteringUtils.isInsertOverwriteInstant(instant, oriTimeline));
+ }
return timeline;
}
@@ -254,6 +260,7 @@ public class IncrementalQueryAnalyzer {
private HoodieTableMetaClient metaClient;
private boolean skipCompaction = false;
private boolean skipClustering = false;
+ private boolean skipInsertOverwrite = false;
/**
* Maximum number of instants to read per run.
*/
@@ -292,6 +299,11 @@ public class IncrementalQueryAnalyzer {
return this;
}
+ public Builder skipInsertOverwrite(boolean skipInsertOverwrite) {
+ this.skipInsertOverwrite = skipInsertOverwrite;
+ return this;
+ }
+
public Builder limit(int limit) {
this.limit = limit;
return this;
@@ -299,7 +311,7 @@ public class IncrementalQueryAnalyzer {
public IncrementalQueryAnalyzer build() {
       return new IncrementalQueryAnalyzer(Objects.requireNonNull(this.metaClient), this.startTime, this.endTime,
-          Objects.requireNonNull(this.rangeType), this.skipCompaction, this.skipClustering, this.limit);
+          Objects.requireNonNull(this.rangeType), this.skipCompaction, this.skipClustering, this.skipInsertOverwrite, this.limit);
}
}
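For context on the API change above, here is a minimal sketch (not part of this commit) of how a caller wires the new flag through the analyzer's builder. It assumes an initialized HoodieTableMetaClient named metaClient, and mirrors the builder usage added to IncrementalInputSplits below:

    // Sketch only, assuming `metaClient` is already built for the table.
    IncrementalQueryAnalyzer analyzer = IncrementalQueryAnalyzer.builder()
        .metaClient(metaClient)
        .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
        .skipCompaction(false)
        .skipClustering(false)
        .skipInsertOverwrite(true) // new: drop insert overwrite instants from the read timeline
        .build();
    IncrementalQueryAnalyzer.QueryContext result = analyzer.analyze();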
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index d041b6bcb8f..64eb27453b3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -314,4 +314,19 @@ public class ClusteringUtils {
       throw new HoodieException("Resolve replace commit metadata error for instant: " + instant, e);
}
}
+
+  /**
+   * Returns whether the given instant {@code instant} is an insert overwrite operation.
+   */
+  public static boolean isInsertOverwriteInstant(HoodieInstant instant, HoodieTimeline timeline) {
+    if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return false;
+    }
+    try {
+      WriteOperationType opType = TimelineUtils.getCommitMetadata(instant, timeline).getOperationType();
+      return opType.equals(WriteOperationType.INSERT_OVERWRITE) || opType.equals(WriteOperationType.INSERT_OVERWRITE_TABLE);
+    } catch (IOException e) {
+      throw new HoodieException("Resolve replace commit metadata error for instant: " + instant, e);
+    }
+  }
}
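The helper added above classifies a replace commit by its write operation type. A minimal sketch of its use, mirroring the filter added in IncrementalQueryAnalyzer (assumes a HoodieTimeline named timeline):

    // Sketch only: drop insert overwrite instants from a timeline.
    HoodieTimeline filtered = timeline.filter(
        instant -> !ClusteringUtils.isInsertOverwriteInstant(instant, timeline));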
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 9a85a46e485..7565ad15300 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -329,6 +329,14 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Whether to skip clustering instants to avoid reading
base files of clustering operations for streaming read "
+ "to improve read performance.");
+  // this option is experimental
+  public static final ConfigOption<Boolean> READ_STREAMING_SKIP_INSERT_OVERWRITE = ConfigOptions
+      .key("read.streaming.skip_insertoverwrite")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Whether to skip insert overwrite instants to avoid reading base files of insert overwrite operations for streaming read. "
+          + "In streaming scenarios, insert overwrite is usually used to repair data; this option controls whether those instants are visible to downstream streaming reads.");
+
public static final String START_COMMIT_EARLIEST = "earliest";
public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
.key("read.start-commit")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 2b4dec9995c..ddd7fbbb0a8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -91,6 +91,8 @@ public class IncrementalInputSplits implements Serializable {
private final boolean skipCompaction;
// skip clustering
private final boolean skipClustering;
+ // skip insert overwrite
+ private final boolean skipInsertOverwrite;
private IncrementalInputSplits(
Configuration conf,
@@ -99,7 +101,8 @@ public class IncrementalInputSplits implements Serializable {
long maxCompactionMemoryInBytes,
@Nullable PartitionPruners.PartitionPruner partitionPruner,
boolean skipCompaction,
- boolean skipClustering) {
+ boolean skipClustering,
+ boolean skipInsertOverwrite) {
this.conf = conf;
this.path = path;
this.rowType = rowType;
@@ -107,6 +110,7 @@ public class IncrementalInputSplits implements Serializable {
this.partitionPruner = partitionPruner;
this.skipCompaction = skipCompaction;
this.skipClustering = skipClustering;
+ this.skipInsertOverwrite = skipInsertOverwrite;
}
/**
@@ -135,6 +139,7 @@ public class IncrementalInputSplits implements Serializable {
.rangeType(InstantRange.RangeType.CLOSED_CLOSED)
.skipCompaction(skipCompaction)
.skipClustering(skipClustering)
+ .skipInsertOverwrite(skipInsertOverwrite)
.build();
IncrementalQueryAnalyzer.QueryContext analyzingResult = analyzer.analyze();
@@ -241,6 +246,7 @@ public class IncrementalInputSplits implements Serializable {
         .rangeType(issuedOffset != null ? InstantRange.RangeType.OPEN_CLOSED : InstantRange.RangeType.CLOSED_CLOSED)
.skipCompaction(skipCompaction)
.skipClustering(skipClustering)
+ .skipInsertOverwrite(skipInsertOverwrite)
.limit(OptionsResolver.getReadCommitsLimit(conf))
.build();
@@ -498,6 +504,8 @@ public class IncrementalInputSplits implements Serializable {
private boolean skipCompaction = false;
// skip clustering
private boolean skipClustering = false;
+ // skip insert overwrite
+ private boolean skipInsertOverwrite = false;
public Builder() {
}
@@ -537,10 +545,15 @@ public class IncrementalInputSplits implements Serializable {
return this;
}
+ public Builder skipInsertOverwrite(boolean skipInsertOverwrite) {
+ this.skipInsertOverwrite = skipInsertOverwrite;
+ return this;
+ }
+
public IncrementalInputSplits build() {
       return new IncrementalInputSplits(
           Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path), Objects.requireNonNull(this.rowType),
-          this.maxCompactionMemoryInBytes, this.partitionPruner, this.skipCompaction, this.skipClustering);
+          this.maxCompactionMemoryInBytes, this.partitionPruner, this.skipCompaction, this.skipClustering, this.skipInsertOverwrite);
}
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index fa911cadb0e..0e3b1f0ce58 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -124,6 +124,7 @@ public class StreamReadMonitoringFunction
.partitionPruner(partitionPruner)
.skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
.skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
+        .skipInsertOverwrite(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_INSERT_OVERWRITE))
.build();
}
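End to end, the flag is simply a read option on the table. A hypothetical Flink SQL example (table name, schema, and path are illustrative only, not from this commit; tableEnv is an initialized StreamTableEnvironment):

    // Sketch only: a streaming read that skips insert overwrite instants.
    tableEnv.executeSql(
        "CREATE TABLE hudi_src ("
            + " uuid STRING PRIMARY KEY NOT ENFORCED,"
            + " name STRING,"
            + " ts TIMESTAMP(3)"
            + ") WITH ("
            + " 'connector' = 'hudi',"
            + " 'path' = 'file:///tmp/hudi_src',"
            + " 'table.type' = 'MERGE_ON_READ',"
            + " 'read.streaming.enabled' = 'true',"
            + " 'read.streaming.skip_insertoverwrite' = 'true'"
            + ")");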
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index 64211608e05..c15e4c628b6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -67,6 +67,7 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -142,18 +143,20 @@ public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
}
@Test
- void testFilterInstantsByCondition() throws IOException {
- Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ void testFilterInstantsByConditionForMOR() throws IOException {
metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
+ HoodieActiveTimeline timelineMOR = metaClient.getActiveTimeline();
- HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ // commit1: delta commit
     HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "1");
+ timelineMOR.createCompleteInstant(commit1);
+ // commit2: delta commit
     HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "2");
+ // commit3: clustering
+ timelineMOR.createCompleteInstant(commit2);
     HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "3");
- timeline.createCompleteInstant(commit1);
- timeline.createCompleteInstant(commit2);
- timeline.createNewInstant(commit3);
-    commit3 = timeline.transitionReplaceRequestedToInflight(commit3, Option.empty());
+    timelineMOR.createNewInstant(commit3);
+    commit3 = timelineMOR.transitionReplaceRequestedToInflight(commit3, Option.empty());
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
new ArrayList<>(),
new HashMap<>(),
@@ -161,15 +164,139 @@ public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
WriteOperationType.CLUSTER,
"",
HoodieTimeline.REPLACE_COMMIT_ACTION);
- timeline.transitionReplaceInflightToComplete(true,
+ timelineMOR.transitionReplaceInflightToComplete(true,
HoodieTimeline.getReplaceCommitInflightInstant(commit3.getTimestamp()),
serializeCommitMetadata(commitMetadata));
- timeline = timeline.reload();
+ // commit4: insert overwrite
+    HoodieInstant commit4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "4");
+ timelineMOR.createNewInstant(commit4);
+    commit4 = timelineMOR.transitionReplaceRequestedToInflight(commit4, Option.empty());
+ commitMetadata = CommitUtils.buildMetadata(
+ new ArrayList<>(),
+ new HashMap<>(),
+ Option.empty(),
+ WriteOperationType.INSERT_OVERWRITE,
+ "",
+ HoodieTimeline.REPLACE_COMMIT_ACTION);
+ timelineMOR.transitionReplaceInflightToComplete(true,
+        HoodieTimeline.getReplaceCommitInflightInstant(commit4.getTimestamp()),
+ serializeCommitMetadata(commitMetadata));
+ // commit5: insert overwrite table
+    HoodieInstant commit5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "5");
+ timelineMOR.createNewInstant(commit5);
+    commit5 = timelineMOR.transitionReplaceRequestedToInflight(commit5, Option.empty());
+ commitMetadata = CommitUtils.buildMetadata(
+ new ArrayList<>(),
+ new HashMap<>(),
+ Option.empty(),
+ WriteOperationType.INSERT_OVERWRITE_TABLE,
+ "",
+ HoodieTimeline.REPLACE_COMMIT_ACTION);
+ timelineMOR.transitionReplaceInflightToComplete(true,
+        HoodieTimeline.getReplaceCommitInflightInstant(commit5.getTimestamp()),
+ serializeCommitMetadata(commitMetadata));
+ // commit6: compaction
+    HoodieInstant commit6 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "6");
+ timelineMOR.createNewInstant(commit6);
+ commit6 = timelineMOR.transitionCompactionRequestedToInflight(commit6);
+    commit6 = timelineMOR.transitionCompactionInflightToComplete(false, commit6, Option.empty());
+ timelineMOR.createCompleteInstant(commit6);
+ timelineMOR = timelineMOR.reload();
+
+ // will not filter commits by default
+    HoodieTimeline resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineMOR, false, false, false);
+ assertEquals(6, resTimeline.getInstants().size());
+
+ // filter cluster commits
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineMOR, false, true, false);
+ assertEquals(5, resTimeline.getInstants().size());
+ assertFalse(resTimeline.containsInstant(commit3));
+
+    // filter compaction commits for the MOR table
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineMOR, true, false, false);
+ assertFalse(resTimeline.containsInstant(commit6));
+
+    // filter insert overwrite commits
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineMOR, false, false, true);
+ assertEquals(4, resTimeline.getInstants().size());
+ assertFalse(resTimeline.containsInstant(commit4));
+ assertFalse(resTimeline.containsInstant(commit5));
+ }
+
+ @Test
+ void testFilterInstantsByConditionForCOW() throws IOException {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+ HoodieActiveTimeline timelineCOW = metaClient.getActiveTimeline();
+
+ // commit1: commit
+    HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1");
+ timelineCOW.createCompleteInstant(commit1);
+ // commit2: commit
+    HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "2");
+ // commit3: clustering
+ timelineCOW.createCompleteInstant(commit2);
+    HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "3");
+ timelineCOW.createNewInstant(commit3);
+    commit3 = timelineCOW.transitionReplaceRequestedToInflight(commit3, Option.empty());
+ HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
+ new ArrayList<>(),
+ new HashMap<>(),
+ Option.empty(),
+ WriteOperationType.CLUSTER,
+ "",
+ HoodieTimeline.REPLACE_COMMIT_ACTION);
+ timelineCOW.transitionReplaceInflightToComplete(true,
+        HoodieTimeline.getReplaceCommitInflightInstant(commit3.getTimestamp()),
+ serializeCommitMetadata(commitMetadata));
+ // commit4: insert overwrite
+    HoodieInstant commit4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "4");
+ timelineCOW.createNewInstant(commit4);
+    commit4 = timelineCOW.transitionReplaceRequestedToInflight(commit4, Option.empty());
+ commitMetadata = CommitUtils.buildMetadata(
+ new ArrayList<>(),
+ new HashMap<>(),
+ Option.empty(),
+ WriteOperationType.INSERT_OVERWRITE,
+ "",
+ HoodieTimeline.REPLACE_COMMIT_ACTION);
+ timelineCOW.transitionReplaceInflightToComplete(true,
+        HoodieTimeline.getReplaceCommitInflightInstant(commit4.getTimestamp()),
+ serializeCommitMetadata(commitMetadata));
+ // commit5: insert overwrite table
+    HoodieInstant commit5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "5");
+ timelineCOW.createNewInstant(commit5);
+    commit5 = timelineCOW.transitionReplaceRequestedToInflight(commit5, Option.empty());
+ commitMetadata = CommitUtils.buildMetadata(
+ new ArrayList<>(),
+ new HashMap<>(),
+ Option.empty(),
+ WriteOperationType.INSERT_OVERWRITE_TABLE,
+ "",
+ HoodieTimeline.REPLACE_COMMIT_ACTION);
+ timelineCOW.transitionReplaceInflightToComplete(true,
+        HoodieTimeline.getReplaceCommitInflightInstant(commit5.getTimestamp()),
+ serializeCommitMetadata(commitMetadata));
+
+ timelineCOW = timelineCOW.reload();
+
+ // will not filter commits by default
+    HoodieTimeline resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineCOW, false, false, false);
+ assertEquals(5, resTimeline.getInstants().size());
+
+ // filter cluster commits
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineCOW, false, true, false);
+ assertEquals(4, resTimeline.getInstants().size());
+ assertFalse(resTimeline.containsInstant(commit3));
+
+    // skip-compact does not take effect for a COW table (taking effect would also filter out normal commits)
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineCOW, true, false, false);
+ assertEquals(5, resTimeline.getInstants().size());
- conf.set(FlinkOptions.READ_END_COMMIT, "3");
-    HoodieTimeline resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timeline, false, false);
- // will not filter cluster commit by default
+    // filter insert overwrite commits
+    resTimeline = IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timelineCOW, false, false, true);
assertEquals(3, resTimeline.getInstants().size());
+ assertFalse(resTimeline.containsInstant(commit4));
+ assertFalse(resTimeline.containsInstant(commit5));
}
@Test