This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2417b38a2b [core] Abstract KvCompactionManagerFactory in
KeyValueFileStoreWrite (#7403)
2417b38a2b is described below
commit 2417b38a2bc974979c1cf6400f2d869d0d355719
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Mar 12 14:14:49 2026 +0800
[core] Abstract KvCompactionManagerFactory in KeyValueFileStoreWrite (#7403)
---
.../compact/KvCompactionManagerFactory.java | 97 +++++++
.../compact/MergeTreeCompactManagerFactory.java} | 243 +++++-----------
.../paimon/operation/KeyValueFileStoreWrite.java | 317 +++------------------
3 files changed, 202 insertions(+), 455 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KvCompactionManagerFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KvCompactionManagerFactory.java
new file mode 100644
index 0000000000..ee2b490459
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KvCompactionManagerFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.codegen.RecordEqualiser;
+import org.apache.paimon.compact.CompactManager;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.io.RecordLevelExpire;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FieldsComparator;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+/** Factory to create a {@link CompactManager} for primary key table. */
+public interface KvCompactionManagerFactory extends Closeable {
+
+ static KvCompactionManagerFactory create(
+ KeyValueFileReaderFactory.Builder readerFactoryBuilder,
+ KeyValueFileWriterFactory.Builder writerFactoryBuilder,
+ Supplier<Comparator<InternalRow>> keyComparatorSupplier,
+ Supplier<FieldsComparator> udsComparatorSupplier,
+ Supplier<RecordEqualiser> logDedupEqualSupplier,
+ MergeFunctionFactory<KeyValue> mfFactory,
+ CoreOptions options,
+ RowType keyType,
+ RowType valueType,
+ RowType partitionType,
+ FileIO fileIO,
+ SchemaManager schemaManager,
+ TableSchema schema,
+ @Nullable RecordLevelExpire recordLevelExpire,
+ CacheManager cacheManager) {
+ return new MergeTreeCompactManagerFactory(
+ readerFactoryBuilder,
+ writerFactoryBuilder,
+ keyComparatorSupplier,
+ udsComparatorSupplier,
+ logDedupEqualSupplier,
+ mfFactory,
+ options,
+ keyType,
+ valueType,
+ partitionType,
+ fileIO,
+ schemaManager,
+ schema,
+ recordLevelExpire,
+ cacheManager);
+ }
+
+ void withIOManager(@Nullable IOManager ioManager);
+
+ void withCompactionMetrics(@Nullable CompactionMetrics compactionMetrics);
+
+ /** Create a {@link CompactManager} for the given partition and bucket. */
+ CompactManager create(
+ BinaryRow partition,
+ int bucket,
+ ExecutorService compactExecutor,
+ List<DataFileMeta> restoreFiles,
+ @Nullable BucketedDvMaintainer dvMaintainer);
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
similarity index 67%
copy from
paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
copy to
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
index 326158c7b7..4ad5c0ac79 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
@@ -16,13 +16,12 @@
* limitations under the License.
*/
-package org.apache.paimon.operation;
+package org.apache.paimon.mergetree.compact;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValue;
-import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compact.NoopCompactManager;
@@ -32,35 +31,21 @@ import
org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.index.DynamicBucketIndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RecordLevelExpire;
+import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.LookupFile;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.MergeSorter;
-import org.apache.paimon.mergetree.MergeTreeWriter;
-import org.apache.paimon.mergetree.compact.CompactRewriter;
-import org.apache.paimon.mergetree.compact.CompactStrategy;
-import org.apache.paimon.mergetree.compact.EarlyFullCompaction;
-import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
-import
org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
-import org.apache.paimon.mergetree.compact.LookupMergeFunction;
-import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
import
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.FirstRowMergeFunctionWrapperFactory;
import
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.LookupMergeFunctionWrapperFactory;
-import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
-import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
-import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
-import org.apache.paimon.mergetree.compact.OffPeakHours;
-import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.mergetree.lookup.LookupSerializerFactory;
import org.apache.paimon.mergetree.lookup.PersistEmptyProcessor;
import org.apache.paimon.mergetree.lookup.PersistPositionProcessor;
@@ -68,42 +53,30 @@ import org.apache.paimon.mergetree.lookup.PersistProcessor;
import org.apache.paimon.mergetree.lookup.PersistValueAndPosProcessor;
import org.apache.paimon.mergetree.lookup.PersistValueProcessor;
import org.apache.paimon.mergetree.lookup.RemoteLookupFileManager;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
-import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import java.util.function.BiFunction;
-import java.util.function.Function;
import java.util.function.Supplier;
import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
-import static org.apache.paimon.format.FileFormat.fileFormat;
import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;
import static org.apache.paimon.mergetree.LookupFile.localFilePrefix;
-import static
org.apache.paimon.utils.FileStorePathFactory.createFormatPathFactories;
-
-/** {@link FileStoreWrite} for {@link KeyValueFileStore}. */
-public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
- private static final Logger LOG =
LoggerFactory.getLogger(KeyValueFileStoreWrite.class);
+/** Factory to create {@link MergeTreeCompactManager}. */
+public class MergeTreeCompactManagerFactory implements
KvCompactionManagerFactory {
private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
@@ -114,129 +87,100 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private final CoreOptions options;
private final RowType keyType;
private final RowType valueType;
+ private final RowType partitionType;
private final FileIO fileIO;
private final SchemaManager schemaManager;
private final TableSchema schema;
- private final RowType partitionType;
- private final String commitUser;
@Nullable private final RecordLevelExpire recordLevelExpire;
+ private final CacheManager cacheManager;
+
+ @Nullable private IOManager ioManager;
+ @Nullable private CompactionMetrics compactionMetrics;
@Nullable private Cache<String, LookupFile> lookupFileCache;
- public KeyValueFileStoreWrite(
- FileIO fileIO,
- SchemaManager schemaManager,
- TableSchema schema,
- String commitUser,
- RowType partitionType,
- RowType keyType,
- RowType valueType,
+ public MergeTreeCompactManagerFactory(
+ KeyValueFileReaderFactory.Builder readerFactoryBuilder,
+ KeyValueFileWriterFactory.Builder writerFactoryBuilder,
Supplier<Comparator<InternalRow>> keyComparatorSupplier,
Supplier<FieldsComparator> udsComparatorSupplier,
Supplier<RecordEqualiser> logDedupEqualSupplier,
MergeFunctionFactory<KeyValue> mfFactory,
- FileStorePathFactory pathFactory,
- BiFunction<CoreOptions, String, FileStorePathFactory>
formatPathFactory,
- SnapshotManager snapshotManager,
- FileStoreScan scan,
- @Nullable DynamicBucketIndexMaintainer.Factory dbMaintainerFactory,
- @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
CoreOptions options,
- KeyValueFieldsExtractor extractor,
- String tableName) {
- super(
- snapshotManager,
- scan,
- options,
- partitionType,
- dbMaintainerFactory,
- dvMaintainerFactory,
- tableName);
- this.fileIO = fileIO;
- this.schemaManager = schemaManager;
- this.schema = schema;
- this.partitionType = partitionType;
- this.keyType = keyType;
- this.valueType = valueType;
- this.commitUser = commitUser;
-
- this.udsComparatorSupplier = udsComparatorSupplier;
- this.readerFactoryBuilder =
- KeyValueFileReaderFactory.builder(
- fileIO,
- schemaManager,
- schema,
- keyType,
- valueType,
- FileFormatDiscover.of(options),
- pathFactory,
- extractor,
- options);
- this.recordLevelExpire = RecordLevelExpire.create(options, schema,
schemaManager);
- this.writerFactoryBuilder =
- KeyValueFileWriterFactory.builder(
- fileIO,
- schema.id(),
- keyType,
- valueType,
- fileFormat(options),
- createFormatPathFactories(options, formatPathFactory),
- options.targetFileSize(true));
+ RowType keyType,
+ RowType valueType,
+ RowType partitionType,
+ FileIO fileIO,
+ SchemaManager schemaManager,
+ TableSchema schema,
+ @Nullable RecordLevelExpire recordLevelExpire,
+ CacheManager cacheManager) {
+ this.readerFactoryBuilder = readerFactoryBuilder;
+ this.writerFactoryBuilder = writerFactoryBuilder;
this.keyComparatorSupplier = keyComparatorSupplier;
+ this.udsComparatorSupplier = udsComparatorSupplier;
this.logDedupEqualSupplier = logDedupEqualSupplier;
this.mfFactory = mfFactory;
this.options = options;
+ this.keyType = keyType;
+ this.valueType = valueType;
+ this.partitionType = partitionType;
+ this.fileIO = fileIO;
+ this.schemaManager = schemaManager;
+ this.schema = schema;
+ this.recordLevelExpire = recordLevelExpire;
+ this.cacheManager = cacheManager;
}
@Override
- public KeyValueFileStoreWrite withIOManager(IOManager ioManager) {
- super.withIOManager(ioManager);
- if (mfFactory instanceof LookupMergeFunction.Factory) {
- ((LookupMergeFunction.Factory) mfFactory).withIOManager(ioManager);
- }
- return this;
+ public void withIOManager(@Nullable IOManager ioManager) {
+ this.ioManager = ioManager;
+ }
+
+ @Override
+ public void withCompactionMetrics(@Nullable CompactionMetrics
compactionMetrics) {
+ this.compactionMetrics = compactionMetrics;
}
@Override
- protected MergeTreeWriter createWriter(
+ public CompactManager create(
BinaryRow partition,
int bucket,
- List<DataFileMeta> restoreFiles,
- long restoredMaxSeqNumber,
- @Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
+ List<DataFileMeta> restoreFiles,
@Nullable BucketedDvMaintainer dvMaintainer) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Creating merge tree writer for partition {} bucket {}
from restored files {}",
- partition,
- bucket,
- restoreFiles);
+ if (options.writeOnly()) {
+ return new NoopCompactManager();
}
- KeyValueFileWriterFactory writerFactory =
- writerFactoryBuilder.build(partition, bucket, options);
+ CompactStrategy compactStrategy = createCompactStrategy(options);
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
Levels levels = new Levels(keyComparator, restoreFiles,
options.numLevels());
- CompactStrategy compactStrategy = createCompactStrategy(options);
- CompactManager compactManager =
- createCompactManager(
- partition, bucket, compactStrategy, compactExecutor,
levels, dvMaintainer);
-
- return new MergeTreeWriter(
- options.writeBufferSpillable(),
- options.writeBufferSpillDiskSize(),
- options.localSortMaxNumFileHandles(),
- options.spillCompressOptions(),
- ioManager,
- compactManager,
- restoredMaxSeqNumber,
+ @Nullable FieldsComparator userDefinedSeqComparator =
udsComparatorSupplier.get();
+ CompactRewriter rewriter =
+ createRewriter(
+ partition,
+ bucket,
+ keyComparator,
+ userDefinedSeqComparator,
+ levels,
+ dvMaintainer);
+ return new MergeTreeCompactManager(
+ compactExecutor,
+ levels,
+ compactStrategy,
keyComparator,
- mfFactory.create(),
- writerFactory,
- options.commitForceCompact(),
- options.changelogProducer(),
- restoreIncrement,
- UserDefinedSeqComparator.create(valueType, options));
+ options.compactionFileSize(true),
+ options.numSortedRunStopTrigger(),
+ rewriter,
+ compactionMetrics == null
+ ? null
+ : compactionMetrics.createReporter(partition, bucket),
+ dvMaintainer,
+ options.prepareCommitWaitCompaction(),
+ options.needLookup(),
+ recordLevelExpire,
+ options.forceRewriteAllFiles(),
+ options.isChainTable());
}
private CompactStrategy createCompactStrategy(CoreOptions options) {
@@ -273,46 +217,6 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
}
}
- private CompactManager createCompactManager(
- BinaryRow partition,
- int bucket,
- CompactStrategy compactStrategy,
- ExecutorService compactExecutor,
- Levels levels,
- @Nullable BucketedDvMaintainer dvMaintainer) {
- if (options.writeOnly()) {
- return new NoopCompactManager();
- } else {
- Comparator<InternalRow> keyComparator =
keyComparatorSupplier.get();
- @Nullable FieldsComparator userDefinedSeqComparator =
udsComparatorSupplier.get();
- CompactRewriter rewriter =
- createRewriter(
- partition,
- bucket,
- keyComparator,
- userDefinedSeqComparator,
- levels,
- dvMaintainer);
- return new MergeTreeCompactManager(
- compactExecutor,
- levels,
- compactStrategy,
- keyComparator,
- options.compactionFileSize(true),
- options.numSortedRunStopTrigger(),
- rewriter,
- compactionMetrics == null
- ? null
- : compactionMetrics.createReporter(partition,
bucket),
- dvMaintainer,
- options.prepareCommitWaitCompaction(),
- options.needLookup(),
- recordLevelExpire,
- options.forceRewriteAllFiles(),
- options.isChainTable());
- }
- }
-
private MergeTreeCompactRewriter createRewriter(
BinaryRow partition,
int bucket,
@@ -391,6 +295,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
lookupLevels,
options.lookupRemoteLevelThreshold());
}
+ //noinspection rawtypes,unchecked
return new LookupMergeTreeCompactRewriter(
maxLevel,
mergeEngine,
@@ -459,13 +364,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
}
@Override
- protected Function<WriterContainer<KeyValue>, Boolean>
createWriterCleanChecker() {
- return createConflictAwareWriterCleanChecker(commitUser, restore);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
+ public void close() {
if (lookupFileCache != null) {
lookupFileCache.invalidateAll();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 326158c7b7..41ecc6d0aa 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -19,56 +19,26 @@
package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.ChangelogProducer;
-import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.compact.CompactManager;
-import org.apache.paimon.compact.NoopCompactManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
-import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RecordLevelExpire;
-import org.apache.paimon.lookup.LookupStoreFactory;
-import org.apache.paimon.lookup.LookupStrategy;
-import org.apache.paimon.mergetree.Levels;
-import org.apache.paimon.mergetree.LookupFile;
-import org.apache.paimon.mergetree.LookupLevels;
-import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeWriter;
-import org.apache.paimon.mergetree.compact.CompactRewriter;
-import org.apache.paimon.mergetree.compact.CompactStrategy;
-import org.apache.paimon.mergetree.compact.EarlyFullCompaction;
-import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
-import
org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
+import org.apache.paimon.mergetree.compact.KvCompactionManagerFactory;
import org.apache.paimon.mergetree.compact.LookupMergeFunction;
-import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
-import
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.FirstRowMergeFunctionWrapperFactory;
-import
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.LookupMergeFunctionWrapperFactory;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
-import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
-import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
-import org.apache.paimon.mergetree.compact.OffPeakHours;
-import org.apache.paimon.mergetree.compact.UniversalCompaction;
-import org.apache.paimon.mergetree.lookup.LookupSerializerFactory;
-import org.apache.paimon.mergetree.lookup.PersistEmptyProcessor;
-import org.apache.paimon.mergetree.lookup.PersistPositionProcessor;
-import org.apache.paimon.mergetree.lookup.PersistProcessor;
-import org.apache.paimon.mergetree.lookup.PersistValueAndPosProcessor;
-import org.apache.paimon.mergetree.lookup.PersistValueProcessor;
-import org.apache.paimon.mergetree.lookup.RemoteLookupFileManager;
-import org.apache.paimon.options.Options;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -79,8 +49,6 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.UserDefinedSeqComparator;
-import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,11 +61,7 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
-import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
-import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.format.FileFormat.fileFormat;
-import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;
-import static org.apache.paimon.mergetree.LookupFile.localFilePrefix;
import static
org.apache.paimon.utils.FileStorePathFactory.createFormatPathFactories;
/** {@link FileStoreWrite} for {@link KeyValueFileStore}. */
@@ -105,22 +69,13 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private static final Logger LOG =
LoggerFactory.getLogger(KeyValueFileStoreWrite.class);
- private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
- private final Supplier<FieldsComparator> udsComparatorSupplier;
- private final Supplier<RecordEqualiser> logDedupEqualSupplier;
private final MergeFunctionFactory<KeyValue> mfFactory;
private final CoreOptions options;
- private final RowType keyType;
private final RowType valueType;
- private final FileIO fileIO;
- private final SchemaManager schemaManager;
- private final TableSchema schema;
- private final RowType partitionType;
private final String commitUser;
- @Nullable private final RecordLevelExpire recordLevelExpire;
- @Nullable private Cache<String, LookupFile> lookupFileCache;
+ private final KvCompactionManagerFactory compactManagerFactory;
public KeyValueFileStoreWrite(
FileIO fileIO,
@@ -151,16 +106,10 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
dbMaintainerFactory,
dvMaintainerFactory,
tableName);
- this.fileIO = fileIO;
- this.schemaManager = schemaManager;
- this.schema = schema;
- this.partitionType = partitionType;
- this.keyType = keyType;
this.valueType = valueType;
this.commitUser = commitUser;
- this.udsComparatorSupplier = udsComparatorSupplier;
- this.readerFactoryBuilder =
+ KeyValueFileReaderFactory.Builder readerFactoryBuilder =
KeyValueFileReaderFactory.builder(
fileIO,
schemaManager,
@@ -171,7 +120,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
pathFactory,
extractor,
options);
- this.recordLevelExpire = RecordLevelExpire.create(options, schema,
schemaManager);
+ RecordLevelExpire recordLevelExpire =
+ RecordLevelExpire.create(options, schema, schemaManager);
this.writerFactoryBuilder =
KeyValueFileWriterFactory.builder(
fileIO,
@@ -182,20 +132,44 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
createFormatPathFactories(options, formatPathFactory),
options.targetFileSize(true));
this.keyComparatorSupplier = keyComparatorSupplier;
- this.logDedupEqualSupplier = logDedupEqualSupplier;
this.mfFactory = mfFactory;
this.options = options;
+ this.compactManagerFactory =
+ KvCompactionManagerFactory.create(
+ readerFactoryBuilder,
+ writerFactoryBuilder,
+ keyComparatorSupplier,
+ udsComparatorSupplier,
+ logDedupEqualSupplier,
+ mfFactory,
+ options,
+ keyType,
+ valueType,
+ partitionType,
+ fileIO,
+ schemaManager,
+ schema,
+ recordLevelExpire,
+ cacheManager);
}
@Override
public KeyValueFileStoreWrite withIOManager(IOManager ioManager) {
super.withIOManager(ioManager);
+ compactManagerFactory.withIOManager(ioManager);
if (mfFactory instanceof LookupMergeFunction.Factory) {
((LookupMergeFunction.Factory) mfFactory).withIOManager(ioManager);
}
return this;
}
+ @Override
+ public FileStoreWrite<KeyValue> withMetricRegistry(MetricRegistry
metricRegistry) {
+ super.withMetricRegistry(metricRegistry);
+ compactManagerFactory.withCompactionMetrics(this.compactionMetrics);
+ return this;
+ }
+
@Override
protected MergeTreeWriter createWriter(
BinaryRow partition,
@@ -216,11 +190,9 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options);
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
- Levels levels = new Levels(keyComparator, restoreFiles,
options.numLevels());
- CompactStrategy compactStrategy = createCompactStrategy(options);
CompactManager compactManager =
- createCompactManager(
- partition, bucket, compactStrategy, compactExecutor,
levels, dvMaintainer);
+ compactManagerFactory.create(
+ partition, bucket, compactExecutor, restoreFiles,
dvMaintainer);
return new MergeTreeWriter(
options.writeBufferSpillable(),
@@ -239,225 +211,6 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
UserDefinedSeqComparator.create(valueType, options));
}
- private CompactStrategy createCompactStrategy(CoreOptions options) {
- if (options.needLookup()) {
- Integer compactMaxInterval = null;
- switch (options.lookupCompact()) {
- case GENTLE:
- compactMaxInterval = options.lookupCompactMaxInterval();
- break;
- case RADICAL:
- break;
- }
- return new ForceUpLevel0Compaction(
- new UniversalCompaction(
- options.maxSizeAmplificationPercent(),
- options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger(),
- EarlyFullCompaction.create(options),
- OffPeakHours.create(options)),
- compactMaxInterval);
- }
-
- UniversalCompaction universal =
- new UniversalCompaction(
- options.maxSizeAmplificationPercent(),
- options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger(),
- EarlyFullCompaction.create(options),
- OffPeakHours.create(options));
- if (options.compactionForceUpLevel0()) {
- return new ForceUpLevel0Compaction(universal, null);
- } else {
- return universal;
- }
- }
-
- private CompactManager createCompactManager(
- BinaryRow partition,
- int bucket,
- CompactStrategy compactStrategy,
- ExecutorService compactExecutor,
- Levels levels,
- @Nullable BucketedDvMaintainer dvMaintainer) {
- if (options.writeOnly()) {
- return new NoopCompactManager();
- } else {
- Comparator<InternalRow> keyComparator =
keyComparatorSupplier.get();
- @Nullable FieldsComparator userDefinedSeqComparator =
udsComparatorSupplier.get();
- CompactRewriter rewriter =
- createRewriter(
- partition,
- bucket,
- keyComparator,
- userDefinedSeqComparator,
- levels,
- dvMaintainer);
- return new MergeTreeCompactManager(
- compactExecutor,
- levels,
- compactStrategy,
- keyComparator,
- options.compactionFileSize(true),
- options.numSortedRunStopTrigger(),
- rewriter,
- compactionMetrics == null
- ? null
- : compactionMetrics.createReporter(partition,
bucket),
- dvMaintainer,
- options.prepareCommitWaitCompaction(),
- options.needLookup(),
- recordLevelExpire,
- options.forceRewriteAllFiles(),
- options.isChainTable());
- }
- }
-
- private MergeTreeCompactRewriter createRewriter(
- BinaryRow partition,
- int bucket,
- Comparator<InternalRow> keyComparator,
- @Nullable FieldsComparator userDefinedSeqComparator,
- Levels levels,
- @Nullable BucketedDvMaintainer dvMaintainer) {
- DeletionVector.Factory dvFactory =
DeletionVector.factory(dvMaintainer);
- KeyValueFileReaderFactory keyReaderFactory =
- readerFactoryBuilder.build(partition, bucket, dvFactory);
- FileReaderFactory<KeyValue> readerFactory = keyReaderFactory;
- if (recordLevelExpire != null) {
- readerFactory = recordLevelExpire.wrap(readerFactory);
- }
- KeyValueFileWriterFactory writerFactory =
- writerFactoryBuilder.build(partition, bucket, options);
- MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType,
ioManager);
- int maxLevel = options.numLevels() - 1;
- MergeEngine mergeEngine = options.mergeEngine();
- ChangelogProducer changelogProducer = options.changelogProducer();
- LookupStrategy lookupStrategy = options.lookupStrategy();
- if (changelogProducer.equals(FULL_COMPACTION)) {
- return new FullChangelogMergeTreeCompactRewriter(
- maxLevel,
- mergeEngine,
- readerFactory,
- writerFactory,
- keyComparator,
- userDefinedSeqComparator,
- mfFactory,
- mergeSorter,
- logDedupEqualSupplier.get());
- } else if (lookupStrategy.needLookup) {
- PersistProcessor.Factory<?> processorFactory;
- LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<?>
wrapperFactory;
- FileReaderFactory<KeyValue> lookupReaderFactory = readerFactory;
- if (lookupStrategy.isFirstRow) {
- if (options.deletionVectorsEnabled()) {
- throw new UnsupportedOperationException(
- "First row merge engine does not need deletion
vectors because there is no deletion of old data in this merge engine.");
- }
- lookupReaderFactory =
- readerFactoryBuilder
- .copyWithoutProjection()
- .withReadValueType(RowType.of())
- .build(partition, bucket, dvFactory);
- processorFactory = PersistEmptyProcessor.factory();
- wrapperFactory = new FirstRowMergeFunctionWrapperFactory();
- } else {
- if (lookupStrategy.deletionVector) {
- if (lookupStrategy.produceChangelog
- || mergeEngine != DEDUPLICATE
- || !options.sequenceField().isEmpty()) {
- processorFactory =
PersistValueAndPosProcessor.factory(valueType);
- } else {
- processorFactory = PersistPositionProcessor.factory();
- }
- } else {
- processorFactory =
PersistValueProcessor.factory(valueType);
- }
- wrapperFactory =
- new LookupMergeFunctionWrapperFactory<>(
- logDedupEqualSupplier.get(),
- lookupStrategy,
- UserDefinedSeqComparator.create(valueType,
options));
- }
- LookupLevels<?> lookupLevels =
- createLookupLevels(
- partition, bucket, levels, processorFactory,
lookupReaderFactory);
- RemoteLookupFileManager<?> remoteLookupFileManager = null;
- if (options.lookupRemoteFileEnabled()) {
- remoteLookupFileManager =
- new RemoteLookupFileManager<>(
- fileIO,
- keyReaderFactory.pathFactory(),
- lookupLevels,
- options.lookupRemoteLevelThreshold());
- }
- return new LookupMergeTreeCompactRewriter(
- maxLevel,
- mergeEngine,
- lookupLevels,
- readerFactory,
- writerFactory,
- keyComparator,
- userDefinedSeqComparator,
- mfFactory,
- mergeSorter,
- wrapperFactory,
- lookupStrategy.produceChangelog,
- dvMaintainer,
- options,
- remoteLookupFileManager);
- } else {
- return new MergeTreeCompactRewriter(
- readerFactory,
- writerFactory,
- keyComparator,
- userDefinedSeqComparator,
- mfFactory,
- mergeSorter);
- }
- }
-
- private <T> LookupLevels<T> createLookupLevels(
- BinaryRow partition,
- int bucket,
- Levels levels,
- PersistProcessor.Factory<T> processorFactory,
- FileReaderFactory<KeyValue> readerFactory) {
- if (ioManager == null) {
- throw new RuntimeException(
- "Can not use lookup, there is no temp disk directory to
use.");
- }
- LookupStoreFactory lookupStoreFactory =
- LookupStoreFactory.create(
- options,
- cacheManager,
- new
RowCompactedSerializer(keyType).createSliceComparator());
- Options options = this.options.toConfiguration();
- if (lookupFileCache == null) {
- lookupFileCache =
- LookupFile.createCache(
-
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
-
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
- }
- return new LookupLevels<>(
- schemaId -> schemaManager.schema(schemaId).logicalRowType(),
- schema.id(),
- levels,
- keyComparatorSupplier.get(),
- keyType,
- processorFactory,
- LookupSerializerFactory.INSTANCE.get(),
- readerFactory::createRecordReader,
- file ->
- ioManager
- .createChannel(
- localFilePrefix(partitionType,
partition, bucket, file))
- .getPathFile(),
- lookupStoreFactory,
- bfGenerator(options),
- lookupFileCache);
- }
-
@Override
protected Function<WriterContainer<KeyValue>, Boolean>
createWriterCleanChecker() {
return createConflictAwareWriterCleanChecker(commitUser, restore);
@@ -466,8 +219,6 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
@Override
public void close() throws Exception {
super.close();
- if (lookupFileCache != null) {
- lookupFileCache.invalidateAll();
- }
+ compactManagerFactory.close();
}
}