This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new c3e24fa4 [FLINK-31392] Refactor classes code of full-compaction
c3e24fa4 is described below
commit c3e24fa4b0fd052a227296b03f9dfb0521ad833e
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Mar 11 16:57:32 2023 +0800
[FLINK-31392] Refactor classes code of full-compaction
This closes #589
---
.../org/apache/flink/table/store/utils/Filter.java | 37 +++++++
.../flink/table/store/utils/MurmurHashUtils.java | 11 +++
.../org/apache/flink/table/store/CoreOptions.java | 8 --
.../store/file/mergetree/MergeTreeWriter.java | 2 +-
...writer.java => ChangelogMergeTreeRewriter.java} | 56 +++++------
.../file/mergetree/compact/ChangelogResult.java | 68 +++++++++++++
.../compact/FullChangelogMergeFunctionWrapper.java | 106 ++++-----------------
.../FullChangelogMergeTreeCompactRewriter.java | 88 +++--------------
.../compact/ReducerMergeFunctionWrapper.java | 4 +-
.../file/operation/AbstractFileStoreScan.java | 9 +-
.../table/store/file/operation/FileStoreScan.java | 3 +-
.../flink/table/store/table/sink/TableWrite.java | 2 -
.../store/table/source/AbstractDataTableScan.java | 5 +-
.../table/store/table/source/DataTableScan.java | 3 +-
.../ContinuousDataFileSnapshotEnumerator.java | 26 ++---
.../table/store/table/system/AuditLogTable.java | 5 +-
.../FullChangelogMergeFunctionWrapperTestBase.java | 25 ++++-
.../mergetree/compact/MergeFunctionTestUtils.java | 7 ++
.../CompactionChangelogFollowUpScannerTest.java | 3 +-
.../sink/FullChangelogStoreSinkWrite.java | 2 +-
20 files changed, 236 insertions(+), 234 deletions(-)
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/Filter.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/Filter.java
new file mode 100644
index 00000000..73826b65
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/Filter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import org.apache.flink.table.store.file.predicate.Predicate;
+
+/**
+ * Represents a filter (boolean-valued function) of one argument. This class
is for avoiding name
+ * conflicting to {@link Predicate}.
+ */
+@FunctionalInterface
+public interface Filter<T> {
+
+ /**
+ * Evaluates this predicate on the given argument.
+ *
+ * @param t the input argument
+ * @return {@code true} if the input argument matches the predicate,
otherwise {@code false}
+ */
+ boolean test(T t);
+}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MurmurHashUtils.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MurmurHashUtils.java
index 03e3603f..53e672d3 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MurmurHashUtils.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MurmurHashUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.utils;
import org.apache.flink.table.store.memory.MemorySegment;
+import static
org.apache.flink.table.store.memory.MemorySegment.BYTE_ARRAY_BASE_OFFSET;
import static org.apache.flink.table.store.memory.MemorySegment.UNSAFE;
/** Murmur Hash. This is inspired by Guava's Murmur3_32HashFunction. */
@@ -45,6 +46,16 @@ public final class MurmurHashUtils {
return hashUnsafeBytesByWords(base, offset, lengthInBytes,
DEFAULT_SEED);
}
+ /** Hash bytes. */
+ public static int hashBytesPositive(byte[] bytes) {
+ return hashBytes(bytes) & 0x7fffffff;
+ }
+
+ /** Hash bytes. */
+ public static int hashBytes(byte[] bytes) {
+ return hashUnsafeBytes(bytes, BYTE_ARRAY_BASE_OFFSET, bytes.length,
DEFAULT_SEED);
+ }
+
/**
* Hash unsafe bytes.
*
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 8cb87388..89395df8 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -546,14 +546,6 @@ public class CoreOptions implements Serializable {
return options.get(PARTITION_DEFAULT_NAME);
}
- public String orcBloomFilterColumns() {
- return options.get(ORC_BLOOM_FILTER_COLUMNS);
- }
-
- public double orcBloomFilterFpp() {
- return options.get(ORC_BLOOM_FILTER_FPP);
- }
-
public Map<Integer, String> fileCompressionPerLevel() {
Map<String, String> levelCompressions =
options.get(FILE_COMPRESSION_PER_LEVEL);
return levelCompressions.entrySet().stream()
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 93f735d0..b159b23e 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -206,7 +206,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws
Exception {
- flushWriteBuffer(false, false);
+ flushWriteBuffer(waitCompaction, false);
trySyncLatestCompaction(waitCompaction || commitForceCompact);
return drainIncrement();
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ChangelogMergeTreeRewriter.java
similarity index 72%
copy from
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
copy to
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ChangelogMergeTreeRewriter.java
index d4017095..f4d4f2ed 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -29,44 +29,43 @@ import
org.apache.flink.table.store.file.mergetree.MergeTreeReaders;
import org.apache.flink.table.store.file.mergetree.SortedRun;
import org.apache.flink.table.store.reader.RecordReader;
import org.apache.flink.table.store.reader.RecordReaderIterator;
-import org.apache.flink.table.store.utils.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-/** A {@link MergeTreeCompactRewriter} which produces changelog files for each
full compaction. */
-public class FullChangelogMergeTreeCompactRewriter extends
MergeTreeCompactRewriter {
+/** A {@link MergeTreeCompactRewriter} which produces changelog files for the
compaction. */
+public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewriter {
- private final int maxLevel;
-
- public FullChangelogMergeTreeCompactRewriter(
- int maxLevel,
+ public ChangelogMergeTreeRewriter(
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory) {
super(readerFactory, writerFactory, keyComparator, mfFactory);
- this.maxLevel = maxLevel;
}
+ protected abstract boolean rewriteChangelog(
+ int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections);
+
+ protected abstract boolean upgradeChangelog(int outputLevel, DataFileMeta
file);
+
+ protected abstract MergeFunctionWrapper<ChangelogResult>
createMergeWrapper(int outputLevel);
+
@Override
public CompactResult rewrite(
int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections) throws Exception {
- if (outputLevel == maxLevel) {
- Preconditions.checkArgument(
- dropDelete,
- "Delete records should be dropped from result of full
compaction. This is unexpected.");
- return rewriteFullCompaction(sections);
+ if (rewriteChangelog(outputLevel, dropDelete, sections)) {
+ return rewriteChangelogCompaction(outputLevel, sections);
} else {
return rewriteCompaction(outputLevel, dropDelete, sections);
}
}
- private CompactResult rewriteFullCompaction(List<List<SortedRun>>
sections) throws Exception {
-
List<ConcatRecordReader.ReaderSupplier<FullChangelogMergeFunctionWrapper.Result>>
- sectionReaders = new ArrayList<>();
+ private CompactResult rewriteChangelogCompaction(
+ int outputLevel, List<List<SortedRun>> sections) throws Exception {
+ List<ConcatRecordReader.ReaderSupplier<ChangelogResult>>
sectionReaders = new ArrayList<>();
for (List<SortedRun> section : sections) {
sectionReaders.add(
() -> {
@@ -75,32 +74,26 @@ public class FullChangelogMergeTreeCompactRewriter extends
MergeTreeCompactRewri
runReaders.add(MergeTreeReaders.readerForRun(run,
readerFactory));
}
return new SortMergeReader<>(
- runReaders,
- keyComparator,
- new FullChangelogMergeFunctionWrapper(
- mfFactory.create(), maxLevel));
+ runReaders, keyComparator,
createMergeWrapper(outputLevel));
});
}
- RecordReaderIterator<FullChangelogMergeFunctionWrapper.Result>
iterator = null;
+ RecordReaderIterator<ChangelogResult> iterator = null;
RollingFileWriter<KeyValue, DataFileMeta> compactFileWriter = null;
RollingFileWriter<KeyValue, DataFileMeta> changelogFileWriter = null;
try {
iterator = new
RecordReaderIterator<>(ConcatRecordReader.create(sectionReaders));
- compactFileWriter =
writerFactory.createRollingMergeTreeFileWriter(maxLevel);
- changelogFileWriter =
writerFactory.createRollingChangelogFileWriter(maxLevel);
+ compactFileWriter =
writerFactory.createRollingMergeTreeFileWriter(outputLevel);
+ changelogFileWriter =
writerFactory.createRollingChangelogFileWriter(outputLevel);
while (iterator.hasNext()) {
- FullChangelogMergeFunctionWrapper.Result result =
iterator.next();
+ ChangelogResult result = iterator.next();
if (result.result() != null) {
compactFileWriter.write(result.result());
}
- if (result.before() != null) {
- changelogFileWriter.write(result.before());
- }
- if (result.after() != null) {
- changelogFileWriter.write(result.after());
+ for (KeyValue kv : result.changelogs()) {
+ changelogFileWriter.write(kv);
}
}
} finally {
@@ -123,8 +116,9 @@ public class FullChangelogMergeTreeCompactRewriter extends
MergeTreeCompactRewri
@Override
public CompactResult upgrade(int outputLevel, DataFileMeta file) throws
Exception {
- if (outputLevel == maxLevel) {
- return rewriteFullCompaction(
+ if (upgradeChangelog(outputLevel, file)) {
+ return rewriteChangelogCompaction(
+ outputLevel,
Collections.singletonList(
Collections.singletonList(SortedRun.fromSingle(file))));
} else {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ChangelogResult.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ChangelogResult.java
new file mode 100644
index 00000000..a4414e7e
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ChangelogResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Changelog and final result for the same key. */
+public class ChangelogResult {
+
+ private final List<KeyValue> changelogs = new ArrayList<>();
+ @Nullable private KeyValue result;
+
+ public void reset() {
+ changelogs.clear();
+ result = null;
+ }
+
+ public ChangelogResult addChangelog(KeyValue record) {
+ changelogs.add(record);
+ return this;
+ }
+
+ public ChangelogResult setResultIfNotRetract(@Nullable KeyValue result) {
+ if (result != null
+ && result.valueKind() != RowKind.DELETE
+ && result.valueKind() != RowKind.UPDATE_BEFORE) {
+ setResult(result);
+ }
+ return this;
+ }
+
+ public ChangelogResult setResult(@Nullable KeyValue result) {
+ this.result = result;
+ return this;
+ }
+
+ public List<KeyValue> changelogs() {
+ return changelogs;
+ }
+
+ /** Latest result (result of merge function) for this key. */
+ @Nullable
+ public KeyValue result() {
+ return result;
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapper.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapper.java
index c01d0857..699b2724 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapper.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapper.java
@@ -22,8 +22,6 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.types.RowKind;
import org.apache.flink.table.store.utils.Preconditions;
-import javax.annotation.Nullable;
-
/**
* Wrapper for {@link MergeFunction}s to produce changelog during a full
compaction.
*
@@ -36,20 +34,19 @@ import javax.annotation.Nullable;
* SortMergeReader}, so there is no issue related to object reuse.
* </ul>
*/
-public class FullChangelogMergeFunctionWrapper
- implements
MergeFunctionWrapper<FullChangelogMergeFunctionWrapper.Result> {
+public class FullChangelogMergeFunctionWrapper implements
MergeFunctionWrapper<ChangelogResult> {
private final MergeFunction<KeyValue> mergeFunction;
private final int maxLevel;
// only full compaction will write files into maxLevel, see
UniversalCompaction class
- private transient KeyValue topLevelKv;
- private transient KeyValue initialKv;
- private transient boolean isInitialized;
+ private KeyValue topLevelKv;
+ private KeyValue initialKv;
+ private boolean isInitialized;
- private transient Result reusedResult;
- private transient KeyValue reusedBefore;
- private transient KeyValue reusedAfter;
+ private final ChangelogResult reusedResult = new ChangelogResult();
+ private final KeyValue reusedBefore = new KeyValue();
+ private final KeyValue reusedAfter = new KeyValue();
public FullChangelogMergeFunctionWrapper(MergeFunction<KeyValue>
mergeFunction, int maxLevel) {
Preconditions.checkArgument(
@@ -93,43 +90,33 @@ public class FullChangelogMergeFunctionWrapper
}
@Override
- public Result getResult() {
- if (reusedResult == null) {
- reusedResult = new Result();
- reusedBefore = new KeyValue();
- reusedAfter = new KeyValue();
- }
-
+ public ChangelogResult getResult() {
+ reusedResult.reset();
if (isInitialized) {
KeyValue merged = mergeFunction.getResult();
if (topLevelKv == null) {
if (merged != null && isAdd(merged)) {
- reusedResult.setChangelog(null, replace(reusedAfter,
RowKind.INSERT, merged));
- } else {
- reusedResult.setChangelog(null, null);
+ reusedResult.addChangelog(replace(reusedAfter,
RowKind.INSERT, merged));
}
} else {
if (merged != null && isAdd(merged)) {
- reusedResult.setChangelog(
- replace(reusedBefore, RowKind.UPDATE_BEFORE,
topLevelKv),
- replace(reusedAfter, RowKind.UPDATE_AFTER,
merged));
+ reusedResult
+ .addChangelog(replace(reusedBefore,
RowKind.UPDATE_BEFORE, topLevelKv))
+ .addChangelog(replace(reusedAfter,
RowKind.UPDATE_AFTER, merged));
} else {
- reusedResult.setChangelog(
- replace(reusedBefore, RowKind.DELETE, topLevelKv),
null);
+ reusedResult.addChangelog(replace(reusedBefore,
RowKind.DELETE, topLevelKv));
}
}
- return reusedResult.setResult(merged);
+ return reusedResult.setResultIfNotRetract(merged);
} else {
if (topLevelKv == null && isAdd(initialKv)) {
- reusedResult.setChangelog(null, replace(reusedAfter,
RowKind.INSERT, initialKv));
- } else {
- // either topLevelKv is not null, but there is only one kv,
- // so topLevelKv must be the only kv, which means there is no
change
- //
- // or initialKv is not an ADD kv, so no new key is added
- reusedResult.setChangelog(null, null);
+ reusedResult.addChangelog(replace(reusedAfter, RowKind.INSERT,
initialKv));
}
- return reusedResult.setResult(initialKv);
+ // either topLevelKv is not null, but there is only one kv,
+ // so topLevelKv must be the only kv, which means there is no
change
+ //
+ // or initialKv is not an ADD kv, so no new key is added
+ return reusedResult.setResultIfNotRetract(initialKv);
}
}
@@ -140,55 +127,4 @@ public class FullChangelogMergeFunctionWrapper
private boolean isAdd(KeyValue kv) {
return kv.valueKind() == RowKind.INSERT || kv.valueKind() ==
RowKind.UPDATE_AFTER;
}
-
- /** Changelog and final result for the same key. */
- public static class Result {
-
- @Nullable private KeyValue before;
- @Nullable private KeyValue after;
- @Nullable private KeyValue result;
-
- private Result() {}
-
- private void setChangelog(@Nullable KeyValue before, @Nullable
KeyValue after) {
- this.before = before;
- this.after = after;
- }
-
- private Result setResult(@Nullable KeyValue result) {
- if (result != null && result.valueKind() != RowKind.DELETE) {
- this.result = result;
- } else {
- this.result = null;
- }
- return this;
- }
-
- /**
- * Previous full compaction result for this key. Null if this key does
not appear in the
- * previous result or its value remains unchanged.
- */
- @Nullable
- public KeyValue before() {
- return before;
- }
-
- /**
- * Latest full compaction result for this key. Null if this key does
not appear in the
- * latest result or its value remains unchanged.
- */
- @Nullable
- public KeyValue after() {
- return after;
- }
-
- /**
- * Latest full compaction result (result of merge function) for this
key. Null if the merged
- * result is null or is of DELETE kind.
- */
- @Nullable
- public KeyValue result() {
- return result;
- }
- }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index d4017095..6fc064c4 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -20,24 +20,17 @@ package org.apache.flink.table.store.file.mergetree.compact;
import org.apache.flink.table.store.data.InternalRow;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
-import org.apache.flink.table.store.file.io.RollingFileWriter;
-import org.apache.flink.table.store.file.mergetree.MergeTreeReaders;
import org.apache.flink.table.store.file.mergetree.SortedRun;
-import org.apache.flink.table.store.reader.RecordReader;
-import org.apache.flink.table.store.reader.RecordReaderIterator;
import org.apache.flink.table.store.utils.Preconditions;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/** A {@link MergeTreeCompactRewriter} which produces changelog files for each
full compaction. */
-public class FullChangelogMergeTreeCompactRewriter extends
MergeTreeCompactRewriter {
+public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
private final int maxLevel;
@@ -52,83 +45,24 @@ public class FullChangelogMergeTreeCompactRewriter extends
MergeTreeCompactRewri
}
@Override
- public CompactResult rewrite(
- int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections) throws Exception {
- if (outputLevel == maxLevel) {
+ protected boolean rewriteChangelog(
+ int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections) {
+ boolean changelog = outputLevel == maxLevel;
+ if (changelog) {
Preconditions.checkArgument(
dropDelete,
"Delete records should be dropped from result of full
compaction. This is unexpected.");
- return rewriteFullCompaction(sections);
- } else {
- return rewriteCompaction(outputLevel, dropDelete, sections);
}
+ return changelog;
}
- private CompactResult rewriteFullCompaction(List<List<SortedRun>>
sections) throws Exception {
-
List<ConcatRecordReader.ReaderSupplier<FullChangelogMergeFunctionWrapper.Result>>
- sectionReaders = new ArrayList<>();
- for (List<SortedRun> section : sections) {
- sectionReaders.add(
- () -> {
- List<RecordReader<KeyValue>> runReaders = new
ArrayList<>();
- for (SortedRun run : section) {
- runReaders.add(MergeTreeReaders.readerForRun(run,
readerFactory));
- }
- return new SortMergeReader<>(
- runReaders,
- keyComparator,
- new FullChangelogMergeFunctionWrapper(
- mfFactory.create(), maxLevel));
- });
- }
-
- RecordReaderIterator<FullChangelogMergeFunctionWrapper.Result>
iterator = null;
- RollingFileWriter<KeyValue, DataFileMeta> compactFileWriter = null;
- RollingFileWriter<KeyValue, DataFileMeta> changelogFileWriter = null;
-
- try {
- iterator = new
RecordReaderIterator<>(ConcatRecordReader.create(sectionReaders));
- compactFileWriter =
writerFactory.createRollingMergeTreeFileWriter(maxLevel);
- changelogFileWriter =
writerFactory.createRollingChangelogFileWriter(maxLevel);
-
- while (iterator.hasNext()) {
- FullChangelogMergeFunctionWrapper.Result result =
iterator.next();
- if (result.result() != null) {
- compactFileWriter.write(result.result());
- }
- if (result.before() != null) {
- changelogFileWriter.write(result.before());
- }
- if (result.after() != null) {
- changelogFileWriter.write(result.after());
- }
- }
- } finally {
- if (iterator != null) {
- iterator.close();
- }
- if (compactFileWriter != null) {
- compactFileWriter.close();
- }
- if (changelogFileWriter != null) {
- changelogFileWriter.close();
- }
- }
-
- return new CompactResult(
- extractFilesFromSections(sections),
- compactFileWriter.result(),
- changelogFileWriter.result());
+ @Override
+ protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) {
+ return outputLevel == maxLevel;
}
@Override
- public CompactResult upgrade(int outputLevel, DataFileMeta file) throws
Exception {
- if (outputLevel == maxLevel) {
- return rewriteFullCompaction(
- Collections.singletonList(
-
Collections.singletonList(SortedRun.fromSingle(file))));
- } else {
- return super.upgrade(outputLevel, file);
- }
+ protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int
outputLevel) {
+ return new FullChangelogMergeFunctionWrapper(mfFactory.create(),
maxLevel);
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ReducerMergeFunctionWrapper.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ReducerMergeFunctionWrapper.java
index 2e6f23f7..1c8c8aef 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ReducerMergeFunctionWrapper.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ReducerMergeFunctionWrapper.java
@@ -33,8 +33,8 @@ public class ReducerMergeFunctionWrapper implements
MergeFunctionWrapper<KeyValu
private final MergeFunction<KeyValue> mergeFunction;
- private transient KeyValue initialKv;
- private transient boolean isInitialized;
+ private KeyValue initialKv;
+ private boolean isInitialized;
public ReducerMergeFunctionWrapper(MergeFunction<KeyValue> mergeFunction) {
this.mergeFunction = mergeFunction;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 2db95384..df9c7f3b 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -34,6 +34,7 @@ import
org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.types.RowType;
+import org.apache.flink.table.store.utils.Filter;
import org.apache.flink.table.store.utils.Preconditions;
import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
@@ -71,7 +72,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private Integer specifiedBucket = null;
private List<ManifestFileMeta> specifiedManifests = null;
private ScanKind scanKind = ScanKind.ALL;
- private Integer specifiedLevel = null;
+ private Filter<Integer> levelFilter = null;
public AbstractFileStoreScan(
RowType partitionType,
@@ -165,8 +166,8 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
@Override
- public FileStoreScan withLevel(int level) {
- this.specifiedLevel = level;
+ public FileStoreScan withLevelFilter(Filter<Integer> levelFilter) {
+ this.levelFilter = levelFilter;
return this;
}
@@ -320,7 +321,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
/** Note: Keep this thread-safe. */
private boolean filterByLevel(ManifestEntry entry) {
- return (specifiedLevel == null || entry.file().level() ==
specifiedLevel);
+ return (levelFilter == null || levelFilter.test(entry.file().level()));
}
/** Note: Keep this thread-safe. */
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 603d90b1..cad750d0 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.utils.Filter;
import javax.annotation.Nullable;
@@ -48,7 +49,7 @@ public interface FileStoreScan {
FileStoreScan withKind(ScanKind scanKind);
- FileStoreScan withLevel(int level);
+ FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
/** Produce a {@link Plan}. */
Plan plan();
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index 72199dbf..7beea73d 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -52,8 +52,6 @@ public interface TableWrite extends AutoCloseable {
* <p>NOTE: In Java API, full compaction is not automatically executed. If
you set
* 'changelog-producer' to 'full-compaction', please execute this method
regularly to produce
* changelog.
- *
- * <p>NOTE: this method will block until the completion of the compaction
execution.
*/
void compact(BinaryRow partition, int bucket, boolean fullCompaction)
throws Exception;
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
index a950a320..8a144546 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.fs.FileIO;
+import org.apache.flink.table.store.utils.Filter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@@ -111,8 +112,8 @@ public abstract class AbstractDataTableScan implements
DataTableScan {
}
@Override
- public AbstractDataTableScan withLevel(int level) {
- scan.withLevel(level);
+ public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
+ scan.withLevelFilter(levelFilter);
return this;
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
index 6a5448f4..971aaf60 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.table.source;
import org.apache.flink.table.store.annotation.VisibleForTesting;
import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.utils.Filter;
import javax.annotation.Nullable;
@@ -33,7 +34,7 @@ public interface DataTableScan extends InnerTableScan {
DataTableScan withSnapshot(long snapshotId);
- DataTableScan withLevel(int level);
+ DataTableScan withLevelFilter(Filter<Integer> levelFilter);
DataTableScan withFilter(Predicate predicate);
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index ebe84053..f4dadac7 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -183,17 +183,21 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
    private static FollowUpScanner createFollowUpScanner(DataTable table, DataTableScan scan) {
        CoreOptions.ChangelogProducer changelogProducer = table.options().changelogProducer();
FollowUpScanner followUpScanner;
- if (changelogProducer == CoreOptions.ChangelogProducer.NONE) {
- followUpScanner = new DeltaFollowUpScanner();
- } else if (changelogProducer == CoreOptions.ChangelogProducer.INPUT) {
- followUpScanner = new InputChangelogFollowUpScanner();
-        } else if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
-            // this change in scan will affect both starting scanner and follow-up scanner
- scan.withLevel(table.options().numLevels() - 1);
- followUpScanner = new CompactionChangelogFollowUpScanner();
- } else {
- throw new UnsupportedOperationException(
- "Unknown changelog producer " + changelogProducer.name());
+ switch (changelogProducer) {
+ case NONE:
+ followUpScanner = new DeltaFollowUpScanner();
+ break;
+ case INPUT:
+ followUpScanner = new InputChangelogFollowUpScanner();
+ break;
+ case FULL_COMPACTION:
+                // this change in scan will affect both starting scanner and follow-up scanner
+                scan.withLevelFilter(level -> level == table.options().numLevels() - 1);
+                followUpScanner = new CompactionChangelogFollowUpScanner();
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown changelog producer " + changelogProducer.name());
}
Long boundedWatermark = table.options().scanBoundedWatermark();
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
index c1883e2a..d78272ca 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.store.types.DataField;
import org.apache.flink.table.store.types.RowKind;
import org.apache.flink.table.store.types.RowType;
import org.apache.flink.table.store.types.VarCharType;
+import org.apache.flink.table.store.utils.Filter;
import org.apache.flink.table.store.utils.ProjectedRow;
import org.apache.flink.shaded.guava30.com.google.common.primitives.Ints;
@@ -171,8 +172,8 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
}
@Override
- public DataTableScan withLevel(int level) {
- dataScan.withLevel(level);
+ public DataTableScan withLevelFilter(Filter<Integer> levelFilter) {
+ dataScan.withLevelFilter(levelFilter);
return this;
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
index fc3fa01b..ea8e4f22 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.store.types.RowKind;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -74,8 +75,13 @@ public abstract class FullChangelogMergeFunctionWrapperTestBase {
                            new KeyValue()
                                    .replace(row(7), 10, RowKind.INSERT, row(3))
                                    .setLevel(MAX_LEVEL),
+                            new KeyValue().replace(row(7), 11, RowKind.DELETE, row(3)).setLevel(0)),
+                    Arrays.asList(
+                            new KeyValue()
+                                    .replace(row(7), 12, RowKind.INSERT, row(3))
+                                    .setLevel(MAX_LEVEL),
                            new KeyValue()
-                                    .replace(row(7), 11, RowKind.DELETE, row(3))
+                                    .replace(row(7), 13, RowKind.UPDATE_BEFORE, row(3))
                                    .setLevel(0)));
protected abstract KeyValue getExpectedBefore(int idx);
@@ -90,9 +96,15 @@ public abstract class FullChangelogMergeFunctionWrapperTestBase {
            wrapper.reset();
            List<KeyValue> kvs = INPUT_KVS.get(i);
            kvs.forEach(kv -> wrapper.add(kv));
-            FullChangelogMergeFunctionWrapper.Result actualResult = wrapper.getResult();
-            MergeFunctionTestUtils.assertKvEquals(getExpectedBefore(i), actualResult.before());
-            MergeFunctionTestUtils.assertKvEquals(getExpectedAfter(i), actualResult.after());
+            ChangelogResult actualResult = wrapper.getResult();
+            List<KeyValue> expectedChangelogs = new ArrayList<>();
+            if (getExpectedBefore(i) != null) {
+                expectedChangelogs.add(getExpectedBefore(i));
+            }
+            if (getExpectedAfter(i) != null) {
+                expectedChangelogs.add(getExpectedAfter(i));
+            }
+            MergeFunctionTestUtils.assertKvsEquals(expectedChangelogs, actualResult.changelogs());
            MergeFunctionTestUtils.assertKvEquals(getExpectedResult(i), actualResult.result());
        }
    }
}
}
@@ -111,7 +123,8 @@ public abstract class FullChangelogMergeFunctionWrapperTestBase {
                    null,
                    null,
                    new KeyValue().replace(row(6), 8, RowKind.UPDATE_BEFORE, row(3)),
-                    new KeyValue().replace(row(7), 10, RowKind.DELETE, row(3)));
+                    new KeyValue().replace(row(7), 10, RowKind.DELETE, row(3)),
+                    new KeyValue().replace(row(7), 12, RowKind.DELETE, row(3)));
private static final List<KeyValue> EXPECTED_AFTER =
Arrays.asList(
@@ -121,6 +134,7 @@ public abstract class FullChangelogMergeFunctionWrapperTestBase {
                    new KeyValue().replace(row(4), 5, RowKind.INSERT, row(-3)),
                    null,
                    new KeyValue().replace(row(6), 9, RowKind.UPDATE_AFTER, row(-3)),
+                    null,
                    null);
private static final List<KeyValue> EXPECTED_RESULT =
@@ -131,6 +145,7 @@ public abstract class FullChangelogMergeFunctionWrapperTestBase {
                    new KeyValue().replace(row(4), 5, RowKind.INSERT, row(-3)),
                    null,
                    new KeyValue().replace(row(6), 9, RowKind.INSERT, row(-3)),
+                    null,
                    null);
@Override
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
index c76d3561..a62cce6a 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
@@ -69,6 +69,13 @@ public class MergeFunctionTestUtils {
return expected;
}
+    public static void assertKvsEquals(List<KeyValue> expected, List<KeyValue> actual) {
+ assertThat(actual).hasSize(expected.size());
+ for (int i = 0; i < actual.size(); i++) {
+ assertKvEquals(expected.get(i), actual.get(i));
+ }
+ }
+
public static void assertKvEquals(KeyValue expected, KeyValue actual) {
if (expected == null) {
assertThat(actual).isNull();
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
index a5f50027..e5bea887 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
@@ -65,7 +65,8 @@ public class CompactionChangelogFollowUpScannerTest extends SnapshotEnumeratorTe
        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
-        DataTableScan scan = table.newScan().withLevel(table.options().numLevels() - 1);
+        DataTableScan scan =
+                table.newScan().withLevelFilter(level -> level == table.options().numLevels() - 1);
        TableRead read = table.newRead();
        CompactionChangelogFollowUpScanner scanner = new CompactionChangelogFollowUpScanner();
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
index 66dae689..d1424aed 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
@@ -69,7 +69,7 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
private final NavigableMap<Long, Long> firstWriteMs;
private final ListState<Tuple2<Long, Long>> firstWriteMsState;
- private transient TreeSet<Long> commitIdentifiersToCheck;
+ private final TreeSet<Long> commitIdentifiersToCheck;
public FullChangelogStoreSinkWrite(
FileStoreTable table,