This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c9d6f47c4 [core] Filter on Manifest Cache (#929)
c9d6f47c4 is described below
commit c9d6f47c4beaf30c679529fc3cf96ced4683b303
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 18 19:41:54 2023 +0800
[core] Filter on Manifest Cache (#929)
---
docs/content/maintenance/write-performance.md | 2 +-
.../shortcodes/generated/core_configuration.html | 12 +--
.../org/apache/paimon/reader/RecordReader.java | 47 +++++++++
.../main/java/org/apache/paimon/utils/Filter.java | 7 ++
.../java/org/apache/paimon/AbstractFileStore.java | 25 +++--
.../org/apache/paimon/AppendOnlyFileStore.java | 17 +++-
.../main/java/org/apache/paimon/CoreOptions.java | 17 ++--
.../src/main/java/org/apache/paimon/FileStore.java | 3 +
.../java/org/apache/paimon/KeyValueFileStore.java | 16 ++-
.../paimon/manifest/ManifestCacheFilter.java | 23 +++--
.../paimon/manifest/ManifestEntrySerializer.java | 15 +++
.../paimon/operation/AbstractFileStoreScan.java | 97 +++++++++++++-----
.../paimon/operation/AbstractFileStoreWrite.java | 4 +-
.../paimon/operation/AppendOnlyFileStoreScan.java | 1 -
.../org/apache/paimon/operation/FileStoreScan.java | 5 +
.../paimon/operation/KeyValueFileStoreScan.java | 1 -
.../paimon/table/AppendOnlyFileStoreTable.java | 9 +-
.../table/ChangelogValueCountFileStoreTable.java | 9 +-
.../table/ChangelogWithKeyFileStoreTable.java | 9 +-
.../org/apache/paimon/table/FileStoreTable.java | 3 +
.../apache/paimon/table/system/BucketsTable.java | 37 +++----
.../java/org/apache/paimon/utils/ObjectsCache.java | 26 +++--
.../java/org/apache/paimon/utils/ObjectsFile.java | 19 +++-
.../paimon/table/FileStoreTableTestBase.java | 2 +-
.../ContinuousCompactorFollowUpScannerTest.java | 3 +-
.../org/apache/paimon/utils/ObjectsCacheTest.java | 108 +++++++++++++++++++++
.../flink/sink/BucketsRowChannelComputer.java | 49 ++++++++++
.../apache/paimon/flink/sink/ChannelComputer.java | 11 +++
.../paimon/flink/sink/CompactorSinkBuilder.java | 8 +-
.../sink/OffsetRowDataHashStreamPartitioner.java | 83 ----------------
.../paimon/flink/sink/RowDataChannelComputer.java | 14 +--
.../paimon/flink/sink/StoreCompactOperator.java | 26 ++---
.../apache/paimon/flink/sink/StoreSinkWrite.java | 3 +-
.../paimon/flink/sink/StoreSinkWriteImpl.java | 22 +++--
.../paimon/flink/sink/StoreSinkWriteState.java | 7 ++
.../flink/sink/cdc/CdcRecordChannelComputer.java | 3 +-
.../sink/cdc/CdcRecordStoreWriteOperator.java | 2 +-
.../paimon/flink/source/CompactorSourceITCase.java | 9 +-
38 files changed, 512 insertions(+), 242 deletions(-)
diff --git a/docs/content/maintenance/write-performance.md
b/docs/content/maintenance/write-performance.md
index 1f01d75b6..c2dde5d20 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -56,7 +56,7 @@ It is recommended that the parallelism of sink should be less
than or equal to t
## Write Initialize
In the initialization of write, the writer of the bucket needs to read all
historical files. If there is a bottleneck
-here (For example, writing a large number of partitions simultaneously), you
can use `manifest.cache-size` to cache
+here (for example, when writing to a large number of partitions simultaneously), you
can use `write-manifest-cache` to cache
the read manifest data to accelerate initialization.
## Compaction
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 62a9f6537..b6b91d807 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -158,12 +158,6 @@
<td>Float</td>
<td>The index load factor for lookup.</td>
</tr>
- <tr>
- <td><h5>manifest.cache-size</h5></td>
- <td style="word-wrap: break-word;">0 bytes</td>
- <td>MemorySize</td>
- <td>Cache size for reading manifest files.</td>
- </tr>
<tr>
<td><h5>manifest.format</h5></td>
<td style="word-wrap: break-word;">avro</td>
@@ -374,6 +368,12 @@
<td>Boolean</td>
<td>Whether the write buffer can be spillable. Enabled by default
when using object storage.</td>
</tr>
+ <tr>
+ <td><h5>write-manifest-cache</h5></td>
+ <td style="word-wrap: break-word;">0 bytes</td>
+ <td>MemorySize</td>
+ <td>Cache size for reading manifest files for write
initialization.</td>
+ </tr>
<tr>
<td><h5>write-mode</h5></td>
<td style="word-wrap: break-word;">change-log</td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
index b02075b91..474e33b4d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
@@ -20,6 +20,7 @@ package org.apache.paimon.reader;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.Filter;
import javax.annotation.Nullable;
@@ -89,6 +90,31 @@ public interface RecordReader<T> extends Closeable {
}
};
}
+
+ /** Filters a {@link RecordIterator}. */
+ default RecordIterator<T> filter(Filter<T> filter) {
+ RecordIterator<T> thisIterator = this;
+ return new RecordIterator<T>() {
+ @Nullable
+ @Override
+ public T next() throws IOException {
+ while (true) {
+ T next = thisIterator.next();
+ if (next == null) {
+ return null;
+ }
+ if (filter.test(next)) {
+ return next;
+ }
+ }
+ }
+
+ @Override
+ public void releaseBatch() {
+ thisIterator.releaseBatch();
+ }
+ };
+ }
}
//
-------------------------------------------------------------------------
@@ -136,6 +162,27 @@ public interface RecordReader<T> extends Closeable {
};
}
+ /** Filters a {@link RecordReader}. */
+ default RecordReader<T> filter(Filter<T> filter) {
+ RecordReader<T> thisReader = this;
+ return new RecordReader<T>() {
+ @Nullable
+ @Override
+ public RecordIterator<T> readBatch() throws IOException {
+ RecordIterator<T> iterator = thisReader.readBatch();
+ if (iterator == null) {
+ return null;
+ }
+ return iterator.filter(filter);
+ }
+
+ @Override
+ public void close() throws IOException {
+ thisReader.close();
+ }
+ };
+ }
+
/** Convert this reader to a {@link CloseableIterator}. */
default CloseableIterator<T> toCloseableIterator() {
return new RecordReaderIterator<>(this);
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
b/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
index 39ff64ea6..2764bc773 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
@@ -27,6 +27,8 @@ import org.apache.paimon.predicate.Predicate;
@FunctionalInterface
public interface Filter<T> {
+ Filter<?> ALWAYS_TRUE = t -> true;
+
/**
* Evaluates this predicate on the given argument.
*
@@ -34,4 +36,9 @@ public interface Filter<T> {
* @return {@code true} if the input argument matches the predicate,
otherwise {@code false}
*/
boolean test(T t);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ static <T> Filter<T> alwaysTrue() {
+ return (Filter) ALWAYS_TRUE;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 5f8676012..de939bdd9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -51,7 +51,7 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
protected final CoreOptions options;
protected final RowType partitionType;
- @Nullable private final SegmentsCache<String> manifestCache;
+ @Nullable private final SegmentsCache<String> writeManifestCache;
public AbstractFileStore(
FileIO fileIO,
@@ -64,11 +64,11 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
this.schemaId = schemaId;
this.options = options;
this.partitionType = partitionType;
- MemorySize manifestCacheSize = options.manifestCacheSize();
- this.manifestCache =
- manifestCacheSize.getBytes() == 0
+ MemorySize writeManifestCache = options.writeManifestCache();
+ this.writeManifestCache =
+ writeManifestCache.getBytes() == 0
? null
- : new SegmentsCache<>(options.pageSize(),
manifestCacheSize);
+ : new SegmentsCache<>(options.pageSize(),
writeManifestCache);
}
public FileStorePathFactory pathFactory() {
@@ -86,6 +86,10 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
@VisibleForTesting
public ManifestFile.Factory manifestFileFactory() {
+ return manifestFileFactory(false);
+ }
+
+ protected ManifestFile.Factory manifestFileFactory(boolean forWrite) {
return new ManifestFile.Factory(
fileIO,
schemaManager,
@@ -93,13 +97,20 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
options.manifestFormat(),
pathFactory(),
options.manifestTargetSize().getBytes(),
- manifestCache);
+ forWrite ? writeManifestCache : null);
}
@VisibleForTesting
public ManifestList.Factory manifestListFactory() {
+ return manifestListFactory(false);
+ }
+
+ protected ManifestList.Factory manifestListFactory(boolean forWrite) {
return new ManifestList.Factory(
- fileIO, options.manifestFormat(), pathFactory(),
manifestCache);
+ fileIO,
+ options.manifestFormat(),
+ pathFactory(),
+ forWrite ? writeManifestCache : null);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 893039c23..8e28b5b2d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -21,6 +21,7 @@ package org.apache.paimon;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.operation.AppendOnlyFileStoreRead;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -66,6 +67,12 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
@Override
public AppendOnlyFileStoreWrite newWrite(String commitUser) {
+ return newWrite(commitUser, null);
+ }
+
+ @Override
+ public AppendOnlyFileStoreWrite newWrite(
+ String commitUser, ManifestCacheFilter manifestFilter) {
return new AppendOnlyFileStoreWrite(
fileIO,
newRead(),
@@ -74,11 +81,11 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
rowType,
pathFactory(),
snapshotManager(),
- newScan(true),
+ newScan(true).withManifestCacheFilter(manifestFilter),
options);
}
- private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
+ private AppendOnlyFileStoreScan newScan(boolean forWrite) {
return new AppendOnlyFileStoreScan(
partitionType,
bucketKeyType.getFieldCount() == 0 ? rowType : bucketKeyType,
@@ -86,10 +93,10 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
snapshotManager(),
schemaManager,
schemaId,
- manifestFileFactory(),
- manifestListFactory(),
+ manifestFileFactory(forWrite),
+ manifestListFactory(forWrite),
options.bucket(),
- checkNumOfBuckets);
+ forWrite);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index c27d8b26a..ce8ccca63 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -133,12 +133,6 @@ public class CoreOptions implements Serializable {
"To avoid frequent manifest merges, this parameter
specifies the minimum number "
+ "of ManifestFileMeta to merge.");
- public static final ConfigOption<MemorySize> MANIFEST_CACHE_SIZE =
- key("manifest.cache-size")
- .memoryType()
- .defaultValue(MemorySize.ofMebiBytes(0))
- .withDescription("Cache size for reading manifest files.");
-
public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
key("partition.default-name")
.stringType()
@@ -228,6 +222,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether the write buffer can be spillable.
Enabled by default when using object storage.");
+ public static final ConfigOption<MemorySize> WRITE_MANIFEST_CACHE =
+ key("write-manifest-cache")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(0))
+ .withDescription(
+ "Cache size for reading manifest files for write
initialization.");
+
public static final ConfigOption<Integer> LOCAL_SORT_MAX_NUM_FILE_HANDLES =
key("local-sort.max-num-file-handles")
.intType()
@@ -650,8 +651,8 @@ public class CoreOptions implements Serializable {
return options.get(MANIFEST_TARGET_FILE_SIZE);
}
- public MemorySize manifestCacheSize() {
- return options.get(MANIFEST_CACHE_SIZE);
+ public MemorySize writeManifestCache() {
+ return options.get(WRITE_MANIFEST_CACHE);
}
public String partitionDefaultName() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 57fb4d23c..f0a60d905 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -18,6 +18,7 @@
package org.apache.paimon;
+import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.FileStoreRead;
@@ -50,6 +51,8 @@ public interface FileStore<T> extends Serializable {
FileStoreWrite<T> newWrite(String commitUser);
+ FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter
manifestFilter);
+
FileStoreCommit newCommit(String commitUser);
FileStoreExpire newExpire();
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 1201c8e96..77fbd5d76 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -21,6 +21,7 @@ package org.apache.paimon;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.operation.KeyValueFileStoreRead;
import org.apache.paimon.operation.KeyValueFileStoreScan;
@@ -87,6 +88,11 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
@Override
public KeyValueFileStoreWrite newWrite(String commitUser) {
+ return newWrite(commitUser, null);
+ }
+
+ @Override
+ public KeyValueFileStoreWrite newWrite(String commitUser,
ManifestCacheFilter manifestFilter) {
return new KeyValueFileStoreWrite(
fileIO,
schemaManager,
@@ -98,12 +104,12 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
mfFactory,
pathFactory(),
snapshotManager(),
- newScan(true),
+ newScan(true).withManifestCacheFilter(manifestFilter),
options,
keyValueFieldsExtractor);
}
- private KeyValueFileStoreScan newScan(boolean checkNumOfBuckets) {
+ private KeyValueFileStoreScan newScan(boolean forWrite) {
return new KeyValueFileStoreScan(
partitionType,
bucketKeyType,
@@ -112,10 +118,10 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
schemaManager,
schemaId,
keyValueFieldsExtractor,
- manifestFileFactory(),
- manifestListFactory(),
+ manifestFileFactory(forWrite),
+ manifestListFactory(forWrite),
options.bucket(),
- checkNumOfBuckets);
+ forWrite);
}
@Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCacheFilter.java
similarity index 60%
copy from paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
copy to
paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCacheFilter.java
index 39ff64ea6..32ef757b5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCacheFilter.java
@@ -16,22 +16,21 @@
* limitations under the License.
*/
-package org.apache.paimon.utils;
+package org.apache.paimon.manifest;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.operation.AbstractFileStoreScan;
+
+import javax.annotation.concurrent.ThreadSafe;
/**
- * Represents a filter (boolean-valued function) of one argument. This class
is for avoiding name
- * conflicting to {@link Predicate}.
+ * Filter for the manifest cache. It is used in {@link AbstractFileStoreScan}
to improve cache
+ * utilization. NOTE: Please use this interface with caution and make sure
that only the filtered data
+ * is required; otherwise, it will cause correctness issues.
*/
+@ThreadSafe
@FunctionalInterface
-public interface Filter<T> {
+public interface ManifestCacheFilter {
- /**
- * Evaluates this predicate on the given argument.
- *
- * @param t the input argument
- * @return {@code true} if the input argument matches the predicate,
otherwise {@code false}
- */
- boolean test(T t);
+ boolean test(BinaryRow partition, int bucket);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
index ad398e2ac..a733ba9e1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
@@ -18,11 +18,14 @@
package org.apache.paimon.manifest;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.utils.VersionedObjectSerializer;
+import java.util.function.Function;
+
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
@@ -72,4 +75,16 @@ public class ManifestEntrySerializer extends
VersionedObjectSerializer<ManifestE
row.getInt(3),
dataFileMetaSerializer.fromRow(row.getRow(4,
dataFileMetaSerializer.numFields())));
}
+
+ public static Function<InternalRow, BinaryRow> partitionGetter() {
+ return row -> deserializeBinaryRow(row.getBinary(2));
+ }
+
+ public static Function<InternalRow, Integer> bucketGetter() {
+ return row -> row.getInt(3);
+ }
+
+ public static Function<InternalRow, Integer> totalBucketGetter() {
+ return row -> row.getInt(4);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 044355c07..0650d9a55 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -20,7 +20,10 @@ package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
@@ -34,7 +37,6 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
@@ -49,6 +51,8 @@ import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Default implementation of {@link FileStoreScan}. */
public abstract class AbstractFileStoreScan implements FileStoreScan {
@@ -63,7 +67,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private final ConcurrentMap<Long, TableSchema> tableSchemas;
private final SchemaManager schemaManager;
- private final long schemaId;
private Predicate partitionFilter;
private BucketSelector bucketSelector;
@@ -74,24 +77,23 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private ScanKind scanKind = ScanKind.ALL;
private Filter<Integer> levelFilter = null;
+ private ManifestCacheFilter manifestCacheFilter = null;
+
public AbstractFileStoreScan(
RowType partitionType,
RowType bucketKeyType,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
- long schemaId,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets) {
this.partitionStatsConverter = new
FieldStatsArraySerializer(partitionType);
this.partitionConverter = new
RowDataToObjectArrayConverter(partitionType);
- Preconditions.checkArgument(
- bucketKeyType.getFieldCount() > 0, "The bucket keys should not
be empty.");
+ checkArgument(bucketKeyType.getFieldCount() > 0, "The bucket keys
should not be empty.");
this.bucketKeyType = bucketKeyType;
this.snapshotManager = snapshotManager;
this.schemaManager = schemaManager;
- this.schemaId = schemaId;
this.manifestFileFactory = manifestFileFactory;
this.manifestList = manifestListFactory.create();
this.numOfBuckets = numOfBuckets;
@@ -141,6 +143,20 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return this;
}
+ @Override
+ public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) {
+ if (manifestCacheFilter != null) {
+ checkArgument(
+ manifestCacheFilter.test(partition, bucket),
+ String.format(
+ "This is a bug! The partition %s and bucket %s is
filtered!",
+ partition, bucket));
+ }
+ withPartitionFilter(Collections.singletonList(partition));
+ withBucket(bucket);
+ return this;
+ }
+
@Override
public FileStoreScan withSnapshot(long snapshotId) {
this.specifiedSnapshotId = snapshotId;
@@ -171,6 +187,12 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return this;
}
+ @Override
+ public FileStoreScan withManifestCacheFilter(ManifestCacheFilter
manifestFilter) {
+ this.manifestCacheFilter = manifestFilter;
+ return this;
+ }
+
@Override
public Plan plan() {
List<ManifestFileMeta> manifests = specifiedManifests;
@@ -200,7 +222,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
.parallelStream()
.filter(this::filterManifestFileMeta)
.flatMap(m ->
readManifestFileMeta(m).stream())
-
.filter(this::filterManifestEntry)
+
.filter(this::filterByStats)
.collect(Collectors.toList()))
.get();
} catch (InterruptedException | ExecutionException e) {
@@ -280,10 +302,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
// called by multiple threads
// ------------------------------------------------------------------------
- protected long tableSchemaId() {
- return schemaId;
- }
-
/** Note: Keep this thread-safe. */
protected TableSchema scanTableSchema(long id) {
return tableSchemas.computeIfAbsent(id, key ->
schemaManager.schema(id));
@@ -297,17 +315,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
manifest.partitionStats().fields(partitionStatsConverter));
}
- /** Note: Keep this thread-safe. */
- private boolean filterManifestEntry(ManifestEntry entry) {
- return filterByPartition(entry) && filterByStats(entry);
- }
-
- /** Note: Keep this thread-safe. */
- private boolean filterByPartition(ManifestEntry entry) {
- return (partitionFilter == null
- ||
partitionFilter.test(partitionConverter.convert(entry.partition())));
- }
-
/** Note: Keep this thread-safe. */
private boolean filterByBucket(ManifestEntry entry) {
return (specifiedBucket == null || entry.bucket() == specifiedBucket);
@@ -329,7 +336,51 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
/** Note: Keep this thread-safe. */
private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta
manifest) {
- return manifestFileFactory.create().read(manifest.fileName());
+ return manifestFileFactory
+ .create()
+ .read(manifest.fileName(), manifestCacheRowFilter(),
manifestEntryRowFilter());
+ }
+
+ /** Note: Keep this thread-safe. */
+ private Filter<InternalRow> manifestEntryRowFilter() {
+ Function<InternalRow, BinaryRow> partitionGetter =
+ ManifestEntrySerializer.partitionGetter();
+ Function<InternalRow, Integer> bucketGetter =
ManifestEntrySerializer.bucketGetter();
+ Function<InternalRow, Integer> totalBucketGetter =
+ ManifestEntrySerializer.totalBucketGetter();
+ return row -> {
+ if ((partitionFilter != null
+ && !partitionFilter.test(
+
partitionConverter.convert(partitionGetter.apply(row))))) {
+ return false;
+ }
+
+ if (specifiedBucket != null && numOfBuckets ==
totalBucketGetter.apply(row)) {
+ return specifiedBucket.intValue() == bucketGetter.apply(row);
+ }
+
+ return true;
+ };
+ }
+
+ /** Note: Keep this thread-safe. */
+ private Filter<InternalRow> manifestCacheRowFilter() {
+ if (manifestCacheFilter == null) {
+ return Filter.alwaysTrue();
+ }
+
+ Function<InternalRow, BinaryRow> partitionGetter =
+ ManifestEntrySerializer.partitionGetter();
+ Function<InternalRow, Integer> bucketGetter =
ManifestEntrySerializer.bucketGetter();
+ Function<InternalRow, Integer> totalBucketGetter =
+ ManifestEntrySerializer.totalBucketGetter();
+ return row -> {
+ if (numOfBuckets != totalBucketGetter.apply(row)) {
+ return true;
+ }
+
+ return manifestCacheFilter.test(partitionGetter.apply(row),
bucketGetter.apply(row));
+ };
}
// ------------------------------------------------------------------------
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 148083129..a761aefbf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -318,8 +318,8 @@ public abstract class AbstractFileStoreWrite<T>
List<DataFileMeta> existingFileMetas = new ArrayList<>();
if (snapshotId != null) {
// Concat all the DataFileMeta of existing files into
existingFileMetas.
-
scan.withSnapshot(snapshotId).withPartitionFilter(Collections.singletonList(partition))
- .withBucket(bucket).plan().files().stream()
+ scan.withSnapshot(snapshotId).withPartitionBucket(partition,
bucket).plan().files()
+ .stream()
.map(ManifestEntry::file)
.forEach(existingFileMetas::add);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index f2382fb5b..fdab17423 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -58,7 +58,6 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
bucketKeyType,
snapshotManager,
schemaManager,
- schemaId,
manifestFileFactory,
manifestListFactory,
numOfBuckets,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 93f5e6196..9d2a37d62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.predicate.Predicate;
@@ -43,6 +44,8 @@ public interface FileStoreScan {
FileStoreScan withBucket(int bucket);
+ FileStoreScan withPartitionBucket(BinaryRow partition, int bucket);
+
FileStoreScan withSnapshot(long snapshotId);
FileStoreScan withManifestList(List<ManifestFileMeta> manifests);
@@ -51,6 +54,8 @@ public interface FileStoreScan {
FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
+ FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
+
/** Produce a {@link Plan}. */
Plan plan();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 7011c74c0..a35177b00 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -60,7 +60,6 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
bucketKeyType,
snapshotManager,
schemaManager,
- schemaId,
manifestFileFactory,
manifestListFactory,
numOfBuckets,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 8dcf5c757..c9f3b6006 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -24,6 +24,7 @@ import org.apache.paimon.WriteMode;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.operation.AppendOnlyFileStoreRead;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.FileStoreScan;
@@ -123,8 +124,14 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
@Override
public TableWriteImpl<InternalRow> newWrite(String commitUser) {
+ return newWrite(commitUser, null);
+ }
+
+ @Override
+ public TableWriteImpl<InternalRow> newWrite(
+ String commitUser, ManifestCacheFilter manifestFilter) {
return new TableWriteImpl<>(
- store().newWrite(commitUser),
+ store().newWrite(commitUser, manifestFilter),
new InternalRowKeyAndBucketExtractor(tableSchema),
record -> {
Preconditions.checkState(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index cab4f57c5..b91397348 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.ValueCountMergeFunction;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreScan;
@@ -141,9 +142,15 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
@Override
public TableWriteImpl<KeyValue> newWrite(String commitUser) {
+ return newWrite(commitUser, null);
+ }
+
+ @Override
+ public TableWriteImpl<KeyValue> newWrite(
+ String commitUser, ManifestCacheFilter manifestFilter) {
final KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
- store().newWrite(commitUser),
+ store().newWrite(commitUser, manifestFilter),
new InternalRowKeyAndBucketExtractor(tableSchema),
record -> {
switch (record.row().getRowKind()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index 934eaa9e2..74edb4a01 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -26,6 +26,7 @@ import org.apache.paimon.WriteMode;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
@@ -214,6 +215,12 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
@Override
public TableWriteImpl<KeyValue> newWrite(String commitUser) {
+ return newWrite(commitUser, null);
+ }
+
+ @Override
+ public TableWriteImpl<KeyValue> newWrite(
+ String commitUser, ManifestCacheFilter manifestFilter) {
final SequenceGenerator sequenceGenerator =
store().options()
.sequenceField()
@@ -221,7 +228,7 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
.orElse(null);
final KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
- store().newWrite(commitUser),
+ store().newWrite(commitUser, manifestFilter),
new InternalRowKeyAndBucketExtractor(tableSchema),
record -> {
long sequenceNumber =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 71902147b..664d400b3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
import org.apache.paimon.FileStore;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.table.sink.TableCommitImpl;
@@ -80,6 +81,8 @@ public interface FileStoreTable extends DataTable {
@Override
TableWriteImpl<?> newWrite(String commitUser);
+ TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter
manifestFilter);
+
@Override
TableCommitImpl newCommit(String commitUser);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index d1ed08ab4..9e4ead3cb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.system;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
@@ -41,7 +40,6 @@ import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.SnapshotManager;
@@ -51,6 +49,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.utils.SerializationUtils.newBytesType;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
/**
* A table to produce modified partitions and buckets for each snapshot.
*
@@ -85,21 +86,11 @@ public class BucketsTable implements DataTable,
ReadonlyTable {
@Override
public RowType rowType() {
- RowType partitionType = wrapped.schema().logicalPartitionType();
-
List<DataField> fields = new ArrayList<>();
- fields.add(new DataField(0, "_SNAPSHOT_ID", new BigIntType()));
- fields.addAll(partitionType.getFields());
- // same with ManifestEntry.schema
- fields.add(new DataField(1, "_BUCKET", new IntType()));
- fields.add(new DataField(2, "_FILES", new VarBinaryType()));
- return new RowType(fields);
- }
-
- public static RowType partitionWithBucketRowType(RowType partitionType) {
- List<DataField> fields = new ArrayList<>(partitionType.getFields());
- // same with ManifestEntry.schema
- fields.add(new DataField(3, "_BUCKET", new IntType()));
+ fields.add(new DataField(0, "_SNAPSHOT_ID", new BigIntType(false)));
+ fields.add(new DataField(1, "_PARTITION", newBytesType(false)));
+ fields.add(new DataField(2, "_BUCKET", new IntType(false)));
+ fields.add(new DataField(3, "_FILES", newBytesType(false)));
return new RowType(fields);
}
@@ -171,10 +162,6 @@ public class BucketsTable implements DataTable,
ReadonlyTable {
DataSplit dataSplit = (DataSplit) split;
- InternalRow row =
- new JoinedRow(GenericRow.of(dataSplit.snapshotId()),
dataSplit.partition());
- row = new JoinedRow(row, GenericRow.of(dataSplit.bucket()));
-
List<DataFileMeta> files = Collections.emptyList();
if (isContinuous) {
// Serialized files are only useful in streaming jobs.
@@ -182,10 +169,12 @@ public class BucketsTable implements DataTable,
ReadonlyTable {
// be compacted and don't need to concern incremental new
files.
files = dataSplit.files();
}
- row =
- new JoinedRow(
- row,
- GenericRow.of((Object)
dataFileMetaSerializer.serializeList(files)));
+ InternalRow row =
+ GenericRow.of(
+ dataSplit.snapshotId(),
+ serializeBinaryRow(dataSplit.partition()),
+ dataSplit.bucket(),
+ dataFileMetaSerializer.serializeList(files));
return new
IteratorRecordReader<>(Collections.singletonList(row).iterator());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index 06ed61320..14254ae95 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -18,14 +18,14 @@
package org.apache.paimon.utils;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.RandomAccessInputView;
import org.apache.paimon.data.Segments;
import org.apache.paimon.data.SimpleCollectingOutputView;
-import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentSource;
-import org.apache.paimon.types.RowType;
import java.io.EOFException;
import java.io.IOException;
@@ -38,7 +38,7 @@ public class ObjectsCache<K, V> {
private final SegmentsCache<K> cache;
private final ObjectSerializer<V> serializer;
- private final RowCompactedSerializer compactedSerializer;
+ private final InternalRowSerializer rowSerializer;
private final Function<K, CloseableIterator<InternalRow>> reader;
public ObjectsCache(
@@ -47,26 +47,31 @@ public class ObjectsCache<K, V> {
Function<K, CloseableIterator<InternalRow>> reader) {
this.cache = cache;
this.serializer = serializer;
- this.compactedSerializer = new
RowCompactedSerializer(RowType.of(serializer.fieldTypes()));
+ this.rowSerializer = new
InternalRowSerializer(serializer.fieldTypes());
this.reader = reader;
}
- public List<V> read(K key) throws IOException {
- Segments segments = cache.getSegments(key, this::readSegments);
+ public List<V> read(K key, Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter)
+ throws IOException {
+ Segments segments = cache.getSegments(key, k -> readSegments(k,
loadFilter));
List<V> entries = new ArrayList<>();
RandomAccessInputView view =
new RandomAccessInputView(
segments.segments(), cache.pageSize(),
segments.limitInLastSegment());
+ BinaryRow binaryRow = new BinaryRow(rowSerializer.getArity());
while (true) {
try {
-
entries.add(serializer.fromRow(compactedSerializer.deserialize(view)));
+ rowSerializer.mapFromPages(binaryRow, view);
+ if (readFilter.test(binaryRow)) {
+ entries.add(serializer.fromRow(binaryRow));
+ }
} catch (EOFException e) {
return entries;
}
}
}
- private Segments readSegments(K key) {
+ private Segments readSegments(K key, Filter<InternalRow> loadFilter) {
try (CloseableIterator<InternalRow> iterator = reader.apply(key)) {
ArrayList<MemorySegment> segments = new ArrayList<>();
MemorySegmentSource segmentSource =
@@ -74,7 +79,10 @@ public class ObjectsCache<K, V> {
SimpleCollectingOutputView output =
new SimpleCollectingOutputView(segments, segmentSource,
cache.pageSize());
while (iterator.hasNext()) {
- compactedSerializer.serialize(iterator.next(), output);
+ InternalRow row = iterator.next();
+ if (loadFilter.test(row)) {
+ rowSerializer.serializeToPages(row, output);
+ }
}
return new Segments(segments,
output.getCurrentPositionInSegment());
} catch (Exception e) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 10ac5ee2c..8e9e6bf32 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -21,10 +21,12 @@ package org.apache.paimon.utils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.reader.RecordReader;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import static org.apache.paimon.utils.FileUtils.createFormatReader;
@@ -54,13 +56,24 @@ public abstract class ObjectsFile<T> {
}
public List<T> read(String fileName) {
+ return read(fileName, Filter.alwaysTrue(), Filter.alwaysTrue());
+ }
+
+ public List<T> read(
+ String fileName, Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter) {
try {
if (cache != null) {
- return cache.read(fileName);
+ return cache.read(fileName, loadFilter, readFilter);
}
- return FileUtils.readListFromFile(
- fileIO, pathFactory.toPath(fileName), serializer,
readerFactory);
+ RecordReader<InternalRow> reader =
+ createFormatReader(fileIO, readerFactory,
pathFactory.toPath(fileName));
+ if (readFilter != Filter.ALWAYS_TRUE) {
+ reader = reader.filter(readFilter);
+ }
+ List<T> result = new ArrayList<>();
+ reader.forEachRemaining(row ->
result.add(serializer.fromRow(row)));
+ return result;
} catch (IOException e) {
throw new RuntimeException("Failed to read manifest list " +
fileName, e);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index edf58e0d5..bc27cef46 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -427,7 +427,7 @@ public abstract class FileStoreTableTestBase {
createFileStoreTable(
conf ->
conf.set(
- CoreOptions.MANIFEST_CACHE_SIZE,
+ CoreOptions.WRITE_MANIFEST_CACHE,
MemorySize.ofMebiBytes(1)));
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
index 72c0e62f8..28c769645 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
@@ -38,6 +38,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ContinuousCompactorFollowUpScanner}. */
@@ -117,7 +118,7 @@ public class ContinuousCompactorFollowUpScannerTest extends
ScannerTestBase {
"%s %d|%d|%d|%d",
rowData.getRowKind().shortString(),
rowData.getLong(0),
- rowData.getInt(1),
+ deserializeBinaryRow(rowData.getBinary(1)).getInt(0),
rowData.getInt(2),
numFiles);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
new file mode 100644
index 000000000..672ca209e
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ObjectsCache}. */
+public class ObjectsCacheTest {
+
+ @Test
+ public void test() throws IOException {
+ Map<String, List<String>> map = new HashMap<>();
+ ObjectsCache<String, String> cache =
+ new ObjectsCache<>(
+ new SegmentsCache<>(1024, MemorySize.ofKibiBytes(5)),
+ new StringSerializer(),
+ k ->
+ CloseableIterator.adapterForIterator(
+ map.get(k).stream()
+ .map(BinaryString::fromString)
+ .map(GenericRow::of)
+ .map(r -> (InternalRow) r)
+ .iterator()));
+
+ // test empty
+ map.put("k1", Collections.emptyList());
+ List<String> values = cache.read("k1", Filter.alwaysTrue(),
Filter.alwaysTrue());
+ assertThat(values).isEmpty();
+
+ // test values
+ List<String> expect = Arrays.asList("v1", "v2", "v3");
+ map.put("k2", expect);
+ values = cache.read("k2", Filter.alwaysTrue(), Filter.alwaysTrue());
+ assertThat(values).containsExactlyElementsOf(expect);
+
+ // test cache
+ values = cache.read("k2", Filter.alwaysTrue(), Filter.alwaysTrue());
+ assertThat(values).containsExactlyElementsOf(expect);
+
+ // test filter
+ values =
+ cache.read("k2", Filter.alwaysTrue(), r ->
r.getString(0).toString().endsWith("2"));
+ assertThat(values).containsExactly("v2");
+
+ // test load filter
+ expect = Arrays.asList("v1", "v2", "v3");
+ map.put("k3", expect);
+ values =
+ cache.read("k3", r -> r.getString(0).toString().endsWith("2"),
Filter.alwaysTrue());
+ assertThat(values).containsExactly("v2");
+
+ // test load filter empty
+ expect = Arrays.asList("v1", "v2", "v3");
+ map.put("k4", expect);
+ values =
+ cache.read("k4", r -> r.getString(0).toString().endsWith("5"),
Filter.alwaysTrue());
+ assertThat(values).isEmpty();
+ }
+
+ private static class StringSerializer extends ObjectSerializer<String> {
+
+ public StringSerializer() {
+ super(RowType.of(DataTypes.STRING()));
+ }
+
+ @Override
+ public InternalRow toRow(String record) {
+ return GenericRow.of(BinaryString.fromString(record));
+ }
+
+ @Override
+ public String fromRow(InternalRow rowData) {
+ return rowData.getString(0).toString();
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketsRowChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketsRowChannelComputer.java
new file mode 100644
index 000000000..665e0dc84
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketsRowChannelComputer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.system.BucketsTable;
+
+import org.apache.flink.table.data.RowData;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+
+/** {@link ChannelComputer} to partition {@link RowData} from {@link
BucketsTable}. */
+public class BucketsRowChannelComputer implements ChannelComputer<RowData> {
+
+ private transient int numberOfChannels;
+
+ @Override
+ public void setup(int numberOfChannels) {
+ this.numberOfChannels = numberOfChannels;
+ }
+
+ @Override
+ public int channel(RowData rowData) {
+ BinaryRow partition = deserializeBinaryRow(rowData.getBinary(1));
+ int bucket = rowData.getInt(2);
+ return ChannelComputer.select(partition, bucket, numberOfChannels);
+ }
+
+ @Override
+ public String toString() {
+ return "compactor-partitioner";
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
index fdef4c141..648143d92 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.data.BinaryRow;
+
import java.io.Serializable;
/**
@@ -30,4 +32,20 @@ public interface ChannelComputer<T> extends Serializable {
void setup(int numChannels);
int channel(T record);
+
+ /**
+ * Selects a channel for the given partition and bucket: the partition hash
+ * picks a start channel and the bucket is added as an offset from it.
+ */
+ static int select(BinaryRow partition, int bucket, int numChannels) {
+ // Take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is still
+ // negative and would yield a negative channel. Same result otherwise.
+ int startChannel = Math.abs(partition.hashCode() % numChannels);
+ return (startChannel + bucket) % numChannels;
+ }
+
+ /** Selects a channel from the bucket alone, ignoring the partition. */
+ static int select(int bucket, int numChannels) {
+ return bucket % numChannels;
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
index d293e4ec7..1abc68256 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.system.BucketsTable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -51,11 +50,8 @@ public class CompactorSinkBuilder {
}
public DataStreamSink<?> build() {
- OffsetRowDataHashStreamPartitioner partitioner =
- new OffsetRowDataHashStreamPartitioner(
- BucketsTable.partitionWithBucketRowType(
- table.schema().logicalPartitionType()),
- 1);
+ BucketingStreamPartitioner<RowData> partitioner =
+ new BucketingStreamPartitioner<>(new
BucketsRowChannelComputer());
PartitionTransformation<RowData> partitioned =
new PartitionTransformation<>(input.getTransformation(),
partitioner);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/OffsetRowDataHashStreamPartitioner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/OffsetRowDataHashStreamPartitioner.java
deleted file mode 100644
index fe0216358..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/OffsetRowDataHashStreamPartitioner.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.OffsetRow;
-
-import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
-
-/**
- * {@link StreamPartitioner} to partition {@link RowData} according to its
hash value from an {@link
- * OffsetRow}.
- */
-public class OffsetRowDataHashStreamPartitioner extends
StreamPartitioner<RowData> {
-
- private final RowType offsetRowType;
- private final int offset;
-
- private transient OffsetRow offsetRow;
- private transient InternalRowSerializer serializer;
-
- public OffsetRowDataHashStreamPartitioner(RowType offsetRowType, int
offset) {
- this.offsetRowType = offsetRowType;
- this.offset = offset;
- }
-
- @Override
- public void setup(int numberOfChannels) {
- super.setup(numberOfChannels);
- this.offsetRow = new OffsetRow(offsetRowType.getFieldCount(), offset);
- serializer = new InternalRowSerializer(offsetRowType);
- }
-
- @Override
- public int selectChannel(SerializationDelegate<StreamRecord<RowData>>
record) {
- RowData rowData = record.getInstance().getValue();
- int hash =
- serializer.toBinaryRow(offsetRow.replace(new
FlinkRowWrapper(rowData))).hashCode();
- return Math.abs(hash) % numberOfChannels;
- }
-
- @Override
- public StreamPartitioner<RowData> copy() {
- return this;
- }
-
- @Override
- public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
- return SubtaskStateMapper.FULL;
- }
-
- @Override
- public boolean isPointwise() {
- return false;
- }
-
- @Override
- public String toString() {
- return "compactor-stream-partitioner";
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
index 85e7ebce7..19e1f9d5d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
@@ -53,15 +53,11 @@ public class RowDataChannelComputer implements
ChannelComputer<RowData> {
}
public int channel(BinaryRow partition, int bucket) {
- int startChannel;
- if (hasLogSink) {
- // log sinks like Kafka only consider bucket and don't care about
partition
- // so same bucket, even from different partition, must go to the
same channel
- startChannel = 0;
- } else {
- startChannel = Math.abs(partition.hashCode()) % numChannels;
- }
- return (startChannel + bucket) % numChannels;
+ // log sinks like Kafka only consider bucket and don't care about
partition
+ // so same bucket, even from different partition, must go to the same
channel
+ return hasLogSink
+ ? ChannelComputer.select(bucket, numChannels)
+ : ChannelComputer.select(partition, bucket, numChannels);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 156b3a798..f1b77cf35 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -20,12 +20,9 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.OffsetRow;
import org.apache.paimon.utils.Preconditions;
import org.apache.flink.runtime.state.StateInitializationContext;
@@ -36,6 +33,8 @@ import org.apache.flink.table.data.RowData;
import java.io.IOException;
import java.util.List;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+
/**
* A dedicated operator for manual triggered compaction.
*
@@ -52,8 +51,6 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData> {
private transient StoreSinkWriteState state;
private transient StoreSinkWrite write;
- private transient InternalRowSerializer partitionSerializer;
- private transient OffsetRow reusedPartition;
private transient DataFileMetaSerializer dataFileMetaSerializer;
public StoreCompactOperator(
@@ -81,13 +78,14 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData> {
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class,
initialCommitUser);
- RowDataChannelComputer channelComputer = new
RowDataChannelComputer(table.schema(), false);
-
channelComputer.setup(getRuntimeContext().getNumberOfParallelSubtasks());
state =
new StoreSinkWriteState(
context,
(tableName, partition, bucket) ->
- channelComputer.channel(partition, bucket)
+ ChannelComputer.select(
+ partition,
+ bucket,
+
getRuntimeContext().getNumberOfParallelSubtasks())
==
getRuntimeContext().getIndexOfThisSubtask());
write =
@@ -101,8 +99,6 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData> {
@Override
public void open() throws Exception {
super.open();
- partitionSerializer = new
InternalRowSerializer(table.schema().logicalPartitionType());
- reusedPartition = new OffsetRow(partitionSerializer.getArity(), 1);
dataFileMetaSerializer = new DataFileMetaSerializer();
}
@@ -111,13 +107,9 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData> {
RowData record = element.getValue();
long snapshotId = record.getLong(0);
-
- reusedPartition.replace(new FlinkRowWrapper(record));
- BinaryRow partition =
partitionSerializer.toBinaryRow(reusedPartition).copy();
-
- int bucket = record.getInt(partitionSerializer.getArity() + 1);
-
- byte[] serializedFiles =
record.getBinary(partitionSerializer.getArity() + 2);
+ BinaryRow partition = deserializeBinaryRow(record.getBinary(1));
+ int bucket = record.getInt(2);
+ byte[] serializedFiles = record.getBinary(3);
List<DataFileMeta> files =
dataFileMetaSerializer.deserializeList(serializedFiles);
if (isStreaming) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index 0eae74e00..d3e22efe5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
-import java.util.function.Function;
/** Helper class of {@link PrepareCommitOperator} for different types of
paimon sinks. */
public interface StoreSinkWrite {
@@ -59,7 +58,7 @@ public interface StoreSinkWrite {
* changes. {@link TableWriteImpl} with the new schema will be provided by
{@code
* newWriteProvider}.
*/
- void replace(Function<String, TableWriteImpl<?>> newWriteProvider) throws
Exception;
+ void replace(FileStoreTable newTable) throws Exception;
/** Provider of {@link StoreSinkWrite}. */
@FunctionalInterface
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 67f6ddab1..18ecc44a0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.function.Function;
/** Default implementation of {@link StoreSinkWrite}. This writer does not
have states. */
public class StoreSinkWriteImpl implements StoreSinkWrite {
@@ -44,6 +43,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
protected final String commitUser;
protected final StoreSinkWriteState state;
+ private final IOManager ioManager;
+ private final boolean isOverwrite;
private final boolean waitCompaction;
protected TableWriteImpl<?> write;
@@ -57,12 +58,19 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
boolean waitCompaction) {
this.commitUser = commitUser;
this.state = state;
+ this.ioManager = ioManager;
+ this.isOverwrite = isOverwrite;
this.waitCompaction = waitCompaction;
+ this.write = newTableWrite(table);
+ }
- write =
- table.newWrite(commitUser)
- .withIOManager(new
IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
- .withOverwrite(isOverwrite);
+ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
+ return table.newWrite(
+ commitUser,
+ (part, bucket) ->
+ state.stateValueFilter().filter(table.name(),
part, bucket))
+ .withIOManager(new
IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
+ .withOverwrite(isOverwrite);
}
@Override
@@ -125,14 +133,14 @@ public class StoreSinkWriteImpl implements StoreSinkWrite
{
}
@Override
- public void replace(Function<String, TableWriteImpl<?>> newWriteProvider)
throws Exception {
+ public void replace(FileStoreTable newTable) throws Exception {
if (commitUser == null) {
return;
}
List<AbstractFileStoreWrite.State> states = write.checkpoint();
write.close();
- write = newWriteProvider.apply(commitUser);
+ write = newTableWrite(newTable);
write.restore(states);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
index feff9e9dd..072d6a1b9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
@@ -46,6 +46,8 @@ import java.util.Map;
*/
public class StoreSinkWriteState {
+ private final StateValueFilter stateValueFilter;
+
private final ListState<Tuple5<String, String, byte[], Integer, byte[]>>
listState;
private final Map<String, Map<String, List<StateValue>>> map;
@@ -53,6 +55,7 @@ public class StoreSinkWriteState {
public StoreSinkWriteState(
StateInitializationContext context, StateValueFilter
stateValueFilter)
throws Exception {
+ this.stateValueFilter = stateValueFilter;
TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>>
listStateSerializer =
new TupleSerializer<>(
(Class<Tuple5<String, String, byte[], Integer,
byte[]>>)
@@ -81,6 +84,10 @@ public class StoreSinkWriteState {
}
}
+ public StateValueFilter stateValueFilter() {
+ return stateValueFilter;
+ }
+
public @Nullable List<StateValue> get(String tableName, String key) {
Map<String, List<StateValue>> innerMap = map.get(tableName);
return innerMap == null ? null : innerMap.get(key);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
index c15340a74..43eb14a6a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
@@ -50,8 +50,7 @@ public class CdcRecordChannelComputer implements
ChannelComputer<CdcRecord> {
}
public int channel(BinaryRow partition, int bucket) {
- int startChannel = Math.abs(partition.hashCode()) % numChannels;
- return (startChannel + bucket) % numChannels;
+ return ChannelComputer.select(partition, bucket, numChannels);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
index df45d8281..3e9cafeb0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
@@ -111,7 +111,7 @@ public class CdcRecordStoreWriteOperator extends
PrepareCommitOperator<CdcRecord
}
Thread.sleep(retrySleepMillis);
}
- write.replace(commitUser -> table.newWrite(commitUser));
+ write.replace(table);
}
try {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index 05237690e..2e8ab836f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -57,6 +57,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for {@link CompactorSourceBuilder}. */
@@ -272,18 +273,20 @@ public class CompactorSourceITCase extends
AbstractTestBase {
private String toString(RowData rowData) {
int numFiles;
try {
- numFiles =
dataFileMetaSerializer.deserializeList(rowData.getBinary(4)).size();
+ numFiles =
dataFileMetaSerializer.deserializeList(rowData.getBinary(3)).size();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
+ BinaryRow partition = deserializeBinaryRow(rowData.getBinary(1));
+
return String.format(
"%s %d|%s|%d|%d|%d",
rowData.getRowKind().shortString(),
rowData.getLong(0),
- rowData.getString(1).toString(),
+ partition.getString(0),
+ partition.getInt(1),
rowData.getInt(2),
- rowData.getInt(3),
numFiles);
}