This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new c3e24fa4 [FLINK-31392] Refactor classes code of full-compaction
c3e24fa4 is described below

commit c3e24fa4b0fd052a227296b03f9dfb0521ad833e
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Mar 11 16:57:32 2023 +0800

    [FLINK-31392] Refactor classes code of full-compaction
    
    This closes #589
---
 .../org/apache/flink/table/store/utils/Filter.java |  37 +++++++
 .../flink/table/store/utils/MurmurHashUtils.java   |  11 +++
 .../org/apache/flink/table/store/CoreOptions.java  |   8 --
 .../store/file/mergetree/MergeTreeWriter.java      |   2 +-
 ...writer.java => ChangelogMergeTreeRewriter.java} |  56 +++++------
 .../file/mergetree/compact/ChangelogResult.java    |  68 +++++++++++++
 .../compact/FullChangelogMergeFunctionWrapper.java | 106 ++++-----------------
 .../FullChangelogMergeTreeCompactRewriter.java     |  88 +++--------------
 .../compact/ReducerMergeFunctionWrapper.java       |   4 +-
 .../file/operation/AbstractFileStoreScan.java      |   9 +-
 .../table/store/file/operation/FileStoreScan.java  |   3 +-
 .../flink/table/store/table/sink/TableWrite.java   |   2 -
 .../store/table/source/AbstractDataTableScan.java  |   5 +-
 .../table/store/table/source/DataTableScan.java    |   3 +-
 .../ContinuousDataFileSnapshotEnumerator.java      |  26 ++---
 .../table/store/table/system/AuditLogTable.java    |   5 +-
 .../FullChangelogMergeFunctionWrapperTestBase.java |  25 ++++-
 .../mergetree/compact/MergeFunctionTestUtils.java  |   7 ++
 .../CompactionChangelogFollowUpScannerTest.java    |   3 +-
 .../sink/FullChangelogStoreSinkWrite.java          |   2 +-
 20 files changed, 236 insertions(+), 234 deletions(-)

diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/Filter.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/Filter.java
new file mode 100644
index 00000000..73826b65
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/Filter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import org.apache.flink.table.store.file.predicate.Predicate;
+
+/**
+ * Represents a filter (boolean-valued function) of one argument. This interface exists to avoid a
+ * name conflict with {@link Predicate}.
+ */
+@FunctionalInterface
+public interface Filter<T> {
+
+    /**
+     * Evaluates this filter on the given argument.
+     *
+     * @param t the input argument
+     * @return {@code true} if the input argument passes the filter, otherwise {@code false}
+     */
+    boolean test(T t);
+}
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MurmurHashUtils.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MurmurHashUtils.java
index 03e3603f..53e672d3 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MurmurHashUtils.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MurmurHashUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.utils;
 
 import org.apache.flink.table.store.memory.MemorySegment;
 
+import static 
org.apache.flink.table.store.memory.MemorySegment.BYTE_ARRAY_BASE_OFFSET;
 import static org.apache.flink.table.store.memory.MemorySegment.UNSAFE;
 
 /** Murmur Hash. This is inspired by Guava's Murmur3_32HashFunction. */
@@ -45,6 +46,16 @@ public final class MurmurHashUtils {
         return hashUnsafeBytesByWords(base, offset, lengthInBytes, 
DEFAULT_SEED);
     }
 
+    /** Hash bytes, clearing the sign bit so the result is non-negative. */
+    public static int hashBytesPositive(byte[] bytes) {
+        return hashBytes(bytes) & 0x7fffffff;
+    }
+
+    /** Hash all bytes of the array with the default seed. */
+    public static int hashBytes(byte[] bytes) {
+        return hashUnsafeBytes(bytes, BYTE_ARRAY_BASE_OFFSET, bytes.length, 
DEFAULT_SEED);
+    }
+
     /**
      * Hash unsafe bytes.
      *
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 8cb87388..89395df8 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -546,14 +546,6 @@ public class CoreOptions implements Serializable {
         return options.get(PARTITION_DEFAULT_NAME);
     }
 
-    public String orcBloomFilterColumns() {
-        return options.get(ORC_BLOOM_FILTER_COLUMNS);
-    }
-
-    public double orcBloomFilterFpp() {
-        return options.get(ORC_BLOOM_FILTER_FPP);
-    }
-
     public Map<Integer, String> fileCompressionPerLevel() {
         Map<String, String> levelCompressions = 
options.get(FILE_COMPRESSION_PER_LEVEL);
         return levelCompressions.entrySet().stream()
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 93f735d0..b159b23e 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -206,7 +206,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
 
     @Override
     public CommitIncrement prepareCommit(boolean waitCompaction) throws 
Exception {
-        flushWriteBuffer(false, false);
+        flushWriteBuffer(waitCompaction, false);
         trySyncLatestCompaction(waitCompaction || commitForceCompact);
         return drainIncrement();
     }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ChangelogMergeTreeRewriter.java
similarity index 72%
copy from 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
copy to 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ChangelogMergeTreeRewriter.java
index d4017095..f4d4f2ed 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -29,44 +29,43 @@ import 
org.apache.flink.table.store.file.mergetree.MergeTreeReaders;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
 import org.apache.flink.table.store.reader.RecordReader;
 import org.apache.flink.table.store.reader.RecordReaderIterator;
-import org.apache.flink.table.store.utils.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
-/** A {@link MergeTreeCompactRewriter} which produces changelog files for each 
full compaction. */
-public class FullChangelogMergeTreeCompactRewriter extends 
MergeTreeCompactRewriter {
+/** A {@link MergeTreeCompactRewriter} which produces changelog files for the 
compaction. */
+public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewriter {
 
-    private final int maxLevel;
-
-    public FullChangelogMergeTreeCompactRewriter(
-            int maxLevel,
+    public ChangelogMergeTreeRewriter(
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
             MergeFunctionFactory<KeyValue> mfFactory) {
         super(readerFactory, writerFactory, keyComparator, mfFactory);
-        this.maxLevel = maxLevel;
     }
 
+    protected abstract boolean rewriteChangelog(
+            int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections);
+
+    protected abstract boolean upgradeChangelog(int outputLevel, DataFileMeta 
file);
+
+    protected abstract MergeFunctionWrapper<ChangelogResult> 
createMergeWrapper(int outputLevel);
+
     @Override
     public CompactResult rewrite(
             int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) throws Exception {
-        if (outputLevel == maxLevel) {
-            Preconditions.checkArgument(
-                    dropDelete,
-                    "Delete records should be dropped from result of full 
compaction. This is unexpected.");
-            return rewriteFullCompaction(sections);
+        if (rewriteChangelog(outputLevel, dropDelete, sections)) {
+            return rewriteChangelogCompaction(outputLevel, sections);
         } else {
             return rewriteCompaction(outputLevel, dropDelete, sections);
         }
     }
 
-    private CompactResult rewriteFullCompaction(List<List<SortedRun>> 
sections) throws Exception {
-        
List<ConcatRecordReader.ReaderSupplier<FullChangelogMergeFunctionWrapper.Result>>
-                sectionReaders = new ArrayList<>();
+    private CompactResult rewriteChangelogCompaction(
+            int outputLevel, List<List<SortedRun>> sections) throws Exception {
+        List<ConcatRecordReader.ReaderSupplier<ChangelogResult>> 
sectionReaders = new ArrayList<>();
         for (List<SortedRun> section : sections) {
             sectionReaders.add(
                     () -> {
@@ -75,32 +74,26 @@ public class FullChangelogMergeTreeCompactRewriter extends 
MergeTreeCompactRewri
                             runReaders.add(MergeTreeReaders.readerForRun(run, 
readerFactory));
                         }
                         return new SortMergeReader<>(
-                                runReaders,
-                                keyComparator,
-                                new FullChangelogMergeFunctionWrapper(
-                                        mfFactory.create(), maxLevel));
+                                runReaders, keyComparator, 
createMergeWrapper(outputLevel));
                     });
         }
 
-        RecordReaderIterator<FullChangelogMergeFunctionWrapper.Result> 
iterator = null;
+        RecordReaderIterator<ChangelogResult> iterator = null;
         RollingFileWriter<KeyValue, DataFileMeta> compactFileWriter = null;
         RollingFileWriter<KeyValue, DataFileMeta> changelogFileWriter = null;
 
         try {
             iterator = new 
RecordReaderIterator<>(ConcatRecordReader.create(sectionReaders));
-            compactFileWriter = 
writerFactory.createRollingMergeTreeFileWriter(maxLevel);
-            changelogFileWriter = 
writerFactory.createRollingChangelogFileWriter(maxLevel);
+            compactFileWriter = 
writerFactory.createRollingMergeTreeFileWriter(outputLevel);
+            changelogFileWriter = 
writerFactory.createRollingChangelogFileWriter(outputLevel);
 
             while (iterator.hasNext()) {
-                FullChangelogMergeFunctionWrapper.Result result = 
iterator.next();
+                ChangelogResult result = iterator.next();
                 if (result.result() != null) {
                     compactFileWriter.write(result.result());
                 }
-                if (result.before() != null) {
-                    changelogFileWriter.write(result.before());
-                }
-                if (result.after() != null) {
-                    changelogFileWriter.write(result.after());
+                for (KeyValue kv : result.changelogs()) {
+                    changelogFileWriter.write(kv);
                 }
             }
         } finally {
@@ -123,8 +116,9 @@ public class FullChangelogMergeTreeCompactRewriter extends 
MergeTreeCompactRewri
 
     @Override
     public CompactResult upgrade(int outputLevel, DataFileMeta file) throws 
Exception {
-        if (outputLevel == maxLevel) {
-            return rewriteFullCompaction(
+        if (upgradeChangelog(outputLevel, file)) {
+            return rewriteChangelogCompaction(
+                    outputLevel,
                     Collections.singletonList(
                             
Collections.singletonList(SortedRun.fromSingle(file))));
         } else {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ChangelogResult.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ChangelogResult.java
new file mode 100644
index 00000000..a4414e7e
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ChangelogResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Changelog and final result for the same key. */
+public class ChangelogResult {
+
+    private final List<KeyValue> changelogs = new ArrayList<>();
+    @Nullable private KeyValue result;
+
+    public void reset() {
+        changelogs.clear();
+        result = null;
+    }
+
+    public ChangelogResult addChangelog(KeyValue record) {
+        changelogs.add(record);
+        return this;
+    }
+
+    public ChangelogResult setResultIfNotRetract(@Nullable KeyValue result) {
+        if (result != null
+                && result.valueKind() != RowKind.DELETE
+                && result.valueKind() != RowKind.UPDATE_BEFORE) {
+            setResult(result);
+        }
+        return this;
+    }
+
+    public ChangelogResult setResult(@Nullable KeyValue result) {
+        this.result = result;
+        return this;
+    }
+
+    public List<KeyValue> changelogs() {
+        return changelogs;
+    }
+
+    /** Latest result (result of merge function) for this key. */
+    @Nullable
+    public KeyValue result() {
+        return result;
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapper.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapper.java
index c01d0857..699b2724 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapper.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapper.java
@@ -22,8 +22,6 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.types.RowKind;
 import org.apache.flink.table.store.utils.Preconditions;
 
-import javax.annotation.Nullable;
-
 /**
  * Wrapper for {@link MergeFunction}s to produce changelog during a full 
compaction.
  *
@@ -36,20 +34,19 @@ import javax.annotation.Nullable;
  *       SortMergeReader}, so there is no issue related to object reuse.
  * </ul>
  */
-public class FullChangelogMergeFunctionWrapper
-        implements 
MergeFunctionWrapper<FullChangelogMergeFunctionWrapper.Result> {
+public class FullChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper<ChangelogResult> {
 
     private final MergeFunction<KeyValue> mergeFunction;
     private final int maxLevel;
 
     // only full compaction will write files into maxLevel, see 
UniversalCompaction class
-    private transient KeyValue topLevelKv;
-    private transient KeyValue initialKv;
-    private transient boolean isInitialized;
+    private KeyValue topLevelKv;
+    private KeyValue initialKv;
+    private boolean isInitialized;
 
-    private transient Result reusedResult;
-    private transient KeyValue reusedBefore;
-    private transient KeyValue reusedAfter;
+    private final ChangelogResult reusedResult = new ChangelogResult();
+    private final KeyValue reusedBefore = new KeyValue();
+    private final KeyValue reusedAfter = new KeyValue();
 
     public FullChangelogMergeFunctionWrapper(MergeFunction<KeyValue> 
mergeFunction, int maxLevel) {
         Preconditions.checkArgument(
@@ -93,43 +90,33 @@ public class FullChangelogMergeFunctionWrapper
     }
 
     @Override
-    public Result getResult() {
-        if (reusedResult == null) {
-            reusedResult = new Result();
-            reusedBefore = new KeyValue();
-            reusedAfter = new KeyValue();
-        }
-
+    public ChangelogResult getResult() {
+        reusedResult.reset();
         if (isInitialized) {
             KeyValue merged = mergeFunction.getResult();
             if (topLevelKv == null) {
                 if (merged != null && isAdd(merged)) {
-                    reusedResult.setChangelog(null, replace(reusedAfter, 
RowKind.INSERT, merged));
-                } else {
-                    reusedResult.setChangelog(null, null);
+                    reusedResult.addChangelog(replace(reusedAfter, 
RowKind.INSERT, merged));
                 }
             } else {
                 if (merged != null && isAdd(merged)) {
-                    reusedResult.setChangelog(
-                            replace(reusedBefore, RowKind.UPDATE_BEFORE, 
topLevelKv),
-                            replace(reusedAfter, RowKind.UPDATE_AFTER, 
merged));
+                    reusedResult
+                            .addChangelog(replace(reusedBefore, 
RowKind.UPDATE_BEFORE, topLevelKv))
+                            .addChangelog(replace(reusedAfter, 
RowKind.UPDATE_AFTER, merged));
                 } else {
-                    reusedResult.setChangelog(
-                            replace(reusedBefore, RowKind.DELETE, topLevelKv), 
null);
+                    reusedResult.addChangelog(replace(reusedBefore, 
RowKind.DELETE, topLevelKv));
                 }
             }
-            return reusedResult.setResult(merged);
+            return reusedResult.setResultIfNotRetract(merged);
         } else {
             if (topLevelKv == null && isAdd(initialKv)) {
-                reusedResult.setChangelog(null, replace(reusedAfter, 
RowKind.INSERT, initialKv));
-            } else {
-                // either topLevelKv is not null, but there is only one kv,
-                // so topLevelKv must be the only kv, which means there is no 
change
-                //
-                // or initialKv is not an ADD kv, so no new key is added
-                reusedResult.setChangelog(null, null);
+                reusedResult.addChangelog(replace(reusedAfter, RowKind.INSERT, 
initialKv));
             }
-            return reusedResult.setResult(initialKv);
+            // Otherwise no changelog is produced: either topLevelKv is not null, but there is
+            // only one kv, so topLevelKv must be the only kv, which means there is no change,
+            //
+            // or initialKv is not an ADD kv, so no new key is added.
+            return reusedResult.setResultIfNotRetract(initialKv);
         }
     }
 
@@ -140,55 +127,4 @@ public class FullChangelogMergeFunctionWrapper
     private boolean isAdd(KeyValue kv) {
         return kv.valueKind() == RowKind.INSERT || kv.valueKind() == 
RowKind.UPDATE_AFTER;
     }
-
-    /** Changelog and final result for the same key. */
-    public static class Result {
-
-        @Nullable private KeyValue before;
-        @Nullable private KeyValue after;
-        @Nullable private KeyValue result;
-
-        private Result() {}
-
-        private void setChangelog(@Nullable KeyValue before, @Nullable 
KeyValue after) {
-            this.before = before;
-            this.after = after;
-        }
-
-        private Result setResult(@Nullable KeyValue result) {
-            if (result != null && result.valueKind() != RowKind.DELETE) {
-                this.result = result;
-            } else {
-                this.result = null;
-            }
-            return this;
-        }
-
-        /**
-         * Previous full compaction result for this key. Null if this key does 
not appear in the
-         * previous result or its value remains unchanged.
-         */
-        @Nullable
-        public KeyValue before() {
-            return before;
-        }
-
-        /**
-         * Latest full compaction result for this key. Null if this key does 
not appear in the
-         * latest result or its value remains unchanged.
-         */
-        @Nullable
-        public KeyValue after() {
-            return after;
-        }
-
-        /**
-         * Latest full compaction result (result of merge function) for this 
key. Null if the merged
-         * result is null or is of DELETE kind.
-         */
-        @Nullable
-        public KeyValue result() {
-            return result;
-        }
-    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index d4017095..6fc064c4 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -20,24 +20,17 @@ package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.table.store.data.InternalRow;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
 import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
-import org.apache.flink.table.store.file.io.RollingFileWriter;
-import org.apache.flink.table.store.file.mergetree.MergeTreeReaders;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
-import org.apache.flink.table.store.reader.RecordReader;
-import org.apache.flink.table.store.reader.RecordReaderIterator;
 import org.apache.flink.table.store.utils.Preconditions;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
 /** A {@link MergeTreeCompactRewriter} which produces changelog files for each 
full compaction. */
-public class FullChangelogMergeTreeCompactRewriter extends 
MergeTreeCompactRewriter {
+public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
 
     private final int maxLevel;
 
@@ -52,83 +45,24 @@ public class FullChangelogMergeTreeCompactRewriter extends 
MergeTreeCompactRewri
     }
 
     @Override
-    public CompactResult rewrite(
-            int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) throws Exception {
-        if (outputLevel == maxLevel) {
+    protected boolean rewriteChangelog(
+            int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) {
+        boolean changelog = outputLevel == maxLevel;
+        if (changelog) {
             Preconditions.checkArgument(
                     dropDelete,
                     "Delete records should be dropped from result of full 
compaction. This is unexpected.");
-            return rewriteFullCompaction(sections);
-        } else {
-            return rewriteCompaction(outputLevel, dropDelete, sections);
         }
+        return changelog;
     }
 
-    private CompactResult rewriteFullCompaction(List<List<SortedRun>> 
sections) throws Exception {
-        
List<ConcatRecordReader.ReaderSupplier<FullChangelogMergeFunctionWrapper.Result>>
-                sectionReaders = new ArrayList<>();
-        for (List<SortedRun> section : sections) {
-            sectionReaders.add(
-                    () -> {
-                        List<RecordReader<KeyValue>> runReaders = new 
ArrayList<>();
-                        for (SortedRun run : section) {
-                            runReaders.add(MergeTreeReaders.readerForRun(run, 
readerFactory));
-                        }
-                        return new SortMergeReader<>(
-                                runReaders,
-                                keyComparator,
-                                new FullChangelogMergeFunctionWrapper(
-                                        mfFactory.create(), maxLevel));
-                    });
-        }
-
-        RecordReaderIterator<FullChangelogMergeFunctionWrapper.Result> 
iterator = null;
-        RollingFileWriter<KeyValue, DataFileMeta> compactFileWriter = null;
-        RollingFileWriter<KeyValue, DataFileMeta> changelogFileWriter = null;
-
-        try {
-            iterator = new 
RecordReaderIterator<>(ConcatRecordReader.create(sectionReaders));
-            compactFileWriter = 
writerFactory.createRollingMergeTreeFileWriter(maxLevel);
-            changelogFileWriter = 
writerFactory.createRollingChangelogFileWriter(maxLevel);
-
-            while (iterator.hasNext()) {
-                FullChangelogMergeFunctionWrapper.Result result = 
iterator.next();
-                if (result.result() != null) {
-                    compactFileWriter.write(result.result());
-                }
-                if (result.before() != null) {
-                    changelogFileWriter.write(result.before());
-                }
-                if (result.after() != null) {
-                    changelogFileWriter.write(result.after());
-                }
-            }
-        } finally {
-            if (iterator != null) {
-                iterator.close();
-            }
-            if (compactFileWriter != null) {
-                compactFileWriter.close();
-            }
-            if (changelogFileWriter != null) {
-                changelogFileWriter.close();
-            }
-        }
-
-        return new CompactResult(
-                extractFilesFromSections(sections),
-                compactFileWriter.result(),
-                changelogFileWriter.result());
+    @Override
+    protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) {
+        return outputLevel == maxLevel;
     }
 
     @Override
-    public CompactResult upgrade(int outputLevel, DataFileMeta file) throws 
Exception {
-        if (outputLevel == maxLevel) {
-            return rewriteFullCompaction(
-                    Collections.singletonList(
-                            
Collections.singletonList(SortedRun.fromSingle(file))));
-        } else {
-            return super.upgrade(outputLevel, file);
-        }
+    protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int 
outputLevel) {
+        return new FullChangelogMergeFunctionWrapper(mfFactory.create(), 
maxLevel);
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ReducerMergeFunctionWrapper.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ReducerMergeFunctionWrapper.java
index 2e6f23f7..1c8c8aef 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ReducerMergeFunctionWrapper.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ReducerMergeFunctionWrapper.java
@@ -33,8 +33,8 @@ public class ReducerMergeFunctionWrapper implements 
MergeFunctionWrapper<KeyValu
 
     private final MergeFunction<KeyValue> mergeFunction;
 
-    private transient KeyValue initialKv;
-    private transient boolean isInitialized;
+    private KeyValue initialKv;
+    private boolean isInitialized;
 
     public ReducerMergeFunctionWrapper(MergeFunction<KeyValue> mergeFunction) {
         this.mergeFunction = mergeFunction;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 2db95384..df9c7f3b 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.types.RowType;
+import org.apache.flink.table.store.utils.Filter;
 import org.apache.flink.table.store.utils.Preconditions;
 import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
 
@@ -71,7 +72,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private Integer specifiedBucket = null;
     private List<ManifestFileMeta> specifiedManifests = null;
     private ScanKind scanKind = ScanKind.ALL;
-    private Integer specifiedLevel = null;
+    private Filter<Integer> levelFilter = null;
 
     public AbstractFileStoreScan(
             RowType partitionType,
@@ -165,8 +166,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     }
 
     @Override
-    public FileStoreScan withLevel(int level) {
-        this.specifiedLevel = level;
+    public FileStoreScan withLevelFilter(Filter<Integer> levelFilter) {
+        this.levelFilter = levelFilter;
         return this;
     }
 
@@ -320,7 +321,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     /** Note: Keep this thread-safe. */
     private boolean filterByLevel(ManifestEntry entry) {
-        return (specifiedLevel == null || entry.file().level() == 
specifiedLevel);
+        return (levelFilter == null || levelFilter.test(entry.file().level()));
     }
 
     /** Note: Keep this thread-safe. */
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 603d90b1..cad750d0 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.store.file.manifest.FileKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.utils.Filter;
 
 import javax.annotation.Nullable;
 
@@ -48,7 +49,7 @@ public interface FileStoreScan {
 
     FileStoreScan withKind(ScanKind scanKind);
 
-    FileStoreScan withLevel(int level);
+    FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
 
     /** Produce a {@link Plan}. */
     Plan plan();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index 72199dbf..7beea73d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -52,8 +52,6 @@ public interface TableWrite extends AutoCloseable {
      * <p>NOTE: In Java API, full compaction is not automatically executed. If 
you set
      * 'changelog-producer' to 'full-compaction', please execute this method 
regularly to produce
      * changelog.
-     *
-     * <p>NOTE: this method will block until the completion of the compaction 
execution.
      */
     void compact(BinaryRow partition, int bucket, boolean fullCompaction) 
throws Exception;
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
index a950a320..8a144546 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.fs.FileIO;
+import org.apache.flink.table.store.utils.Filter;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -111,8 +112,8 @@ public abstract class AbstractDataTableScan implements DataTableScan {
     }
 
     @Override
-    public AbstractDataTableScan withLevel(int level) {
-        scan.withLevel(level);
+    public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
+        scan.withLevelFilter(levelFilter);
         return this;
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
index 6a5448f4..971aaf60 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.table.source;
 import org.apache.flink.table.store.annotation.VisibleForTesting;
 import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.utils.Filter;
 
 import javax.annotation.Nullable;
 
@@ -33,7 +34,7 @@ public interface DataTableScan extends InnerTableScan {
 
     DataTableScan withSnapshot(long snapshotId);
 
-    DataTableScan withLevel(int level);
+    DataTableScan withLevelFilter(Filter<Integer> levelFilter);
 
     DataTableScan withFilter(Predicate predicate);
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index ebe84053..f4dadac7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -183,17 +183,21 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
     private static FollowUpScanner createFollowUpScanner(DataTable table, 
DataTableScan scan) {
         CoreOptions.ChangelogProducer changelogProducer = 
table.options().changelogProducer();
         FollowUpScanner followUpScanner;
-        if (changelogProducer == CoreOptions.ChangelogProducer.NONE) {
-            followUpScanner = new DeltaFollowUpScanner();
-        } else if (changelogProducer == CoreOptions.ChangelogProducer.INPUT) {
-            followUpScanner = new InputChangelogFollowUpScanner();
-        } else if (changelogProducer == 
CoreOptions.ChangelogProducer.FULL_COMPACTION) {
-            // this change in scan will affect both starting scanner and 
follow-up scanner
-            scan.withLevel(table.options().numLevels() - 1);
-            followUpScanner = new CompactionChangelogFollowUpScanner();
-        } else {
-            throw new UnsupportedOperationException(
-                    "Unknown changelog producer " + changelogProducer.name());
+        switch (changelogProducer) {
+            case NONE:
+                followUpScanner = new DeltaFollowUpScanner();
+                break;
+            case INPUT:
+                followUpScanner = new InputChangelogFollowUpScanner();
+                break;
+            case FULL_COMPACTION:
+                // this change in scan will affect both starting scanner and 
follow-up scanner
+                scan.withLevelFilter(level -> level == 
table.options().numLevels() - 1);
+                followUpScanner = new CompactionChangelogFollowUpScanner();
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown changelog producer " + 
changelogProducer.name());
         }
 
         Long boundedWatermark = table.options().scanBoundedWatermark();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
index c1883e2a..d78272ca 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.store.types.DataField;
 import org.apache.flink.table.store.types.RowKind;
 import org.apache.flink.table.store.types.RowType;
 import org.apache.flink.table.store.types.VarCharType;
+import org.apache.flink.table.store.utils.Filter;
 import org.apache.flink.table.store.utils.ProjectedRow;
 
 import org.apache.flink.shaded.guava30.com.google.common.primitives.Ints;
@@ -171,8 +172,8 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         }
 
         @Override
-        public DataTableScan withLevel(int level) {
-            dataScan.withLevel(level);
+        public DataTableScan withLevelFilter(Filter<Integer> levelFilter) {
+            dataScan.withLevelFilter(levelFilter);
             return this;
         }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
index fc3fa01b..ea8e4f22 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.store.types.RowKind;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -74,8 +75,13 @@ public abstract class FullChangelogMergeFunctionWrapperTestBase {
                             new KeyValue()
                                     .replace(row(7), 10, RowKind.INSERT, 
row(3))
                                     .setLevel(MAX_LEVEL),
+                            new KeyValue().replace(row(7), 11, RowKind.DELETE, 
row(3)).setLevel(0)),
+                    Arrays.asList(
+                            new KeyValue()
+                                    .replace(row(7), 12, RowKind.INSERT, 
row(3))
+                                    .setLevel(MAX_LEVEL),
                             new KeyValue()
-                                    .replace(row(7), 11, RowKind.DELETE, 
row(3))
+                                    .replace(row(7), 13, 
RowKind.UPDATE_BEFORE, row(3))
                                     .setLevel(0)));
 
     protected abstract KeyValue getExpectedBefore(int idx);
@@ -90,9 +96,15 @@ public abstract class FullChangelogMergeFunctionWrapperTestBase {
             wrapper.reset();
             List<KeyValue> kvs = INPUT_KVS.get(i);
             kvs.forEach(kv -> wrapper.add(kv));
-            FullChangelogMergeFunctionWrapper.Result actualResult = 
wrapper.getResult();
-            MergeFunctionTestUtils.assertKvEquals(getExpectedBefore(i), 
actualResult.before());
-            MergeFunctionTestUtils.assertKvEquals(getExpectedAfter(i), 
actualResult.after());
+            ChangelogResult actualResult = wrapper.getResult();
+            List<KeyValue> expectedChangelogs = new ArrayList<>();
+            if (getExpectedBefore(i) != null) {
+                expectedChangelogs.add(getExpectedBefore(i));
+            }
+            if (getExpectedAfter(i) != null) {
+                expectedChangelogs.add(getExpectedAfter(i));
+            }
+            MergeFunctionTestUtils.assertKvsEquals(expectedChangelogs, 
actualResult.changelogs());
             MergeFunctionTestUtils.assertKvEquals(getExpectedResult(i), 
actualResult.result());
         }
     }
@@ -111,7 +123,8 @@ public abstract class FullChangelogMergeFunctionWrapperTestBase {
                         null,
                         null,
                         new KeyValue().replace(row(6), 8, 
RowKind.UPDATE_BEFORE, row(3)),
-                        new KeyValue().replace(row(7), 10, RowKind.DELETE, 
row(3)));
+                        new KeyValue().replace(row(7), 10, RowKind.DELETE, 
row(3)),
+                        new KeyValue().replace(row(7), 12, RowKind.DELETE, 
row(3)));
 
         private static final List<KeyValue> EXPECTED_AFTER =
                 Arrays.asList(
@@ -121,6 +134,7 @@ public abstract class FullChangelogMergeFunctionWrapperTestBase {
                         new KeyValue().replace(row(4), 5, RowKind.INSERT, 
row(-3)),
                         null,
                         new KeyValue().replace(row(6), 9, 
RowKind.UPDATE_AFTER, row(-3)),
+                        null,
                         null);
 
         private static final List<KeyValue> EXPECTED_RESULT =
@@ -131,6 +145,7 @@ public abstract class FullChangelogMergeFunctionWrapperTestBase {
                         new KeyValue().replace(row(4), 5, RowKind.INSERT, 
row(-3)),
                         null,
                         new KeyValue().replace(row(6), 9, RowKind.INSERT, 
row(-3)),
+                        null,
                         null);
 
         @Override
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
index c76d3561..a62cce6a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
@@ -69,6 +69,13 @@ public class MergeFunctionTestUtils {
         return expected;
     }
 
+    public static void assertKvsEquals(List<KeyValue> expected, List<KeyValue> 
actual) {
+        assertThat(actual).hasSize(expected.size());
+        for (int i = 0; i < actual.size(); i++) {
+            assertKvEquals(expected.get(i), actual.get(i));
+        }
+    }
+
     public static void assertKvEquals(KeyValue expected, KeyValue actual) {
         if (expected == null) {
             assertThat(actual).isNull();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
index a5f50027..e5bea887 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
@@ -65,7 +65,8 @@ public class CompactionChangelogFollowUpScannerTest extends SnapshotEnumeratorTe
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
 
-        DataTableScan scan = 
table.newScan().withLevel(table.options().numLevels() - 1);
+        DataTableScan scan =
+                table.newScan().withLevelFilter(level -> level == 
table.options().numLevels() - 1);
         TableRead read = table.newRead();
         CompactionChangelogFollowUpScanner scanner = new 
CompactionChangelogFollowUpScanner();
 
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
index 66dae689..d1424aed 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
@@ -69,7 +69,7 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
     private final NavigableMap<Long, Long> firstWriteMs;
     private final ListState<Tuple2<Long, Long>> firstWriteMsState;
 
-    private transient TreeSet<Long> commitIdentifiersToCheck;
+    private final TreeSet<Long> commitIdentifiersToCheck;
 
     public FullChangelogStoreSinkWrite(
             FileStoreTable table,


Reply via email to