This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 1767b1bc [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction
1767b1bc is described below

commit 1767b1bc4d913e81923109aaacf906f93b3b9eb1
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 14 11:18:05 2023 +0800

    [core] Introduce LookupMergeFunction and ForceUpLevel0Compaction
    
    This closes #594
---
 .../mergetree/compact/ForceUpLevel0Compaction.java |  57 ++++++
 .../LookupChangelogMergeFunctionWrapper.java       | 142 ++++++++++++++
 .../mergetree/compact/LookupMergeFunction.java     |  99 ++++++++++
 .../mergetree/compact/MergeTreeCompactManager.java |   7 +-
 .../mergetree/compact/UniversalCompaction.java     |   7 +-
 .../compact/aggregate/AggregateMergeFunction.java  |  83 +++------
 .../compact/ForceUpLevel0CompactionTest.java       |  65 +++++++
 .../LookupChangelogMergeFunctionWrapperTest.java   | 207 +++++++++++++++++++++
 .../compact/MergeTreeCompactManagerTest.java       |   4 +-
 .../mergetree/compact/UniversalCompactionTest.java |   2 +-
 10 files changed, 616 insertions(+), 57 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0Compaction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0Compaction.java
new file mode 100644
index 00000000..379825dc
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0Compaction.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link CompactStrategy} to force compacting level 0 files. */
+public class ForceUpLevel0Compaction implements CompactStrategy {
+
+    private final UniversalCompaction universal;
+
+    public ForceUpLevel0Compaction(UniversalCompaction universal) {
+        this.universal = universal;
+    }
+
+    @Override
+    public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> 
runs) {
+        Optional<CompactUnit> pick = universal.pick(numLevels, runs);
+        if (pick.isPresent()) {
+            return pick;
+        }
+
+        // collect all level 0 files
+        int candidateCount = 0;
+        for (int i = candidateCount; i < runs.size(); i++) {
+            if (runs.get(i).level() > 0) {
+                break;
+            }
+            candidateCount++;
+        }
+
+        return candidateCount == 0
+                ? Optional.empty()
+                : Optional.of(
+                        universal.pickForSizeRatio(numLevels - 1, runs, 
candidateCount, true));
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
new file mode 100644
index 00000000..c4caeea1
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.types.RowKind;
+
+import java.util.function.Function;
+
+import static org.apache.flink.table.store.utils.Preconditions.checkArgument;
+
+/**
+ * Wrapper for {@link MergeFunction}s to produce changelog by lookup during the compaction involving
+ * level 0 files.
+ *
+ * <p>Changelog records are generated in the process of the level-0 file participating in the
+ * compaction, if during the compaction processing:
+ *
+ * <ul>
+ *   <li>Without level-0 records, no changelog.
+ *   <li>With level-0 record, with level-x (x > 0) record, level-x record should be BEFORE, level-0
+ *       should be AFTER.
+ *   <li>With level-0 record, without level-x record, need to lookup the history value of the upper
+ *       level as BEFORE.
+ * </ul>
+ */
+public class LookupChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper<ChangelogResult> {
+
+    private final LookupMergeFunction mergeFunction;
+    private final MergeFunction<KeyValue> mergeFunction2;
+    private final Function<InternalRow, KeyValue> lookup;
+
+    private final ChangelogResult reusedResult = new ChangelogResult();
+    private final KeyValue reusedBefore = new KeyValue();
+    private final KeyValue reusedAfter = new KeyValue();
+
+    public LookupChangelogMergeFunctionWrapper(
+            MergeFunctionFactory<KeyValue> mergeFunctionFactory,
+            Function<InternalRow, KeyValue> lookup) {
+        MergeFunction<KeyValue> mergeFunction = mergeFunctionFactory.create();
+        checkArgument(
+                mergeFunction instanceof LookupMergeFunction,
+                "Merge function should be a LookupMergeFunction, but is %s, 
there is a bug.",
+                mergeFunction.getClass().getName());
+        this.mergeFunction = (LookupMergeFunction) mergeFunction;
+        this.mergeFunction2 = mergeFunctionFactory.create();
+        this.lookup = lookup;
+    }
+
+    @Override
+    public void reset() {
+        mergeFunction.reset();
+    }
+
+    @Override
+    public void add(KeyValue kv) {
+        mergeFunction.add(kv);
+    }
+
+    @Override
+    public ChangelogResult getResult() {
+        reusedResult.reset();
+
+        KeyValue result = mergeFunction.getResult();
+        checkArgument(result != null);
+        KeyValue highLevel = mergeFunction.highLevel;
+        boolean containLevel0 = mergeFunction.containLevel0;
+
+        // 1. No level 0, just return
+        if (!containLevel0) {
+            return reusedResult.setResult(result);
+        }
+
+        // 2. With level 0, with the latest high level, return changelog
+        if (highLevel != null) {
+            setChangelog(highLevel, result);
+            return reusedResult.setResult(result);
+        }
+
+        // 3. Lookup to find the latest high level record
+        highLevel = lookup.apply(result.key());
+        if (highLevel != null) {
+            mergeFunction2.reset();
+            mergeFunction2.add(highLevel);
+            mergeFunction2.add(result);
+            result = mergeFunction2.getResult();
+            setChangelog(highLevel, result);
+        } else {
+            setChangelog(null, result);
+        }
+        return reusedResult.setResult(result);
+    }
+
+    private void setChangelog(KeyValue before, KeyValue after) {
+        if (before == null || !isAdd(before)) {
+            if (isAdd(after)) {
+                reusedResult.addChangelog(replaceAfter(RowKind.INSERT, after));
+            }
+        } else {
+            if (isAdd(after)) {
+                reusedResult
+                        .addChangelog(replaceBefore(RowKind.UPDATE_BEFORE, 
before))
+                        .addChangelog(replaceAfter(RowKind.UPDATE_AFTER, 
after));
+            } else {
+                reusedResult.addChangelog(replaceBefore(RowKind.DELETE, 
before));
+            }
+        }
+    }
+
+    private KeyValue replaceBefore(RowKind valueKind, KeyValue from) {
+        return replace(reusedBefore, valueKind, from);
+    }
+
+    private KeyValue replaceAfter(RowKind valueKind, KeyValue from) {
+        return replace(reusedAfter, valueKind, from);
+    }
+
+    private KeyValue replace(KeyValue reused, RowKind valueKind, KeyValue 
from) {
+        return reused.replace(from.key(), from.sequenceNumber(), valueKind, 
from.value());
+    }
+
+    private boolean isAdd(KeyValue kv) {
+        return kv.valueKind() == RowKind.INSERT || kv.valueKind() == 
RowKind.UPDATE_AFTER;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeFunction.java
new file mode 100644
index 00000000..c47b359b
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeFunction.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.KeyValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * A {@link MergeFunction} for lookup, this wrapper only considers the latest high level record,
+ * because each merge will query the old merged record, so the latest high level record should be
+ * the final merged value.
+ */
+public class LookupMergeFunction implements MergeFunction<KeyValue> {
+
+    private final MergeFunction<KeyValue> mergeFunction;
+    private final LinkedList<KeyValue> candidates = new LinkedList<>();
+
+    KeyValue highLevel;
+    boolean containLevel0;
+
+    public LookupMergeFunction(MergeFunction<KeyValue> mergeFunction) {
+        this.mergeFunction = mergeFunction;
+    }
+
+    @Override
+    public void reset() {
+        candidates.clear();
+        highLevel = null;
+        containLevel0 = false;
+    }
+
+    @Override
+    public void add(KeyValue kv) {
+        candidates.add(kv);
+    }
+
+    @Override
+    public KeyValue getResult() {
+        // 1. Find the latest high level record
+        Iterator<KeyValue> descending = candidates.descendingIterator();
+        while (descending.hasNext()) {
+            KeyValue kv = descending.next();
+            if (kv.level() > 0) {
+                if (highLevel != null) {
+                    descending.remove();
+                } else {
+                    highLevel = kv;
+                }
+            } else {
+                containLevel0 = true;
+            }
+        }
+
+        // 2. Do the merge for inputs
+        mergeFunction.reset();
+        candidates.forEach(mergeFunction::add);
+        return mergeFunction.getResult();
+    }
+
+    public static MergeFunctionFactory<KeyValue> 
wrap(MergeFunctionFactory<KeyValue> wrapped) {
+        return new Factory(wrapped);
+    }
+
+    private static class Factory implements MergeFunctionFactory<KeyValue> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final MergeFunctionFactory<KeyValue> wrapped;
+
+        private Factory(MergeFunctionFactory<KeyValue> wrapped) {
+            this.wrapped = wrapped;
+        }
+
+        @Override
+        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+            return new LookupMergeFunction(wrapped.create(projection));
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
index d90ea05b..99a09be0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
@@ -102,7 +102,12 @@ public class MergeTreeCompactManager extends CompactFutureManager {
             }
             optionalUnit =
                     strategy.pick(levels.numberOfLevels(), runs)
-                            .map(unit -> unit.files().size() < 2 ? null : 
unit);
+                            .filter(unit -> unit.files().size() > 0)
+                            .filter(
+                                    unit ->
+                                            unit.files().size() > 1
+                                                    || 
unit.files().get(0).level()
+                                                            != 
unit.outputLevel());
         }
 
         optionalUnit.ifPresent(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
index 06c4a0f8..933c7a4b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -121,6 +121,11 @@ public class UniversalCompaction implements CompactStrategy {
 
     private CompactUnit pickForSizeRatio(
             int maxLevel, List<LevelSortedRun> runs, int candidateCount) {
+        return pickForSizeRatio(maxLevel, runs, candidateCount, false);
+    }
+
+    public CompactUnit pickForSizeRatio(
+            int maxLevel, List<LevelSortedRun> runs, int candidateCount, 
boolean forcePick) {
         long candidateSize = candidateSize(runs, candidateCount);
         for (int i = candidateCount; i < runs.size(); i++) {
             LevelSortedRun next = runs.get(i);
@@ -132,7 +137,7 @@ public class UniversalCompaction implements CompactStrategy {
             candidateCount++;
         }
 
-        if (candidateCount > 1) {
+        if (forcePick || candidateCount > 1) {
             return createUnit(runs, maxLevel, candidateCount, maxSortedRunNum);
         }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
index a199a57a..762fb672 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.store.utils.Projection;
 
 import javax.annotation.Nullable;
 
-import java.io.Serializable;
 import java.util.List;
 
 import static org.apache.flink.table.store.options.ConfigOptions.key;
@@ -43,17 +42,21 @@ import static org.apache.flink.table.store.utils.RowDataUtils.createFieldGetters
  */
 public class AggregateMergeFunction implements MergeFunction<KeyValue> {
 
+    public static final String FIELDS = "fields";
+    public static final String AGG_FUNCTION = "aggregate-function";
+    public static final String IGNORE_RETRACT = "ignore-retract";
+
     private final InternalRow.FieldGetter[] getters;
-    private final RowAggregator rowAggregator;
+    private final FieldAggregator[] aggregators;
 
     private KeyValue latestKv;
     private GenericRow row;
     private KeyValue reused;
 
-    protected AggregateMergeFunction(
-            InternalRow.FieldGetter[] getters, RowAggregator rowAggregator) {
+    public AggregateMergeFunction(
+            InternalRow.FieldGetter[] getters, FieldAggregator[] aggregators) {
         this.getters = getters;
-        this.rowAggregator = rowAggregator;
+        this.aggregators = aggregators;
     }
 
     @Override
@@ -68,7 +71,7 @@ public class AggregateMergeFunction implements MergeFunction<KeyValue> {
         boolean isRetract =
                 kv.valueKind() != RowKind.INSERT && kv.valueKind() != 
RowKind.UPDATE_AFTER;
         for (int i = 0; i < getters.length; i++) {
-            FieldAggregator fieldAggregator = 
rowAggregator.getFieldAggregatorAtPos(i);
+            FieldAggregator fieldAggregator = aggregators[i];
             Object accumulator = getters[i].getFieldOrNull(row);
             Object inputField = getters[i].getFieldOrNull(kv.value());
             Object mergedField =
@@ -92,48 +95,6 @@ public class AggregateMergeFunction implements MergeFunction<KeyValue> {
         return reused.replace(latestKv.key(), latestKv.sequenceNumber(), 
RowKind.INSERT, row);
     }
 
-    /** Provide an Aggregator for merge a new row data. */
-    public static class RowAggregator implements Serializable {
-        public static final String FIELDS = "fields";
-        public static final String AGG_FUNCTION = "aggregate-function";
-        public static final String IGNORE_RETRACT = "ignore-retract";
-
-        private final FieldAggregator[] fieldAggregators;
-
-        public RowAggregator(
-                Options sqlConf,
-                List<String> fieldNames,
-                List<DataType> fieldTypes,
-                List<String> primaryKeys) {
-            fieldAggregators = new FieldAggregator[fieldNames.size()];
-            for (int i = 0; i < fieldNames.size(); i++) {
-                String fieldName = fieldNames.get(i);
-                DataType fieldType = fieldTypes.get(i);
-                // aggregate by primary keys, so they do not aggregate
-                boolean isPrimaryKey = primaryKeys.contains(fieldName);
-                String strAggFunc =
-                        sqlConf.get(
-                                key(FIELDS + "." + fieldName + "." + 
AGG_FUNCTION)
-                                        .stringType()
-                                        .noDefaultValue()
-                                        .withDescription(
-                                                "Get " + fieldName + "'s 
aggregate function"));
-                boolean ignoreRetract =
-                        sqlConf.get(
-                                key(FIELDS + "." + fieldName + "." + 
IGNORE_RETRACT)
-                                        .booleanType()
-                                        .defaultValue(false));
-                fieldAggregators[i] =
-                        FieldAggregator.createFieldAggregator(
-                                fieldType, strAggFunc, ignoreRetract, 
isPrimaryKey);
-            }
-        }
-
-        public FieldAggregator getFieldAggregatorAtPos(int fieldPos) {
-            return fieldAggregators[fieldPos];
-        }
-    }
-
     public static MergeFunctionFactory<KeyValue> factory(
             Options conf,
             List<String> tableNames,
@@ -172,10 +133,28 @@ public class AggregateMergeFunction implements MergeFunction<KeyValue> {
                 fieldTypes = project.project(tableTypes);
             }
 
-            return new AggregateMergeFunction(
-                    createFieldGetters(fieldTypes),
-                    new AggregateMergeFunction.RowAggregator(
-                            conf, fieldNames, fieldTypes, primaryKeys));
+            FieldAggregator[] fieldAggregators = new 
FieldAggregator[fieldNames.size()];
+            for (int i = 0; i < fieldNames.size(); i++) {
+                String fieldName = fieldNames.get(i);
+                DataType fieldType = fieldTypes.get(i);
+                // aggregate by primary keys, so they do not aggregate
+                boolean isPrimaryKey = primaryKeys.contains(fieldName);
+                String strAggFunc =
+                        conf.get(
+                                key(FIELDS + "." + fieldName + "." + 
AGG_FUNCTION)
+                                        .stringType()
+                                        .noDefaultValue());
+                boolean ignoreRetract =
+                        conf.get(
+                                key(FIELDS + "." + fieldName + "." + 
IGNORE_RETRACT)
+                                        .booleanType()
+                                        .defaultValue(false));
+                fieldAggregators[i] =
+                        FieldAggregator.createFieldAggregator(
+                                fieldType, strAggFunc, ignoreRetract, 
isPrimaryKey);
+            }
+
+            return new AggregateMergeFunction(createFieldGetters(fieldTypes), 
fieldAggregators);
         }
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0CompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0CompactionTest.java
new file mode 100644
index 00000000..c6922f2b
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ForceUpLevel0CompactionTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.store.file.mergetree.compact.UniversalCompactionTest.file;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ForceUpLevel0Compaction}. */
+public class ForceUpLevel0CompactionTest {
+
+    @Test
+    public void testForceCompaction0() {
+        ForceUpLevel0Compaction compaction =
+                new ForceUpLevel0Compaction(new UniversalCompaction(200, 1, 5, 
Integer.MAX_VALUE));
+
+        Optional<CompactUnit> result = compaction.pick(3, Arrays.asList(run(0, 
1), run(0, 1)));
+        assertThat(result).isPresent();
+        assertThat(result.get().outputLevel()).isEqualTo(2);
+
+        result = compaction.pick(3, Arrays.asList(run(0, 1), run(1, 10)));
+        assertThat(result).isPresent();
+        assertThat(result.get().outputLevel()).isEqualTo(1);
+
+        result = compaction.pick(3, Arrays.asList(run(0, 1), run(0, 5), run(2, 
10)));
+        assertThat(result).isPresent();
+        assertThat(result.get().outputLevel()).isEqualTo(1);
+
+        result = compaction.pick(3, Collections.singletonList(run(2, 10)));
+        assertThat(result).isEmpty();
+
+        result = compaction.pick(3, Arrays.asList(run(0, 1), run(0, 5), run(0, 
10), run(0, 20)));
+        assertThat(result).isPresent();
+        assertThat(result.get().outputLevel()).isEqualTo(2);
+    }
+
+    private LevelSortedRun run(int level, int size) {
+        return new LevelSortedRun(level, SortedRun.fromSingle(file(size)));
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
new file mode 100644
index 00000000..c697948a
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.data.InternalRow.FieldGetter;
+import org.apache.flink.table.store.file.KeyValue;
+import 
org.apache.flink.table.store.file.mergetree.compact.aggregate.AggregateMergeFunction;
+import 
org.apache.flink.table.store.file.mergetree.compact.aggregate.FieldAggregator;
+import 
org.apache.flink.table.store.file.mergetree.compact.aggregate.FieldSumAgg;
+import org.apache.flink.table.store.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.store.file.io.DataFileTestUtils.row;
+import static org.apache.flink.table.store.types.RowKind.DELETE;
+import static org.apache.flink.table.store.types.RowKind.INSERT;
+import static org.apache.flink.table.store.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.table.store.types.RowKind.UPDATE_BEFORE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LookupChangelogMergeFunctionWrapper}. */
+public class LookupChangelogMergeFunctionWrapperTest {
+
+    @Test
+    public void testDeduplicate() {
+        Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+        LookupChangelogMergeFunctionWrapper function =
+                new LookupChangelogMergeFunctionWrapper(
+                        
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
+                        highLevel::get);
+
+        // Without level-0
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, 
row(1)).setLevel(2));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, 
row(2)).setLevel(1));
+        ChangelogResult result = function.getResult();
+        assertThat(result).isNotNull();
+        assertThat(result.changelogs()).isEmpty();
+        KeyValue kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(2);
+
+        // With level-0 record, with level-x (x > 0) record
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, 
row(1)).setLevel(1));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, 
row(2)).setLevel(0));
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        List<KeyValue> changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(2);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(1);
+        assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+        assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(2);
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(2);
+
+        // With level-0 record, without level-x record, query fail
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, UPDATE_AFTER, 
row(2)).setLevel(0));
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(1);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(INSERT);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(2);
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(2);
+
+        // With level-0 record, without level-x record, query success
+        function.reset();
+        highLevel.put(row(1), new KeyValue().replace(row(1), 1, INSERT, 
row(1)).setLevel(2));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, 
row(2)).setLevel(0));
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(2);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(1);
+        assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+        assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(2);
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(2);
+
+        // With level-0 record, without level-x record, query success but is 
'delete'
+        function.reset();
+        highLevel.put(row(1), new KeyValue().replace(row(1), 1, DELETE, 
row(1)).setLevel(2));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, 
row(2)).setLevel(0));
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(1);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(INSERT);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(2);
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(2);
+
+        // With level-0 'delete' record, without level-x record, query fail
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, UPDATE_BEFORE, row(2)).setLevel(0));
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        assertThat(result.changelogs()).isEmpty();
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.valueKind()).isEqualTo(UPDATE_BEFORE);
+        assertThat(kv.value().getInt(0)).isEqualTo(2);
+
+        // With level-0 'delete' record, without level-x record, query success
+        function.reset();
+        highLevel.put(row(1), new KeyValue().replace(row(1), 1, DELETE, row(1)).setLevel(2));
+        function.add(new KeyValue().replace(row(1), 2, DELETE, row(2)).setLevel(0));
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        assertThat(result.changelogs()).isEmpty();
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.valueKind()).isEqualTo(DELETE);
+        assertThat(kv.value().getInt(0)).isEqualTo(2);
+    }
+
+    @Test
+    public void testSum() {
+        LookupChangelogMergeFunctionWrapper function =
+                new LookupChangelogMergeFunctionWrapper(
+                        LookupMergeFunction.wrap(
+                                projection ->
+                                        new AggregateMergeFunction(
+                                                new FieldGetter[] {
+                                                    row -> row.isNullAt(0) ? null : row.getInt(0)
+                                                },
+                                                new FieldAggregator[] {
+                                                    new FieldSumAgg(DataTypes.INT())
+                                                })),
+                        key -> null);
+
+        // Without level-0: no changelog, result 2
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(2));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, row(2)).setLevel(1));
+        ChangelogResult result = function.getResult();
+        assertThat(result).isNotNull();
+        assertThat(result.changelogs()).isEmpty();
+        KeyValue kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(2);
+
+        // With level-0 record, with level-x (x > 0) record: -U(1), +U(3), result 3
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(1));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, row(2)).setLevel(0));
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        List<KeyValue> changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(2);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(1);
+        assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+        assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(3);
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(3);
+
+        // With level-0 record, with multiple level-x (x > 0) record: -U(2), +U(4), result 4
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(3));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, row(1)).setLevel(2));
+        function.add(new KeyValue().replace(row(1), 3, INSERT, row(2)).setLevel(1));
+        function.add(new KeyValue().replace(row(1), 4, INSERT, row(2)).setLevel(0));
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(2);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(2);
+        assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+        assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(4);
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(4);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
index 60600dfd..d5c2d9c8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -91,8 +91,8 @@ public class MergeTreeCompactManagerTest {
     @Test
     public void testNoCompaction() throws ExecutionException, InterruptedException {
         innerTest(
-                Collections.singletonList(new LevelMinMax(0, 1, 3)),
-                Collections.singletonList(new LevelMinMax(0, 1, 3)));
+                Collections.singletonList(new LevelMinMax(3, 1, 3)),
+                Collections.singletonList(new LevelMinMax(3, 1, 3)));
     }
 
     @Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
index 76b17f78..9fea7244 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -293,7 +293,7 @@ public class UniversalCompactionTest {
         return new LevelSortedRun(level, SortedRun.fromSingle(file(size)));
     }
 
-    private DataFileMeta file(long size) {
+    static DataFileMeta file(long size) {
         return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0);
     }
 }


Reply via email to