This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 1767b1bc [core] Introduce LookupMergeFunction and
ForceUpLevel0Compaction
1767b1bc is described below
commit 1767b1bc4d913e81923109aaacf906f93b3b9eb1
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 14 11:18:05 2023 +0800
[core] Introduce LookupMergeFunction and ForceUpLevel0Compaction
This closes #594
---
.../mergetree/compact/ForceUpLevel0Compaction.java | 57 ++++++
.../LookupChangelogMergeFunctionWrapper.java | 142 ++++++++++++++
.../mergetree/compact/LookupMergeFunction.java | 99 ++++++++++
.../mergetree/compact/MergeTreeCompactManager.java | 7 +-
.../mergetree/compact/UniversalCompaction.java | 7 +-
.../compact/aggregate/AggregateMergeFunction.java | 83 +++------
.../compact/ForceUpLevel0CompactionTest.java | 65 +++++++
.../LookupChangelogMergeFunctionWrapperTest.java | 207 +++++++++++++++++++++
.../compact/MergeTreeCompactManagerTest.java | 4 +-
.../mergetree/compact/UniversalCompactionTest.java | 2 +-
10 files changed, 616 insertions(+), 57 deletions(-)
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0Compaction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0Compaction.java
new file mode 100644
index 00000000..379825dc
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0Compaction.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A {@link CompactStrategy} to force compacting level 0 files.
+ *
+ * <p>The wrapped {@link UniversalCompaction} is consulted first. Only when it picks nothing are the
+ * leading level 0 runs handed to {@link UniversalCompaction#pickForSizeRatio} with the force flag
+ * set.
+ */
+public class ForceUpLevel0Compaction implements CompactStrategy {
+
+    private final UniversalCompaction universal;
+
+    public ForceUpLevel0Compaction(UniversalCompaction universal) {
+        this.universal = universal;
+    }
+
+    @Override
+    public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
+        Optional<CompactUnit> universalPick = universal.pick(numLevels, runs);
+        if (universalPick.isPresent()) {
+            return universalPick;
+        }
+
+        // Count the runs at the head of the list that are still on level 0; the scan stops at
+        // the first run that lives on a higher level.
+        int level0Runs = 0;
+        while (level0Runs < runs.size() && runs.get(level0Runs).level() <= 0) {
+            level0Runs++;
+        }
+
+        if (level0Runs == 0) {
+            return Optional.empty();
+        }
+        return Optional.of(universal.pickForSizeRatio(numLevels - 1, runs, level0Runs, true));
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
new file mode 100644
index 00000000..c4caeea1
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.types.RowKind;
+
+import java.util.function.Function;
+
+import static org.apache.flink.table.store.utils.Preconditions.checkArgument;
+
+/**
+ * Wrapper for {@link MergeFunction}s to produce changelog by lookup during the compaction
+ * involving level 0 files.
+ *
+ * <p>Changelog records are generated in the process of the level-0 file participating in the
+ * compaction, if during the compaction processing:
+ *
+ * <ul>
+ *   <li>Without level-0 records, no changelog.
+ *   <li>With level-0 record, with level-x (x > 0) record, level-x record should be BEFORE, level-0
+ *       should be AFTER.
+ *   <li>With level-0 record, without level-x record, need to lookup the history value of the upper
+ *       level as BEFORE.
+ * </ul>
+ */
+public class LookupChangelogMergeFunctionWrapper implements MergeFunctionWrapper<ChangelogResult> {
+
+    /** Merges the records added for the current key and exposes what it saw per level. */
+    private final LookupMergeFunction mergeFunction;
+
+    /** Second instance from the same factory, used to merge a looked-up record with the result. */
+    private final MergeFunction<KeyValue> mergeFunction2;
+
+    private final Function<InternalRow, KeyValue> lookup;
+
+    private final ChangelogResult reusedResult = new ChangelogResult();
+    private final KeyValue reusedBefore = new KeyValue();
+    private final KeyValue reusedAfter = new KeyValue();
+
+    public LookupChangelogMergeFunctionWrapper(
+            MergeFunctionFactory<KeyValue> mergeFunctionFactory,
+            Function<InternalRow, KeyValue> lookup) {
+        MergeFunction<KeyValue> created = mergeFunctionFactory.create();
+        checkArgument(
+                created instanceof LookupMergeFunction,
+                "Merge function should be a LookupMergeFunction, but is %s, there is a bug.",
+                created.getClass().getName());
+        this.mergeFunction = (LookupMergeFunction) created;
+        this.mergeFunction2 = mergeFunctionFactory.create();
+        this.lookup = lookup;
+    }
+
+    @Override
+    public void reset() {
+        mergeFunction.reset();
+    }
+
+    @Override
+    public void add(KeyValue kv) {
+        mergeFunction.add(kv);
+    }
+
+    @Override
+    public ChangelogResult getResult() {
+        reusedResult.reset();
+
+        KeyValue merged = mergeFunction.getResult();
+        checkArgument(merged != null);
+        KeyValue before = mergeFunction.highLevel;
+
+        // Nothing came from level 0: hand the merged record through without any changelog.
+        if (!mergeFunction.containLevel0) {
+            return reusedResult.setResult(merged);
+        }
+
+        if (before == null) {
+            // No higher-level record among the inputs, so ask the lookup for one. When it finds
+            // a record, that record is merged with the current result before producing changelog.
+            before = lookup.apply(merged.key());
+            if (before != null) {
+                mergeFunction2.reset();
+                mergeFunction2.add(before);
+                mergeFunction2.add(merged);
+                merged = mergeFunction2.getResult();
+            }
+        }
+
+        setChangelog(before, merged);
+        return reusedResult.setResult(merged);
+    }
+
+    /**
+     * Appends the changelog implied by going from {@code before} to {@code after}. A missing or
+     * non-add {@code before} counts as "no previous value"; a non-add {@code after} counts as "no
+     * current value". When neither side has a value nothing is emitted.
+     */
+    private void setChangelog(KeyValue before, KeyValue after) {
+        boolean hadValue = before != null && isAdd(before);
+        boolean hasValue = isAdd(after);
+
+        if (hadValue && hasValue) {
+            reusedResult
+                    .addChangelog(copyInto(reusedBefore, RowKind.UPDATE_BEFORE, before))
+                    .addChangelog(copyInto(reusedAfter, RowKind.UPDATE_AFTER, after));
+        } else if (hadValue) {
+            reusedResult.addChangelog(copyInto(reusedBefore, RowKind.DELETE, before));
+        } else if (hasValue) {
+            reusedResult.addChangelog(copyInto(reusedAfter, RowKind.INSERT, after));
+        }
+    }
+
+    /** Fills {@code target} with the key, sequence number and value of {@code from}. */
+    private KeyValue copyInto(KeyValue target, RowKind valueKind, KeyValue from) {
+        return target.replace(from.key(), from.sequenceNumber(), valueKind, from.value());
+    }
+
+    private boolean isAdd(KeyValue kv) {
+        RowKind kind = kv.valueKind();
+        return kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER;
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeFunction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeFunction.java
new file mode 100644
index 00000000..c47b359b
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeFunction.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.KeyValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * A {@link MergeFunction} for lookup. This function only considers the latest high level record
+ * (level above 0), because each merge will query the old merged record, so the latest high level
+ * record should be the final merged value. Older high level records are dropped before the
+ * remaining candidates are handed to the wrapped merge function.
+ */
+public class LookupMergeFunction implements MergeFunction<KeyValue> {
+
+    private final MergeFunction<KeyValue> mergeFunction;
+    private final LinkedList<KeyValue> candidates = new LinkedList<>();
+
+    /** Last added record with level above 0, as found by the latest {@link #getResult()}. */
+    KeyValue highLevel;
+
+    /** Whether the latest {@link #getResult()} saw a record whose level is not above 0. */
+    boolean containLevel0;
+
+    public LookupMergeFunction(MergeFunction<KeyValue> mergeFunction) {
+        this.mergeFunction = mergeFunction;
+    }
+
+    @Override
+    public void reset() {
+        candidates.clear();
+        highLevel = null;
+        containLevel0 = false;
+    }
+
+    @Override
+    public void add(KeyValue kv) {
+        candidates.add(kv);
+    }
+
+    /**
+     * Merges the candidates added since the last {@link #reset()}.
+     *
+     * <p>As a side effect this updates {@link #highLevel} and {@link #containLevel0} and removes
+     * every high level candidate except the last added one.
+     *
+     * @return the result of the wrapped merge function over the remaining candidates
+     */
+    @Override
+    public KeyValue getResult() {
+        // Recompute the derived state from scratch. Without this, a second getResult() call that
+        // is not preceded by reset() would start with highLevel already set and remove the one
+        // high level record that must be kept.
+        highLevel = null;
+        containLevel0 = false;
+
+        // 1. Find the latest high level record, walking from the last added candidate backwards
+        Iterator<KeyValue> descending = candidates.descendingIterator();
+        while (descending.hasNext()) {
+            KeyValue kv = descending.next();
+            if (kv.level() > 0) {
+                if (highLevel != null) {
+                    // an older high level record, superseded by the one already found
+                    descending.remove();
+                } else {
+                    highLevel = kv;
+                }
+            } else {
+                containLevel0 = true;
+            }
+        }
+
+        // 2. Do the merge for inputs
+        mergeFunction.reset();
+        candidates.forEach(mergeFunction::add);
+        return mergeFunction.getResult();
+    }
+
+    /** Returns a factory whose functions are {@link LookupMergeFunction}s around {@code wrapped}. */
+    public static MergeFunctionFactory<KeyValue> wrap(MergeFunctionFactory<KeyValue> wrapped) {
+        return new Factory(wrapped);
+    }
+
+    private static class Factory implements MergeFunctionFactory<KeyValue> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final MergeFunctionFactory<KeyValue> wrapped;
+
+        private Factory(MergeFunctionFactory<KeyValue> wrapped) {
+            this.wrapped = wrapped;
+        }
+
+        @Override
+        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+            return new LookupMergeFunction(wrapped.create(projection));
+        }
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
index d90ea05b..99a09be0 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
@@ -102,7 +102,12 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
}
optionalUnit =
strategy.pick(levels.numberOfLevels(), runs)
- .map(unit -> unit.files().size() < 2 ? null :
unit);
+ .filter(unit -> unit.files().size() > 0)
+ .filter(
+ unit ->
+ unit.files().size() > 1
+ ||
unit.files().get(0).level()
+ !=
unit.outputLevel());
}
optionalUnit.ifPresent(
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
index 06c4a0f8..933c7a4b 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -121,6 +121,11 @@ public class UniversalCompaction implements
CompactStrategy {
private CompactUnit pickForSizeRatio(
int maxLevel, List<LevelSortedRun> runs, int candidateCount) {
+ return pickForSizeRatio(maxLevel, runs, candidateCount, false);
+ }
+
+ public CompactUnit pickForSizeRatio(
+ int maxLevel, List<LevelSortedRun> runs, int candidateCount,
boolean forcePick) {
long candidateSize = candidateSize(runs, candidateCount);
for (int i = candidateCount; i < runs.size(); i++) {
LevelSortedRun next = runs.get(i);
@@ -132,7 +137,7 @@ public class UniversalCompaction implements CompactStrategy
{
candidateCount++;
}
- if (candidateCount > 1) {
+ if (forcePick || candidateCount > 1) {
return createUnit(runs, maxLevel, candidateCount, maxSortedRunNum);
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
index a199a57a..762fb672 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.store.utils.Projection;
import javax.annotation.Nullable;
-import java.io.Serializable;
import java.util.List;
import static org.apache.flink.table.store.options.ConfigOptions.key;
@@ -43,17 +42,21 @@ import static
org.apache.flink.table.store.utils.RowDataUtils.createFieldGetters
*/
public class AggregateMergeFunction implements MergeFunction<KeyValue> {
+ public static final String FIELDS = "fields";
+ public static final String AGG_FUNCTION = "aggregate-function";
+ public static final String IGNORE_RETRACT = "ignore-retract";
+
private final InternalRow.FieldGetter[] getters;
- private final RowAggregator rowAggregator;
+ private final FieldAggregator[] aggregators;
private KeyValue latestKv;
private GenericRow row;
private KeyValue reused;
- protected AggregateMergeFunction(
- InternalRow.FieldGetter[] getters, RowAggregator rowAggregator) {
+ public AggregateMergeFunction(
+ InternalRow.FieldGetter[] getters, FieldAggregator[] aggregators) {
this.getters = getters;
- this.rowAggregator = rowAggregator;
+ this.aggregators = aggregators;
}
@Override
@@ -68,7 +71,7 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
boolean isRetract =
kv.valueKind() != RowKind.INSERT && kv.valueKind() !=
RowKind.UPDATE_AFTER;
for (int i = 0; i < getters.length; i++) {
- FieldAggregator fieldAggregator =
rowAggregator.getFieldAggregatorAtPos(i);
+ FieldAggregator fieldAggregator = aggregators[i];
Object accumulator = getters[i].getFieldOrNull(row);
Object inputField = getters[i].getFieldOrNull(kv.value());
Object mergedField =
@@ -92,48 +95,6 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
return reused.replace(latestKv.key(), latestKv.sequenceNumber(),
RowKind.INSERT, row);
}
- /** Provide an Aggregator for merge a new row data. */
- public static class RowAggregator implements Serializable {
- public static final String FIELDS = "fields";
- public static final String AGG_FUNCTION = "aggregate-function";
- public static final String IGNORE_RETRACT = "ignore-retract";
-
- private final FieldAggregator[] fieldAggregators;
-
- public RowAggregator(
- Options sqlConf,
- List<String> fieldNames,
- List<DataType> fieldTypes,
- List<String> primaryKeys) {
- fieldAggregators = new FieldAggregator[fieldNames.size()];
- for (int i = 0; i < fieldNames.size(); i++) {
- String fieldName = fieldNames.get(i);
- DataType fieldType = fieldTypes.get(i);
- // aggregate by primary keys, so they do not aggregate
- boolean isPrimaryKey = primaryKeys.contains(fieldName);
- String strAggFunc =
- sqlConf.get(
- key(FIELDS + "." + fieldName + "." +
AGG_FUNCTION)
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Get " + fieldName + "'s
aggregate function"));
- boolean ignoreRetract =
- sqlConf.get(
- key(FIELDS + "." + fieldName + "." +
IGNORE_RETRACT)
- .booleanType()
- .defaultValue(false));
- fieldAggregators[i] =
- FieldAggregator.createFieldAggregator(
- fieldType, strAggFunc, ignoreRetract,
isPrimaryKey);
- }
- }
-
- public FieldAggregator getFieldAggregatorAtPos(int fieldPos) {
- return fieldAggregators[fieldPos];
- }
- }
-
public static MergeFunctionFactory<KeyValue> factory(
Options conf,
List<String> tableNames,
@@ -172,10 +133,28 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
fieldTypes = project.project(tableTypes);
}
- return new AggregateMergeFunction(
- createFieldGetters(fieldTypes),
- new AggregateMergeFunction.RowAggregator(
- conf, fieldNames, fieldTypes, primaryKeys));
+ FieldAggregator[] fieldAggregators = new
FieldAggregator[fieldNames.size()];
+ for (int i = 0; i < fieldNames.size(); i++) {
+ String fieldName = fieldNames.get(i);
+ DataType fieldType = fieldTypes.get(i);
+ // aggregate by primary keys, so they do not aggregate
+ boolean isPrimaryKey = primaryKeys.contains(fieldName);
+ String strAggFunc =
+ conf.get(
+ key(FIELDS + "." + fieldName + "." +
AGG_FUNCTION)
+ .stringType()
+ .noDefaultValue());
+ boolean ignoreRetract =
+ conf.get(
+ key(FIELDS + "." + fieldName + "." +
IGNORE_RETRACT)
+ .booleanType()
+ .defaultValue(false));
+ fieldAggregators[i] =
+ FieldAggregator.createFieldAggregator(
+ fieldType, strAggFunc, ignoreRetract,
isPrimaryKey);
+ }
+
+ return new AggregateMergeFunction(createFieldGetters(fieldTypes),
fieldAggregators);
}
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0CompactionTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0CompactionTest.java
new file mode 100644
index 00000000..c6922f2b
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0CompactionTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static
org.apache.flink.table.store.file.mergetree.compact.UniversalCompactionTest.file;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ForceUpLevel0Compaction}. */
+public class ForceUpLevel0CompactionTest {
+
+    @Test
+    public void testForceCompaction0() {
+        ForceUpLevel0Compaction compaction =
+                new ForceUpLevel0Compaction(new UniversalCompaction(200, 1, 5, Integer.MAX_VALUE));
+
+        // two level 0 runs only
+        assertPicked(compaction, 2, run(0, 1), run(0, 1));
+
+        // one level 0 run followed by a level 1 run
+        assertPicked(compaction, 1, run(0, 1), run(1, 10));
+
+        // two level 0 runs followed by a level 2 run
+        assertPicked(compaction, 1, run(0, 1), run(0, 5), run(2, 10));
+
+        // no level 0 run at all: nothing is picked
+        Optional<CompactUnit> nothing = compaction.pick(3, Collections.singletonList(run(2, 10)));
+        assertThat(nothing).isEmpty();
+
+        // four level 0 runs of growing size
+        assertPicked(compaction, 2, run(0, 1), run(0, 5), run(0, 10), run(0, 20));
+    }
+
+    /** Picks with three levels and expects a unit targeting {@code expectedOutputLevel}. */
+    private void assertPicked(
+            ForceUpLevel0Compaction compaction, int expectedOutputLevel, LevelSortedRun... runs) {
+        Optional<CompactUnit> picked = compaction.pick(3, Arrays.asList(runs));
+        assertThat(picked).isPresent();
+        assertThat(picked.get().outputLevel()).isEqualTo(expectedOutputLevel);
+    }
+
+    private LevelSortedRun run(int level, int size) {
+        return new LevelSortedRun(level, SortedRun.fromSingle(file(size)));
+    }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
new file mode 100644
index 00000000..c697948a
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.data.InternalRow.FieldGetter;
+import org.apache.flink.table.store.file.KeyValue;
+import
org.apache.flink.table.store.file.mergetree.compact.aggregate.AggregateMergeFunction;
+import
org.apache.flink.table.store.file.mergetree.compact.aggregate.FieldAggregator;
+import
org.apache.flink.table.store.file.mergetree.compact.aggregate.FieldSumAgg;
+import org.apache.flink.table.store.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.store.file.io.DataFileTestUtils.row;
+import static org.apache.flink.table.store.types.RowKind.DELETE;
+import static org.apache.flink.table.store.types.RowKind.INSERT;
+import static org.apache.flink.table.store.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.table.store.types.RowKind.UPDATE_BEFORE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LookupChangelogMergeFunctionWrapper}. */
+public class LookupChangelogMergeFunctionWrapperTest {
+
+    @Test
+    public void testDeduplicate() {
+        Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+        LookupChangelogMergeFunctionWrapper function =
+                new LookupChangelogMergeFunctionWrapper(
+                        LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
+                        highLevel::get);
+
+        // Without level-0
+        function.reset();
+        function.add(kv(1, INSERT, 1, 2));
+        function.add(kv(2, INSERT, 2, 1));
+        ChangelogResult result = function.getResult();
+        assertNoChangelog(result);
+        assertMerged(result, 2);
+
+        // With level-0 record, with level-x (x > 0) record
+        function.reset();
+        function.add(kv(1, INSERT, 1, 1));
+        function.add(kv(2, INSERT, 2, 0));
+        result = function.getResult();
+        assertUpdate(result, 1, 2);
+        assertMerged(result, 2);
+
+        // With level-0 record, without level-x record, lookup map is still empty
+        function.reset();
+        function.add(kv(1, UPDATE_AFTER, 2, 0));
+        result = function.getResult();
+        assertInsertOnly(result, 2);
+        assertMerged(result, 2);
+
+        // With level-0 record, without level-x record, lookup finds an INSERT
+        function.reset();
+        highLevel.put(row(1), kv(1, INSERT, 1, 2));
+        function.add(kv(2, INSERT, 2, 0));
+        result = function.getResult();
+        assertUpdate(result, 1, 2);
+        assertMerged(result, 2);
+
+        // With level-0 record, without level-x record, lookup finds a DELETE
+        function.reset();
+        highLevel.put(row(1), kv(1, DELETE, 1, 2));
+        function.add(kv(2, INSERT, 2, 0));
+        result = function.getResult();
+        assertInsertOnly(result, 2);
+        assertMerged(result, 2);
+
+        // With level-0 retract record, without level-x record; the lookup map still holds the
+        // DELETE entry stored above
+        function.reset();
+        function.add(kv(1, UPDATE_BEFORE, 2, 0));
+        result = function.getResult();
+        assertNoChangelog(result);
+        assertMerged(result, UPDATE_BEFORE, 2);
+
+        // With level-0 'delete' record, without level-x record, lookup finds a DELETE
+        function.reset();
+        highLevel.put(row(1), kv(1, DELETE, 1, 2));
+        function.add(kv(2, DELETE, 2, 0));
+        result = function.getResult();
+        assertNoChangelog(result);
+        assertMerged(result, DELETE, 2);
+    }
+
+    @Test
+    public void testSum() {
+        LookupChangelogMergeFunctionWrapper function =
+                new LookupChangelogMergeFunctionWrapper(
+                        LookupMergeFunction.wrap(projection -> newSumFunction()), key -> null);
+
+        // Without level-0
+        function.reset();
+        function.add(kv(1, INSERT, 1, 2));
+        function.add(kv(2, INSERT, 2, 1));
+        ChangelogResult result = function.getResult();
+        assertNoChangelog(result);
+        assertMerged(result, 2);
+
+        // With level-0 record, with level-x (x > 0) record
+        function.reset();
+        function.add(kv(1, INSERT, 1, 1));
+        function.add(kv(2, INSERT, 2, 0));
+        result = function.getResult();
+        assertUpdate(result, 1, 3);
+        assertMerged(result, 3);
+
+        // With level-0 record, with multiple level-x (x > 0) record
+        function.reset();
+        function.add(kv(1, INSERT, 1, 3));
+        function.add(kv(2, INSERT, 1, 2));
+        function.add(kv(3, INSERT, 2, 1));
+        function.add(kv(4, INSERT, 2, 0));
+        result = function.getResult();
+        assertUpdate(result, 2, 4);
+        assertMerged(result, 4);
+    }
+
+    /** Builds a fresh sum aggregation over the single int field of the value row. */
+    private static AggregateMergeFunction newSumFunction() {
+        return new AggregateMergeFunction(
+                new FieldGetter[] {r -> r.isNullAt(0) ? null : r.getInt(0)},
+                new FieldAggregator[] {new FieldSumAgg(DataTypes.INT())});
+    }
+
+    /** Builds a record for key {@code row(1)} with the given sequence, kind, value and level. */
+    private static KeyValue kv(
+            int sequence, org.apache.flink.table.store.types.RowKind kind, int value, int level) {
+        return new KeyValue().replace(row(1), sequence, kind, row(value)).setLevel(level);
+    }
+
+    private static void assertNoChangelog(ChangelogResult result) {
+        assertThat(result).isNotNull();
+        assertThat(result.changelogs()).isEmpty();
+    }
+
+    private static void assertInsertOnly(ChangelogResult result, int value) {
+        assertThat(result).isNotNull();
+        List<KeyValue> changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(1);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(INSERT);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(value);
+    }
+
+    private static void assertUpdate(ChangelogResult result, int beforeValue, int afterValue) {
+        assertThat(result).isNotNull();
+        List<KeyValue> changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(2);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(beforeValue);
+        assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+        assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(afterValue);
+    }
+
+    private static void assertMerged(ChangelogResult result, int value) {
+        KeyValue merged = result.result();
+        assertThat(merged).isNotNull();
+        assertThat(merged.value().getInt(0)).isEqualTo(value);
+    }
+
+    private static void assertMerged(
+            ChangelogResult result, org.apache.flink.table.store.types.RowKind kind, int value) {
+        KeyValue merged = result.result();
+        assertThat(merged).isNotNull();
+        assertThat(merged.valueKind()).isEqualTo(kind);
+        assertThat(merged.value().getInt(0)).isEqualTo(value);
+    }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
index 60600dfd..d5c2d9c8 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -91,8 +91,8 @@ public class MergeTreeCompactManagerTest {
@Test
public void testNoCompaction() throws ExecutionException,
InterruptedException {
innerTest(
- Collections.singletonList(new LevelMinMax(0, 1, 3)),
- Collections.singletonList(new LevelMinMax(0, 1, 3)));
+ Collections.singletonList(new LevelMinMax(3, 1, 3)),
+ Collections.singletonList(new LevelMinMax(3, 1, 3)));
}
@Test
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
index 76b17f78..9fea7244 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -293,7 +293,7 @@ public class UniversalCompactionTest {
return new LevelSortedRun(level, SortedRun.fromSingle(file(size)));
}
- private DataFileMeta file(long size) {
+ static DataFileMeta file(long size) {
return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0,
0);
}
}