This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 29fb04ab6 [core] Supports cross partition update (#1719)
29fb04ab6 is described below
commit 29fb04ab6e46d7466506f9ba1982c7401de1d282
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Aug 7 14:43:22 2023 +0800
[core] Supports cross partition update (#1719)
---
docs/content/concepts/primary-key-table.md | 47 +++-
.../java/org/apache/paimon/utils/IDMapping.java | 56 +++++
.../org/apache/paimon/utils/PositiveIntInt.java | 52 ++--
.../paimon/utils/PositiveIntIntSerializer.java | 50 ++++
.../utils/RowDataToObjectArrayConverter.java | 5 +-
.../org/apache/paimon/utils/SerBiFunction.java | 28 +--
.../paimon/utils/PositiveIntIntSerializerTest.java | 53 +++++
.../java/org/apache/paimon/KeyValueFileStore.java | 11 +-
.../org/apache/paimon/codegen/CodeGenUtils.java | 6 +
.../paimon/operation/AbstractFileStoreScan.java | 24 +-
.../paimon/operation/AppendOnlyFileStoreScan.java | 2 +-
.../org/apache/paimon/operation/FileStoreScan.java | 2 +
.../paimon/operation/KeyValueFileStoreScan.java | 2 +-
.../main/java/org/apache/paimon/schema/Schema.java | 7 +-
.../org/apache/paimon/schema/SchemaValidation.java | 8 +
.../java/org/apache/paimon/schema/TableSchema.java | 15 +-
.../paimon/table/AbstractFileStoreTable.java | 1 +
.../java/org/apache/paimon/table/BucketMode.java | 7 +
.../table/ChangelogValueCountFileStoreTable.java | 1 +
.../table/ChangelogWithKeyFileStoreTable.java | 1 +
.../table/source/AbstractInnerTableScan.java | 6 +
.../table/source/snapshot/SnapshotReader.java | 2 +
.../table/source/snapshot/SnapshotReaderImpl.java | 6 +
.../apache/paimon/table/system/AuditLogTable.java | 6 +
.../test/java/org/apache/paimon/TestFileStore.java | 1 +
.../org/apache/paimon/schema/TableSchemaTest.java | 9 +-
.../flink/lookup/NoPrimaryKeyLookupTable.java | 2 +-
.../paimon/flink/lookup/PrimaryKeyLookupTable.java | 2 +-
.../paimon/flink/lookup/RocksDBListState.java | 20 +-
.../paimon/flink/lookup/RocksDBSetState.java | 17 +-
.../apache/paimon/flink/lookup/RocksDBState.java | 13 +-
.../paimon/flink/lookup/RocksDBStateFactory.java | 25 +-
.../paimon/flink/lookup/RocksDBValueState.java | 17 +-
.../flink/lookup/SecondaryIndexLookupTable.java | 2 +-
.../apache/paimon/flink/sink/ChannelComputer.java | 16 ++
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 11 +-
.../apache/paimon/flink/sink/cdc/CdcRecord.java | 8 +-
.../CdcRecordPartitionKeyExtractor.java} | 34 +--
.../paimon/flink/sink/cdc/CdcRecordUtils.java | 13 +
.../paimon/flink/sink/cdc/CdcSinkBuilder.java | 10 +-
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 10 +-
.../flink/sink/index/GlobalDynamicBucketSink.java | 126 ++++++++++
.../sink/index/GlobalDynamicCdcBucketSink.java | 133 +++++++++++
.../flink/sink/index/GlobalIndexAssigner.java | 262 +++++++++++++++++++++
.../sink/index/GlobalIndexAssignerOperator.java | 133 +++++++++++
.../paimon/flink/sink/index/IndexBootstrap.java | 69 ++++++
.../flink/sink/index/IndexBootstrapOperator.java | 65 +++++
.../KeyPartOrRow.java} | 48 ++--
.../sink/index/KeyPartPartitionKeyExtractor.java | 61 +++++
.../sink/index/KeyPartRowChannelComputer.java | 68 ++++++
.../flink/sink/index/KeyWithRowSerializer.java | 97 ++++++++
.../paimon/flink/utils/InternalTypeInfo.java | 100 ++++++++
.../paimon/flink/utils/InternalTypeSerializer.java | 62 +++++
.../flink/utils/ProjectToRowDataFunction.java | 83 +++++++
.../flink/GlobalDynamicBucketTableITCase.java | 87 +++++++
.../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 14 +-
.../flink/sink/index/GlobalIndexAssignerTest.java | 198 ++++++++++++++++
.../flink/sink/index/IndexBootstrapTest.java | 79 +++++++
.../org/apache/paimon/spark/SparkReadITCase.java | 16 --
59 files changed, 2090 insertions(+), 219 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md
b/docs/content/concepts/primary-key-table.md
index 9218178be..ab4a9ed16 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -36,16 +36,41 @@ By [defining primary keys]({{< ref
"how-to/creating-tables#tables-with-primary-k
A bucket is the smallest storage unit for reads and writes, each bucket
directory contains an [LSM tree]({{< ref "concepts/file-layouts#lsm-trees" >}}).
-Primary Key Table supports two bucket mode:
-1. Fixed Bucket mode: configure a bucket greater than 0, rescaling buckets can
only be done through offline processes,
- see [Rescale Bucket]({{< ref "/maintenance/rescale-bucket" >}}). A too
large number of buckets leads to too many
- small files, and a too small number of buckets leads to poor write
performance.
-2. Dynamic Bucket mode: configure `'bucket' = '-1'`, Paimon dynamically
maintains the index, automatic expansion of
- the number of buckets. (This is an experimental feature)
- - Option1: `'dynamic-bucket.target-row-num'`: controls the target row
number for one bucket.
- - Option2: `'dynamic-bucket.assigner-parallelism'`: Parallelism of assigner
operator, controls the number of initialized bucket.
- - Dynamic Bucket requires more memory, 100 million entries in a partition
takes up 1 GB more memory, partitions that are no longer active do not take up
memory.
- - Dynamic Bucket only support single write job. Please do not start
multiple jobs to write to the same partition.
+### Fixed Bucket
+
+Configure a bucket greater than 0, rescaling buckets can only be done through
offline processes,
+see [Rescale Bucket]({{< ref "/maintenance/rescale-bucket" >}}). A too large
number of buckets leads to too many
+small files, and a too small number of buckets leads to poor write performance.
+
+### Dynamic Bucket
+
+{{< hint info >}}
+This is an experimental feature.
+{{< /hint >}}
+
+Configure `'bucket' = '-1'`, Paimon dynamically maintains the index and automatically
expands the number of buckets.
+
+- Option1: `'dynamic-bucket.target-row-num'`: controls the target row number
for one bucket.
+- Option2: `'dynamic-bucket.assigner-parallelism'`: Parallelism of assigner
operator, controls the number of initialized buckets.
+
+{{< hint info >}}
+Dynamic Bucket only supports a single write job. Please do not start multiple
jobs to write to the same partition.
+{{< /hint >}}
+
+**Normal Dynamic Bucket Mode**:
+
+When your updates do not cross partitions (no partitions, or primary keys
contain all partition fields), Dynamic
+Bucket mode uses HASH index to maintain mapping from key to bucket, it
requires more memory than fixed bucket mode,
+100 million entries in a partition take up 1 GB more memory, partitions that
are no longer active do not take up memory.
+
+**Cross Partitions Update Dynamic Bucket Mode**:
+
+When you need cross partition updates (primary keys do not contain all partition
fields), Dynamic Bucket mode directly
+maintains the mapping of keys to partition and bucket, uses local disks, and
initializes indexes by reading all
+existing keys in the table when starting stream write job. Different merge
engines have different behaviors:
+1. Deduplicate: Delete data from the old partition and insert new data into
the new partition.
+2. PartialUpdate & Aggregation: Insert new data into the old partition.
+3. FirstRow: Ignore new data if there is an old value.
## Merge Engines
@@ -165,8 +190,6 @@ INSERT INTO T VALUES (1, null,null,1);
SELECT * FROM T; -- output 1, 1, 0, 1
```
-
-
### Aggregation
{{< hint info >}}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IDMapping.java
b/paimon-common/src/main/java/org/apache/paimon/utils/IDMapping.java
new file mode 100644
index 000000000..4c5d1d7a8
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IDMapping.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** Incremental id generation. */
+public class IDMapping<T> {
+
+ private final Function<T, T> copy;
+
+ private final List<T> values = new ArrayList<>();
+ private final Map<T, Integer> indexMap = new HashMap<>();
+
+ private int nextIndex = 0;
+
+ public IDMapping(Function<T, T> copy) {
+ this.copy = copy;
+ }
+
+ public int index(T t) {
+ Integer index = indexMap.get(t);
+ if (index == null) {
+ index = nextIndex;
+ nextIndex++;
+ T copied = copy.apply(t);
+ indexMap.put(copied, index);
+ values.add(copied);
+ }
+ return index;
+ }
+
+ public T get(int index) {
+ return values.get(index);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
b/paimon-common/src/main/java/org/apache/paimon/utils/PositiveIntInt.java
similarity index 53%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
copy to paimon-common/src/main/java/org/apache/paimon/utils/PositiveIntInt.java
index 363c5490e..1042bee3d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/PositiveIntInt.java
@@ -16,59 +16,55 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.cdc;
-
-import org.apache.paimon.annotation.Experimental;
-import org.apache.paimon.types.RowKind;
+package org.apache.paimon.utils;
import java.io.Serializable;
-import java.util.Collections;
-import java.util.Map;
import java.util.Objects;
-/** A data change message from the CDC source. */
-@Experimental
-public class CdcRecord implements Serializable {
+import static org.apache.paimon.utils.Preconditions.checkArgument;
- private static final long serialVersionUID = 1L;
+/** IntInt pojo class. */
+public class PositiveIntInt implements Serializable {
- private final RowKind kind;
- private final Map<String, String> fields;
+ private static final long serialVersionUID = 1L;
- public CdcRecord(RowKind kind, Map<String, String> fields) {
- this.kind = kind;
- this.fields = fields;
- }
+ private final int i1;
+ private final int i2;
- public static CdcRecord emptyRecord() {
- return new CdcRecord(RowKind.INSERT, Collections.emptyMap());
+ public PositiveIntInt(int i1, int i2) {
+ checkArgument(i1 >= 0);
+ checkArgument(i2 >= 0);
+ this.i1 = i1;
+ this.i2 = i2;
}
- public RowKind kind() {
- return kind;
+ public int i1() {
+ return i1;
}
- public Map<String, String> fields() {
- return fields;
+ public int i2() {
+ return i2;
}
@Override
public boolean equals(Object o) {
- if (!(o instanceof CdcRecord)) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
return false;
}
-
- CdcRecord that = (CdcRecord) o;
- return Objects.equals(kind, that.kind) && Objects.equals(fields,
that.fields);
+ PositiveIntInt intInt = (PositiveIntInt) o;
+ return i1 == intInt.i1 && i2 == intInt.i2;
}
@Override
public int hashCode() {
- return Objects.hash(kind, fields);
+ return Objects.hash(i1, i2);
}
@Override
public String toString() {
- return kind.shortString() + " " + fields;
+ return "IntInt{" + "i1=" + i1 + ", i2=" + i2 + '}';
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/PositiveIntIntSerializer.java
b/paimon-common/src/main/java/org/apache/paimon/utils/PositiveIntIntSerializer.java
new file mode 100644
index 000000000..a6c59f273
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/PositiveIntIntSerializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.serializer.SerializerSingleton;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
+
+import java.io.IOException;
+
+import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
+import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
+
+/** A {@link SerializerSingleton} for {@link PositiveIntInt}. */
+public class PositiveIntIntSerializer extends
SerializerSingleton<PositiveIntInt> {
+
+ @Override
+ public PositiveIntInt copy(PositiveIntInt from) {
+ return from;
+ }
+
+ @Override
+ public void serialize(PositiveIntInt record, DataOutputView target) throws
IOException {
+ encodeInt(target, record.i1());
+ encodeInt(target, record.i2());
+ }
+
+ @Override
+ public PositiveIntInt deserialize(DataInputView source) throws IOException
{
+ int i1 = decodeInt(source);
+ int i2 = decodeInt(source);
+ return new PositiveIntInt(i1, i2);
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
index 6b710fcc7..502067566 100644
---
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
@@ -24,12 +24,15 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.RowType;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
/** Convert {@link InternalRow} to object array. */
-public class RowDataToObjectArrayConverter {
+public class RowDataToObjectArrayConverter implements Serializable {
+
+ private static final long serialVersionUID = 1L;
private final RowType rowType;
private final InternalRow.FieldGetter[] fieldGetters;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
b/paimon-common/src/main/java/org/apache/paimon/utils/SerBiFunction.java
similarity index 56%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
copy to paimon-common/src/main/java/org/apache/paimon/utils/SerBiFunction.java
index 648143d92..e90465934 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/SerBiFunction.java
@@ -16,29 +16,11 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.data.BinaryRow;
+package org.apache.paimon.utils;
import java.io.Serializable;
+import java.util.function.BiFunction;
-/**
- * A utility class to compute which downstream channel a given record should
be sent to.
- *
- * @param <T> type of record
- */
-public interface ChannelComputer<T> extends Serializable {
-
- void setup(int numChannels);
-
- int channel(T record);
-
- static int select(BinaryRow partition, int bucket, int numChannels) {
- int startChannel = Math.abs(partition.hashCode()) % numChannels;
- return (startChannel + bucket) % numChannels;
- }
-
- static int select(int bucket, int numChannels) {
- return bucket % numChannels;
- }
-}
+/** A {@link BiFunction} that is also {@link Serializable}. */
+@FunctionalInterface
+public interface SerBiFunction<T, U, R> extends BiFunction<T, U, R>,
Serializable {}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/utils/PositiveIntIntSerializerTest.java
b/paimon-common/src/test/java/org/apache/paimon/utils/PositiveIntIntSerializerTest.java
new file mode 100644
index 000000000..1d184634e
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/utils/PositiveIntIntSerializerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.data.serializer.SerializerTestBase;
+
+import java.util.Random;
+
+/** Test for {@link PositiveIntIntSerializer}. */
+public class PositiveIntIntSerializerTest extends
SerializerTestBase<PositiveIntInt> {
+
+ @Override
+ protected Serializer<PositiveIntInt> createSerializer() {
+ return new PositiveIntIntSerializer();
+ }
+
+ @Override
+ protected boolean deepEquals(PositiveIntInt t1, PositiveIntInt t2) {
+ return t1.equals(t2);
+ }
+
+ @Override
+ protected PositiveIntInt[] getTestData() {
+ Random rnd = new Random();
+ int rndInt = Math.abs(rnd.nextInt());
+
+ int[] ints = new int[] {0, 1, Integer.MAX_VALUE, rndInt};
+ PositiveIntInt[] intInts = new PositiveIntInt[ints.length];
+ for (int i = 0; i < ints.length; i++) {
+ intInts[i] =
+ new PositiveIntInt(
+ ints[rnd.nextInt(ints.length)],
ints[rnd.nextInt(ints.length)]);
+ }
+ return intInts;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index ba1ef81db..a6b22e7c3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -50,12 +50,14 @@ import java.util.function.Supplier;
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** {@link FileStore} for querying and updating {@link KeyValue}s. */
public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
private static final long serialVersionUID = 1L;
+ private final boolean crossPartitionUpdate;
private final RowType bucketKeyType;
private final RowType keyType;
private final RowType valueType;
@@ -68,6 +70,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
FileIO fileIO,
SchemaManager schemaManager,
long schemaId,
+ boolean crossPartitionUpdate,
CoreOptions options,
RowType partitionType,
RowType bucketKeyType,
@@ -76,6 +79,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
KeyValueFieldsExtractor keyValueFieldsExtractor,
MergeFunctionFactory<KeyValue> mfFactory) {
super(fileIO, schemaManager, schemaId, options, partitionType);
+ this.crossPartitionUpdate = crossPartitionUpdate;
this.bucketKeyType = bucketKeyType;
this.keyType = keyType;
this.valueType = valueType;
@@ -87,7 +91,12 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
@Override
public BucketMode bucketMode() {
- return options.bucket() == -1 ? BucketMode.DYNAMIC : BucketMode.FIXED;
+ if (options.bucket() == -1) {
+ return crossPartitionUpdate ? BucketMode.GLOBAL_DYNAMIC :
BucketMode.DYNAMIC;
+ } else {
+ checkArgument(!crossPartitionUpdate);
+ return BucketMode.FIXED;
+ }
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
b/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
index df9104a61..868734b0f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
@@ -29,6 +29,12 @@ public class CodeGenUtils {
public static final Projection EMPTY_PROJECTION = input ->
BinaryRow.EMPTY_ROW;
+ public static Projection newProjection(RowType inputType, List<String>
fields) {
+ List<String> fieldNames = inputType.getFieldNames();
+ int[] mapping =
fields.stream().mapToInt(fieldNames::indexOf).toArray();
+ return newProjection(inputType, mapping);
+ }
+
public static Projection newProjection(RowType inputType, int[] mapping) {
if (mapping.length == 0) {
return EMPTY_PROJECTION;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 836e386ca..45f6bd153 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -66,11 +66,11 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private final ConcurrentMap<Long, TableSchema> tableSchemas;
private final SchemaManager schemaManager;
- protected final ScanBucketFilter bucketFilter;
+ protected final ScanBucketFilter bucketKeyFilter;
private Predicate partitionFilter;
private Snapshot specifiedSnapshot = null;
- private Integer specifiedBucket = null;
+ private Filter<Integer> bucketFilter = null;
private List<ManifestFileMeta> specifiedManifests = null;
private ScanKind scanKind = ScanKind.ALL;
private Filter<Integer> levelFilter = null;
@@ -80,7 +80,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
public AbstractFileStoreScan(
RowType partitionType,
- ScanBucketFilter bucketFilter,
+ ScanBucketFilter bucketKeyFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
ManifestFile.Factory manifestFileFactory,
@@ -90,7 +90,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
Integer scanManifestParallelism) {
this.partitionStatsConverter = new
FieldStatsArraySerializer(partitionType);
this.partitionConverter = new
RowDataToObjectArrayConverter(partitionType);
- this.bucketFilter = bucketFilter;
+ this.bucketKeyFilter = bucketKeyFilter;
this.snapshotManager = snapshotManager;
this.schemaManager = schemaManager;
this.manifestFileFactory = manifestFileFactory;
@@ -123,7 +123,13 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
@Override
public FileStoreScan withBucket(int bucket) {
- this.specifiedBucket = bucket;
+ this.bucketFilter = i -> i == bucket;
+ return this;
+ }
+
+ @Override
+ public FileStoreScan withBucketFilter(Filter<Integer> bucketFilter) {
+ this.bucketFilter = bucketFilter;
return this;
}
@@ -312,12 +318,12 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
/** Note: Keep this thread-safe. */
private boolean filterByBucket(ManifestEntry entry) {
- return (specifiedBucket == null || entry.bucket() == specifiedBucket);
+ return (bucketFilter == null || bucketFilter.test(entry.bucket()));
}
/** Note: Keep this thread-safe. */
private boolean filterByBucketSelector(ManifestEntry entry) {
- return bucketFilter.select(entry.bucket(), entry.totalBuckets());
+ return bucketKeyFilter.select(entry.bucket(), entry.totalBuckets());
}
/** Note: Keep this thread-safe. */
@@ -349,8 +355,8 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return false;
}
- if (specifiedBucket != null && numOfBuckets ==
totalBucketGetter.apply(row)) {
- return specifiedBucket.intValue() == bucketGetter.apply(row);
+ if (bucketFilter != null && numOfBuckets ==
totalBucketGetter.apply(row)) {
+ return bucketFilter.test(bucketGetter.apply(row));
}
return true;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 118c9f269..e0df67766 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -62,7 +62,7 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
this.filter = predicate;
- this.bucketFilter.pushdown(predicate);
+ this.bucketKeyFilter.pushdown(predicate);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 457d9a0bf..441b6eef7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -45,6 +45,8 @@ public interface FileStoreScan {
FileStoreScan withBucket(int bucket);
+ FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);
+
FileStoreScan withPartitionBucket(BinaryRow partition, int bucket);
FileStoreScan withSnapshot(long snapshotId);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index dc480074f..d953f9c06 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -65,7 +65,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
this.keyFilter = predicate;
- this.bucketFilter.pushdown(predicate);
+ this.bucketKeyFilter.pushdown(predicate);
return this;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index 85fee3d56..7f464b586 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -133,14 +133,9 @@ public class Schema {
"Table column %s should include all primary key constraint %s",
fieldNames,
primaryKeys);
- Set<String> pkSet = new HashSet<>(primaryKeys);
- Preconditions.checkState(
- pkSet.containsAll(partitionKeys),
- "Primary key constraint %s should include all partition fields
%s",
- primaryKeys,
- partitionKeys);
// primary key should not nullable
+ Set<String> pkSet = new HashSet<>(primaryKeys);
List<DataField> newFields = new ArrayList<>();
for (DataField field : fields) {
if (pkSet.contains(field.name()) && field.type().isNullable()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 37f779d35..95f7689e7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -185,6 +185,14 @@ public class SchemaValidation {
"Only support 'lookup' changelog-producer on
FIRST_MERGE merge engine");
}
}
+
+ if (schema.crossPartitionUpdate() && options.bucket() != -1) {
+ throw new IllegalArgumentException(
+ String.format(
+ "You should use dynamic bucket (bucket = -1) mode
in cross partition update case "
+ + "(Primary key constraint %s not include
all partition fields %s).",
+ schema.primaryKeys(), schema.partitionKeys()));
+ }
}
private static void validatePrimaryKeysType(List<DataField> fields,
List<String> primaryKeys) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index f173f0f31..dd1d6bb6e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -107,11 +107,6 @@ public class TableSchema implements Serializable {
public List<String> trimmedPrimaryKeys() {
if (primaryKeys.size() > 0) {
- Preconditions.checkState(
- primaryKeys.containsAll(partitionKeys),
- String.format(
- "Primary key constraint %s should include all
partition fields %s",
- primaryKeys, partitionKeys));
List<String> adjusted =
primaryKeys.stream()
.filter(pk -> !partitionKeys.contains(pk))
@@ -145,6 +140,14 @@ public class TableSchema implements Serializable {
return bucketKeys;
}
+ public boolean crossPartitionUpdate() {
+ if (primaryKeys.isEmpty() || partitionKeys.isEmpty()) {
+ return false;
+ }
+
+ return !primaryKeys.containsAll(partitionKeys);
+ }
+
/** Original bucket keys, maybe empty. */
private List<String> originalBucketKeys() {
String key = options.get(BUCKET_KEY.key());
@@ -215,7 +218,7 @@ public class TableSchema implements Serializable {
.collect(Collectors.toList());
}
- private RowType projectedLogicalRowType(List<String> projectedFieldNames) {
+ public RowType projectedLogicalRowType(List<String> projectedFieldNames) {
return new RowType(projectedDataFields(projectedFieldNames));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 597943b20..8309d0f80 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -110,6 +110,7 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
case FIXED:
return new FixedBucketRowKeyExtractor(schema());
case DYNAMIC:
+ case GLOBAL_DYNAMIC:
return new DynamicBucketRowKeyExtractor(schema());
case UNAWARE:
return new UnawareBucketRowKeyExtractor(schema());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
b/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
index bd31644c0..02dbea24e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
@@ -39,6 +39,13 @@ public enum BucketMode {
*/
DYNAMIC,
+ /**
+ * Compared with the DYNAMIC mode, this mode not only dynamically
allocates buckets for
+ * Partition table, but also updates data across partitions. The primary
key does not contain
+ * all partition fields.
+ */
+ GLOBAL_DYNAMIC,
+
/**
* Ignoring buckets can be equivalent to understanding that all data
enters the global bucket,
* and data is randomly written to the table. The data in the bucket has
no order relationship
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index 035816e1c..bb7a9a9b4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -88,6 +88,7 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
fileIO,
schemaManager(),
tableSchema.id(),
+ false,
new CoreOptions(tableSchema.options()),
tableSchema.logicalPartitionType(),
tableSchema.logicalBucketKeyType(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index 624d9fc41..9e0157103 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -131,6 +131,7 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
fileIO(),
schemaManager(),
tableSchema.id(),
+ tableSchema.crossPartitionUpdate(),
options,
tableSchema.logicalPartitionType(),
addKeyNamePrefix(tableSchema.logicalBucketKeyType()),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 9f119363d..034a333b9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -41,6 +41,7 @@ import
org.apache.paimon.table.source.snapshot.StartingScanner;
import
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
+import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import java.util.List;
@@ -66,6 +67,11 @@ public abstract class AbstractInnerTableScan implements
InnerTableScan {
return this;
}
+ public AbstractInnerTableScan withBucketFilter(Filter<Integer>
bucketFilter) {
+ snapshotReader.withBucketFilter(bucketFilter);
+ return this;
+ }
+
public CoreOptions options() {
return options;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index ae547ed97..0cee60086 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -55,6 +55,8 @@ public interface SnapshotReader {
SnapshotReader withBucket(int bucket);
+ SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);
+
/** Get splits plan from snapshot. */
Plan read();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 848c40175..99d2b7291 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -167,6 +167,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+ @Override
+ public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
+ scan.withBucketFilter(bucketFilter);
+ return this;
+ }
+
/** Get splits from {@link FileKind#ADD} files. */
@Override
public Plan read() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index cc1f98f4b..2063859a3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -234,6 +234,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
+ @Override
+ public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
+ snapshotReader.withBucketFilter(bucketFilter);
+ return this;
+ }
+
@Override
public Plan read() {
return snapshotReader.read();
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 2d8b98a60..9b5441fd5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -105,6 +105,7 @@ public class TestFileStore extends KeyValueFileStore {
FileIOFinder.find(new Path(root)),
new SchemaManager(FileIOFinder.find(new Path(root)),
options.path()),
0L,
+ false,
options,
partitionType,
keyType,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
index 2d39a4c57..6171fdbb9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
@@ -47,12 +47,9 @@ public class TableSchemaTest {
List<String> primaryKeys = Collections.singletonList("f1");
Map<String, String> options = new HashMap<>();
- assertThatThrownBy(
- () ->
- new TableSchema(
- 1, fields, 10, partitionKeys,
primaryKeys, options, ""))
- .isInstanceOf(IllegalStateException.class)
- .hasMessage("Primary key constraint [f1] should include all
partition fields [f0]");
+ TableSchema schema =
+ new TableSchema(1, fields, 10, partitionKeys, primaryKeys,
options, "");
+ assertThat(schema.crossPartitionUpdate()).isTrue();
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index c413d8a3e..caa809062 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -35,7 +35,7 @@ import java.util.function.Predicate;
/** A {@link LookupTable} for table without primary key. */
public class NoPrimaryKeyLookupTable implements LookupTable {
- private final RocksDBListState state;
+ private final RocksDBListState<InternalRow, InternalRow> state;
private final Predicate<InternalRow> recordFilter;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 13e04089d..0928bf047 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -34,7 +34,7 @@ import java.util.function.Predicate;
/** A {@link LookupTable} for primary key table. */
public class PrimaryKeyLookupTable implements LookupTable {
- protected final RocksDBValueState tableState;
+ protected final RocksDBValueState<InternalRow, InternalRow> tableState;
protected final Predicate<InternalRow> recordFilter;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
index f2ee7ee43..72ea7e9a0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.lookup;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.Serializer;
import org.rocksdb.ColumnFamilyHandle;
@@ -30,22 +29,20 @@ import java.util.Collections;
import java.util.List;
/** RocksDB state for key -> List of value. */
-public class RocksDBListState extends RocksDBState<List<InternalRow>> {
+public class RocksDBListState<K, V> extends RocksDBState<K, V, List<V>> {
private final ListDelimitedSerializer listSerializer = new
ListDelimitedSerializer();
- private static final List<InternalRow> EMPTY = Collections.emptyList();
-
public RocksDBListState(
RocksDB db,
ColumnFamilyHandle columnFamily,
- Serializer<InternalRow> keySerializer,
- Serializer<InternalRow> valueSerializer,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
long lruCacheSize) {
super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
}
- public void add(InternalRow key, InternalRow value) throws IOException {
+ public void add(K key, V value) throws IOException {
byte[] keyBytes = serializeKey(key);
byte[] valueBytes = serializeValue(value);
try {
@@ -56,7 +53,7 @@ public class RocksDBListState extends
RocksDBState<List<InternalRow>> {
cache.invalidate(wrap(keyBytes));
}
- public List<InternalRow> get(InternalRow key) throws IOException {
+ public List<V> get(K key) throws IOException {
byte[] keyBytes = serializeKey(key);
return cache.get(
wrap(keyBytes),
@@ -67,16 +64,15 @@ public class RocksDBListState extends
RocksDBState<List<InternalRow>> {
} catch (RocksDBException e) {
throw new RuntimeException(e);
}
- List<InternalRow> rows =
- listSerializer.deserializeList(valueBytes,
valueSerializer);
+ List<V> rows = listSerializer.deserializeList(valueBytes,
valueSerializer);
if (rows == null) {
- return EMPTY;
+ return Collections.emptyList();
}
return rows;
});
}
- private byte[] serializeValue(InternalRow value) throws IOException {
+ private byte[] serializeValue(V value) throws IOException {
valueOutputView.clear();
valueSerializer.serialize(value, valueOutputView);
return valueOutputView.getCopyOfBuffer();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java
index feb7e667b..a27ff44a4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.lookup;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.Serializer;
import org.rocksdb.ColumnFamilyHandle;
@@ -34,20 +33,20 @@ import java.util.List;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Rocksdb state for key -> Set values. */
-public class RocksDBSetState extends RocksDBState<List<byte[]>> {
+public class RocksDBSetState<K, V> extends RocksDBState<K, V, List<byte[]>> {
private static final byte[] EMPTY = new byte[0];
public RocksDBSetState(
RocksDB db,
ColumnFamilyHandle columnFamily,
- Serializer<InternalRow> keySerializer,
- Serializer<InternalRow> valueSerializer,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
long lruCacheSize) {
super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
}
- public List<InternalRow> get(InternalRow key) throws IOException {
+ public List<V> get(K key) throws IOException {
ByteArray keyBytes = wrap(serializeKey(key));
List<byte[]> valueBytes = cache.getIfPresent(keyBytes);
if (valueBytes == null) {
@@ -67,7 +66,7 @@ public class RocksDBSetState extends
RocksDBState<List<byte[]>> {
cache.put(keyBytes, valueBytes);
}
- List<InternalRow> values = new ArrayList<>(valueBytes.size());
+ List<V> values = new ArrayList<>(valueBytes.size());
for (byte[] value : valueBytes) {
valueInputView.setBuffer(value);
values.add(valueSerializer.deserialize(valueInputView));
@@ -75,7 +74,7 @@ public class RocksDBSetState extends
RocksDBState<List<byte[]>> {
return values;
}
- public void retract(InternalRow key, InternalRow value) throws IOException
{
+ public void retract(K key, V value) throws IOException {
try {
byte[] bytes = invalidKeyAndGetKVBytes(key, value);
if (db.get(columnFamily, bytes) != null) {
@@ -86,7 +85,7 @@ public class RocksDBSetState extends
RocksDBState<List<byte[]>> {
}
}
- public void add(InternalRow key, InternalRow value) throws IOException {
+ public void add(K key, V value) throws IOException {
try {
byte[] bytes = invalidKeyAndGetKVBytes(key, value);
db.put(columnFamily, writeOptions, bytes, EMPTY);
@@ -95,7 +94,7 @@ public class RocksDBSetState extends
RocksDBState<List<byte[]>> {
}
}
- private byte[] invalidKeyAndGetKVBytes(InternalRow key, InternalRow value)
throws IOException {
+ private byte[] invalidKeyAndGetKVBytes(K key, V value) throws IOException {
checkArgument(value != null);
keyOutView.clear();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
index 7269d1234..784b61301 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.lookup;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataOutputSerializer;
@@ -37,7 +36,7 @@ import java.io.IOException;
import java.util.Arrays;
/** Rocksdb state for key value. */
-public abstract class RocksDBState<CacheV> {
+public abstract class RocksDBState<K, V, CacheV> {
protected final RocksDB db;
@@ -45,9 +44,9 @@ public abstract class RocksDBState<CacheV> {
protected final ColumnFamilyHandle columnFamily;
- protected final Serializer<InternalRow> keySerializer;
+ protected final Serializer<K> keySerializer;
- protected final Serializer<InternalRow> valueSerializer;
+ protected final Serializer<V> valueSerializer;
protected final DataOutputSerializer keyOutView;
@@ -60,8 +59,8 @@ public abstract class RocksDBState<CacheV> {
public RocksDBState(
RocksDB db,
ColumnFamilyHandle columnFamily,
- Serializer<InternalRow> keySerializer,
- Serializer<InternalRow> valueSerializer,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
long lruCacheSize) {
this.db = db;
this.columnFamily = columnFamily;
@@ -78,7 +77,7 @@ public abstract class RocksDBState<CacheV> {
.build();
}
- protected byte[] serializeKey(InternalRow key) throws IOException {
+ protected byte[] serializeKey(K key) throws IOException {
keyOutView.clear();
keySerializer.serialize(key, keyOutView);
return keyOutView.getCopyOfBuffer();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
index c32703af7..af6a7f201 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.lookup;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.flink.RocksDBOptions;
@@ -63,34 +62,34 @@ public class RocksDBStateFactory implements Closeable {
}
}
- public RocksDBValueState valueState(
+ public <K, V> RocksDBValueState<K, V> valueState(
String name,
- Serializer<InternalRow> keySerializer,
- Serializer<InternalRow> valueSerializer,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
long lruCacheSize)
throws IOException {
- return new RocksDBValueState(
+ return new RocksDBValueState<>(
db, createColumnFamily(name), keySerializer, valueSerializer,
lruCacheSize);
}
- public RocksDBSetState setState(
+ public <K, V> RocksDBSetState<K, V> setState(
String name,
- Serializer<InternalRow> keySerializer,
- Serializer<InternalRow> valueSerializer,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
long lruCacheSize)
throws IOException {
- return new RocksDBSetState(
+ return new RocksDBSetState<>(
db, createColumnFamily(name), keySerializer, valueSerializer,
lruCacheSize);
}
- public RocksDBListState listState(
+ public <K, V> RocksDBListState<K, V> listState(
String name,
- Serializer<InternalRow> keySerializer,
- Serializer<InternalRow> valueSerializer,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
long lruCacheSize)
throws IOException {
- return new RocksDBListState(
+ return new RocksDBListState<>(
db, createColumnFamily(name), keySerializer, valueSerializer,
lruCacheSize);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
index 624d5ba93..077906403 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.lookup;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.Serializer;
import org.rocksdb.ColumnFamilyHandle;
@@ -32,19 +31,19 @@ import java.io.IOException;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Rocksdb state for key -> a single value. */
-public class RocksDBValueState extends RocksDBState<RocksDBState.Reference> {
+public class RocksDBValueState<K, V> extends RocksDBState<K, V,
RocksDBState.Reference> {
public RocksDBValueState(
RocksDB db,
ColumnFamilyHandle columnFamily,
- Serializer<InternalRow> keySerializer,
- Serializer<InternalRow> valueSerializer,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
long lruCacheSize) {
super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
}
@Nullable
- public InternalRow get(InternalRow key) throws IOException {
+ public V get(K key) throws IOException {
try {
Reference valueRef = get(wrap(serializeKey(key)));
return valueRef.isPresent() ? deserializeValue(valueRef.bytes) :
null;
@@ -63,7 +62,7 @@ public class RocksDBValueState extends
RocksDBState<RocksDBState.Reference> {
return valueRef;
}
- public void put(InternalRow key, InternalRow value) throws IOException {
+ public void put(K key, V value) throws IOException {
checkArgument(value != null);
try {
@@ -76,7 +75,7 @@ public class RocksDBValueState extends
RocksDBState<RocksDBState.Reference> {
}
}
- public void delete(InternalRow key) throws IOException {
+ public void delete(K key) throws IOException {
try {
byte[] keyBytes = serializeKey(key);
ByteArray keyByteArray = wrap(keyBytes);
@@ -89,12 +88,12 @@ public class RocksDBValueState extends
RocksDBState<RocksDBState.Reference> {
}
}
- private InternalRow deserializeValue(byte[] valueBytes) throws IOException
{
+ private V deserializeValue(byte[] valueBytes) throws IOException {
valueInputView.setBuffer(valueBytes);
return valueSerializer.deserialize(valueInputView);
}
- private byte[] serializeValue(InternalRow value) throws IOException {
+ private byte[] serializeValue(V value) throws IOException {
valueOutputView.clear();
valueSerializer.serialize(value, valueOutputView);
return valueOutputView.getCopyOfBuffer();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index 80aeb20ac..61452d681 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -34,7 +34,7 @@ import java.util.function.Predicate;
/** A {@link LookupTable} for primary key table which provides lookup by
secondary key. */
public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
- private final RocksDBSetState indexState;
+ private final RocksDBSetState<InternalRow, InternalRow> indexState;
private final KeyProjectedRow secKeyRow;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
index 648143d92..4a10d4386 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.utils.SerializableFunction;
import java.io.Serializable;
@@ -41,4 +42,19 @@ public interface ChannelComputer<T> extends Serializable {
static int select(int bucket, int numChannels) {
return bucket % numChannels;
}
+
+ static <T, R> ChannelComputer<R> transform(
+ ChannelComputer<T> input, SerializableFunction<R, T> converter) {
+ return new ChannelComputer<R>() {
+ @Override
+ public void setup(int numChannels) {
+ input.setup(numChannels);
+ }
+
+ @Override
+ public int channel(R record) {
+ return input.channel(converter.apply(record));
+ }
+ };
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 36c34aee8..2c23dfdf9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -73,7 +74,9 @@ public class FlinkSinkBuilder {
case FIXED:
return buildForFixedBucket();
case DYNAMIC:
- return buildDynamicBucketSink();
+ return buildDynamicBucketSink(false);
+ case GLOBAL_DYNAMIC:
+ return buildDynamicBucketSink(true);
case UNAWARE:
return buildUnawareBucketSink();
default:
@@ -81,9 +84,11 @@ public class FlinkSinkBuilder {
}
}
- private DataStreamSink<?> buildDynamicBucketSink() {
+ private DataStreamSink<?> buildDynamicBucketSink(boolean globalIndex) {
checkArgument(logSinkFunction == null, "Dynamic bucket mode can not
work with log system.");
- return new RowDynamicBucketSink(table,
overwritePartition).build(input, parallelism);
+ return globalIndex
+ ? new GlobalDynamicBucketSink(table,
overwritePartition).build(input, parallelism)
+ : new RowDynamicBucketSink(table,
overwritePartition).build(input, parallelism);
}
private DataStreamSink<?> buildForFixedBucket() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index 363c5490e..61d4ec7a1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -32,7 +32,8 @@ public class CdcRecord implements Serializable {
private static final long serialVersionUID = 1L;
- private final RowKind kind;
+ private RowKind kind;
+
private final Map<String, String> fields;
public CdcRecord(RowKind kind, Map<String, String> fields) {
@@ -40,6 +41,11 @@ public class CdcRecord implements Serializable {
this.fields = fields;
}
+ public CdcRecord setRowKind(RowKind kind) {
+ this.kind = kind;
+ return this;
+ }
+
public static CdcRecord emptyRecord() {
return new CdcRecord(RowKind.INSERT, Collections.emptyMap());
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
similarity index 50%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
index 648143d92..c4ceba307 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
@@ -16,29 +16,31 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink;
+package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
-import java.io.Serializable;
+/** A {@link PartitionKeyExtractor} for {@link CdcRecord}. */
+public class CdcRecordPartitionKeyExtractor implements
PartitionKeyExtractor<CdcRecord> {
-/**
- * A utility class to compute which downstream channel a given record should
be sent to.
- *
- * @param <T> type of record
- */
-public interface ChannelComputer<T> extends Serializable {
+ private final CdcRecordKeyAndBucketExtractor extractor;
- void setup(int numChannels);
-
- int channel(T record);
+ public CdcRecordPartitionKeyExtractor(TableSchema schema) {
+ this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
+ }
- static int select(BinaryRow partition, int bucket, int numChannels) {
- int startChannel = Math.abs(partition.hashCode()) % numChannels;
- return (startChannel + bucket) % numChannels;
+ @Override
+ public BinaryRow partition(CdcRecord record) {
+ extractor.setRecord(record);
+ return extractor.partition();
}
- static int select(int bucket, int numChannels) {
- return bucket % numChannels;
+ @Override
+ public BinaryRow trimmedPrimaryKey(CdcRecord record) {
+ extractor.setRecord(record);
+ return extractor.trimmedPrimaryKey();
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
index a55b788a4..d0c422c08 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
@@ -27,6 +27,7 @@ import org.apache.paimon.utils.TypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -113,4 +114,16 @@ public class CdcRecordUtils {
}
return Optional.of(genericRow);
}
+
+ public static CdcRecord fromGenericRow(GenericRow row, List<String>
fieldNames) {
+ Map<String, String> fields = new HashMap<>();
+ for (int i = 0; i < row.getFieldCount(); i++) {
+ Object field = row.getField(i);
+ if (field != null) {
+ fields.put(fieldNames.get(i), field.toString());
+ }
+ }
+
+ return new CdcRecord(row.getRowKind(), fields);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 5fd0f80d3..4f1745406 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.sink.index.GlobalDynamicCdcBucketSink;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
@@ -117,17 +118,16 @@ public class CdcSinkBuilder<T> {
case FIXED:
return buildForFixedBucket(parsed);
case DYNAMIC:
- return buildForDynamicBucket(parsed);
+ return new CdcDynamicBucketSink((FileStoreTable)
table).build(parsed, parallelism);
+ case GLOBAL_DYNAMIC:
+ return new GlobalDynamicCdcBucketSink((FileStoreTable) table)
+ .build(parsed, parallelism);
case UNAWARE:
default:
throw new UnsupportedOperationException("Unsupported bucket
mode: " + bucketMode);
}
}
- private DataStreamSink<?> buildForDynamicBucket(DataStream<CdcRecord>
parsed) {
- return new CdcDynamicBucketSink((FileStoreTable) table).build(parsed,
parallelism);
- }
-
private DataStreamSink<?> buildForFixedBucket(DataStream<CdcRecord>
parsed) {
FileStoreTable dataTable = (FileStoreTable) table;
DataStream<CdcRecord> partitioned =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 0070f3ae9..c5b716e45 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.flink.sink.index.GlobalDynamicCdcBucketSink;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
@@ -152,10 +153,6 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
sink.sinkFrom(new DataStream<>(input.getExecutionEnvironment(),
partitioned));
}
- private void buildForDynamicBucket(FileStoreTable table,
DataStream<CdcRecord> parsed) {
- new CdcDynamicBucketSink(table).build(parsed, parallelism);
- }
-
private void buildForFixedBucket(FileStoreTable table,
DataStream<CdcRecord> parsed) {
DataStream<CdcRecord> partitioned =
partition(parsed, new
CdcRecordChannelComputer(table.schema()), parallelism);
@@ -196,7 +193,10 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
buildForFixedBucket(table, parsedForTable);
break;
case DYNAMIC:
- buildForDynamicBucket(table, parsedForTable);
+ new CdcDynamicBucketSink(table).build(parsedForTable,
parallelism);
+ break;
+ case GLOBAL_DYNAMIC:
+ new
GlobalDynamicCdcBucketSink(table).build(parsedForTable, parallelism);
break;
case UNAWARE:
default:
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
new file mode 100644
index 000000000..5f98f3b05
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator;
+import org.apache.paimon.flink.sink.FlinkWriteSink;
+import org.apache.paimon.flink.sink.RowWithBucketChannelComputer;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
+
+/**
+ * Sink for global dynamic bucket table.
+ *
+ * <p>{@code build} wires the following steps for a {@code DataStream<RowData>} input:
+ *
+ * <ol>
+ *   <li>an {@link IndexBootstrapOperator} ("INDEX_BOOTSTRAP") that outputs
+ *       {@code Tuple2<KeyPartOrRow, RowData>} elements at the input's parallelism;
+ *   <li>a shuffle using {@link KeyPartRowChannelComputer} (by key hash, per the inline comment);
+ *   <li>a {@link GlobalIndexAssignerOperator} ("dynamic-bucket-assigner") that outputs
+ *       {@code Tuple2<RowData, Integer>}, i.e. the row plus its bucket;
+ *   <li>a shuffle using {@link RowWithBucketChannelComputer};
+ *   <li>{@code sinkFrom}, which attaches the writer and committer.
+ * </ol>
+ *
+ * <p>The assigner parallelism comes from
+ * {@code table.coreOptions().dynamicBucketAssignerParallelism()}; when that returns
+ * {@code null} the {@code parallelism} argument of {@code build} is used instead.
+ *
+ * <p>The write operator created by {@code createWriteOperator} is a
+ * {@link DynamicBucketRowWriteOperator}.
+ */
+public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData,
Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public GlobalDynamicBucketSink(
+ FileStoreTable table, @Nullable Map<String, String>
overwritePartition) {
+ super(table, overwritePartition);
+ }
+
+ @Override
+ protected OneInputStreamOperator<Tuple2<RowData, Integer>, Committable>
createWriteOperator(
+ StoreSinkWrite.Provider writeProvider, String commitUser) {
+ return new DynamicBucketRowWriteOperator(table, writeProvider,
commitUser);
+ }
+
+ public DataStreamSink<?> build(DataStream<RowData> input, @Nullable
Integer parallelism) {
+ String initialCommitUser = UUID.randomUUID().toString(); // a fresh random commit user for every build() call
+
+ TableSchema schema = table.schema();
+ RowType rowType = schema.logicalRowType();
+ List<String> primaryKeys = schema.primaryKeys();
+ List<String> partitionKeys = schema.partitionKeys();
+ RowDataSerializer rowSerializer = new
RowDataSerializer(toLogicalType(rowType));
+ RowType keyPartType =
+ schema.projectedLogicalRowType(
+ Stream.concat(primaryKeys.stream(),
partitionKeys.stream())
+ .collect(Collectors.toList())); // projection list: primary-key fields followed by partition fields
+ RowDataSerializer keyPartSerializer = new
RowDataSerializer(toLogicalType(keyPartType));
+
+ // Topology:
+ // input -- bootstrap -- shuffle by key hash --> bucket-assigner --
shuffle by bucket -->
+ // writer --> committer
+
+ DataStream<Tuple2<KeyPartOrRow, RowData>> bootstraped =
+ input.transform(
+ "INDEX_BOOTSTRAP",
+ new InternalTypeInfo<>(
+ new KeyWithRowSerializer<>(
+ keyPartSerializer,
rowSerializer)),
+ new IndexBootstrapOperator<>(
+ new IndexBootstrap(table),
FlinkRowData::new))
+ .setParallelism(input.getParallelism()); // the bootstrap operator keeps the input's parallelism
+
+ // 1. shuffle by key hash
+ Integer assignerParallelism =
table.coreOptions().dynamicBucketAssignerParallelism();
+ if (assignerParallelism == null) {
+ assignerParallelism = parallelism; // fall back to the caller-supplied parallelism (which may itself be null)
+ }
+
+ KeyPartRowChannelComputer channelComputer =
+ new KeyPartRowChannelComputer(rowType, keyPartType,
primaryKeys);
+ DataStream<Tuple2<KeyPartOrRow, RowData>> partitionByKeyHash =
+ partition(bootstraped, channelComputer, assignerParallelism);
+
+ // 2. bucket-assigner
+ TupleTypeInfo<Tuple2<RowData, Integer>> rowWithBucketType =
+ new TupleTypeInfo<>(input.getType(),
BasicTypeInfo.INT_TYPE_INFO);
+ DataStream<Tuple2<RowData, Integer>> bucketAssigned =
+ partitionByKeyHash
+ .transform(
+ "dynamic-bucket-assigner",
+ rowWithBucketType,
+ GlobalIndexAssignerOperator.forRowData(table))
+ .setParallelism(partitionByKeyHash.getParallelism()); // the assigner runs at the shuffled stream's parallelism
+
+ // 3. shuffle by bucket
+
+ DataStream<Tuple2<RowData, Integer>> partitionByBucket =
+ partition(bucketAssigned, new
RowWithBucketChannelComputer(schema), parallelism);
+
+ // 4. writer and committer
+ return sinkFrom(partitionByBucket, initialCommitUser);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java
new file mode 100644
index 000000000..06bb5a147
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.sink.ChannelComputer;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.FlinkWriteSink;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.sink.cdc.CdcDynamicBucketWriteOperator;
+import org.apache.paimon.flink.sink.cdc.CdcHashKeyChannelComputer;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
+import org.apache.paimon.flink.sink.cdc.CdcWithBucketChannelComputer;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
+
+/**
+ * Sink for global dynamic bucket table and {@link CdcRecord} inputs.
+ *
+ * <p>{@code build} wires the same steps as the inline "Topology" comment describes, with
+ * {@link CdcRecord} as the element type:
+ *
+ * <ol>
+ *   <li>an {@link IndexBootstrapOperator} ("INDEX_BOOTSTRAP") whose converter turns each
+ *       bootstrap row into a {@link CdcRecord} through {@code CdcRecordUtils.fromGenericRow},
+ *       using the field names of the projected primary-key + partition row type;
+ *   <li>a shuffle using {@link CdcHashKeyChannelComputer}, applied to the {@code f1} payload of
+ *       each tuple;
+ *   <li>a {@link GlobalIndexAssignerOperator} ("dynamic-bucket-assigner") that outputs
+ *       {@code Tuple2<CdcRecord, Integer>};
+ *   <li>a shuffle using {@link CdcWithBucketChannelComputer};
+ *   <li>{@code sinkFrom}, which attaches the writer and committer.
+ * </ol>
+ *
+ * <p>The constructor passes {@code null} as the second argument of the super constructor.
+ * The write operator created by {@code createWriteOperator} is a
+ * {@link CdcDynamicBucketWriteOperator}.
+ */
+public class GlobalDynamicCdcBucketSink extends
FlinkWriteSink<Tuple2<CdcRecord, Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public GlobalDynamicCdcBucketSink(FileStoreTable table) {
+ super(table, null);
+ }
+
+ @Override
+ protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable>
createWriteOperator(
+ StoreSinkWrite.Provider writeProvider, String commitUser) {
+ return new CdcDynamicBucketWriteOperator(table, writeProvider,
commitUser);
+ }
+
+ public DataStreamSink<?> build(DataStream<CdcRecord> input, @Nullable
Integer parallelism) {
+ String initialCommitUser = UUID.randomUUID().toString(); // a fresh random commit user for every build() call
+
+ TableSchema schema = table.schema();
+ List<String> primaryKeys = schema.primaryKeys();
+ List<String> partitionKeys = schema.partitionKeys();
+ RowType keyPartType =
+ schema.projectedLogicalRowType(
+ Stream.concat(primaryKeys.stream(),
partitionKeys.stream())
+ .collect(Collectors.toList())); // projection list: primary-key fields followed by partition fields
+ List<String> keyPartNames = keyPartType.getFieldNames();
+ RowDataToObjectArrayConverter keyPartConverter =
+ new RowDataToObjectArrayConverter(keyPartType);
+
+ // Topology:
+ // input -- bootstrap -- shuffle by key hash --> bucket-assigner --
shuffle by bucket -->
+ // writer --> committer
+
+ TupleTypeInfo<Tuple2<KeyPartOrRow, CdcRecord>> tuple2TupleType =
+ new TupleTypeInfo<>(new EnumTypeInfo<>(KeyPartOrRow.class),
input.getType());
+ DataStream<Tuple2<KeyPartOrRow, CdcRecord>> bootstraped =
+ input.transform(
+ "INDEX_BOOTSTRAP",
+ tuple2TupleType,
+ new IndexBootstrapOperator<>(
+ new IndexBootstrap(table),
+ row ->
+ CdcRecordUtils.fromGenericRow(
+ GenericRow.ofKind(
+
row.getRowKind(),
+
keyPartConverter.convert(row)),
+ keyPartNames))) // bootstrap rows keep their row kind and are rebuilt from the converted field values
+ .setParallelism(input.getParallelism()); // the bootstrap operator keeps the input's parallelism
+
+ // 1. shuffle by key hash
+ Integer assignerParallelism =
table.coreOptions().dynamicBucketAssignerParallelism();
+ if (assignerParallelism == null) {
+ assignerParallelism = parallelism; // fall back to the caller-supplied parallelism (which may itself be null)
+ }
+
+ ChannelComputer<Tuple2<KeyPartOrRow, CdcRecord>> channelComputer =
+ ChannelComputer.transform(
+ new CdcHashKeyChannelComputer(schema), tuple2 ->
tuple2.f1); // the wrapped computer only sees the CdcRecord payload, not the KEY_PART/ROW tag
+ DataStream<Tuple2<KeyPartOrRow, CdcRecord>> partitionByKeyHash =
+ partition(bootstraped, channelComputer, assignerParallelism);
+
+ // 2. bucket-assigner
+ TupleTypeInfo<Tuple2<CdcRecord, Integer>> rowWithBucketType =
+ new TupleTypeInfo<>(input.getType(),
BasicTypeInfo.INT_TYPE_INFO);
+ DataStream<Tuple2<CdcRecord, Integer>> bucketAssigned =
+ partitionByKeyHash
+ .transform(
+ "dynamic-bucket-assigner",
+ rowWithBucketType,
+
GlobalIndexAssignerOperator.forCdcRecord(table))
+ .setParallelism(partitionByKeyHash.getParallelism()); // the assigner runs at the shuffled stream's parallelism
+
+ // 3. shuffle by bucket
+
+ DataStream<Tuple2<CdcRecord, Integer>> partitionByBucket =
+ partition(bucketAssigned, new
CdcWithBucketChannelComputer(schema), parallelism);
+
+ // 4. writer and committer
+ return sinkFrom(partitionByBucket, initialCommitUser);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
new file mode 100644
index 000000000..2d3be0a12
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.flink.lookup.RocksDBStateFactory;
+import org.apache.paimon.flink.lookup.RocksDBValueState;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileIOUtils;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.IDMapping;
+import org.apache.paimon.utils.PositiveIntInt;
+import org.apache.paimon.utils.PositiveIntIntSerializer;
+import org.apache.paimon.utils.SerBiFunction;
+import org.apache.paimon.utils.SerializableFunction;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+
+/**
+ * Assigns a bucket to each input record and passes the record together with that bucket to a
+ * collector; when a key moves to another partition it may first emit a retraction for the old
+ * partition.
+ *
+ * <p>A RocksDB-backed value state ({@code keyIndex}) maps each trimmed primary key to a
+ * {@link PositiveIntInt} holding (partition id, bucket). Partition ids come from an
+ * {@link IDMapping} over {@link BinaryRow} partitions.
+ *
+ * <ul>
+ *   <li>{@code open} initialises all transient fields: extractors, the RocksDB state under a
+ *       fresh {@code lookup-<uuid>} directory inside {@code tmpDir}, the partition mapping, the
+ *       bucket statistics and the action derived from the table's merge engine.
+ *   <li>{@code bootstrap} stores (partition id, bucket) for the key of a key/partition record.
+ *       NOTE(review): the bucket stored here is a freshly assigned one from
+ *       {@code assignBucket(partition)}, not a bucket carried by the bootstrap record - confirm
+ *       this matches the bucket the existing row was written to.
+ *   <li>{@code process} looks up the key of an incoming record:
+ *       <ul>
+ *         <li>unknown key: assign a bucket, store the mapping, emit the record;
+ *         <li>known key in the same partition: emit the record with the stored bucket;
+ *         <li>known key in a different partition, {@code DELETE}: emit the record rewritten
+ *             with the previous partition and {@link RowKind#DELETE} to the previous bucket,
+ *             decrement that bucket's counter, then handle the record as a new one;
+ *         <li>known key in a different partition, {@code USE_OLD}: emit the record rewritten
+ *             with the previous partition to the previous bucket;
+ *         <li>known key in a different partition, {@code SKIP_NEW}: emit nothing.
+ *       </ul>
+ *   <li>{@code close} closes the state factory and deletes the state directory.
+ * </ul>
+ *
+ * <p>{@code fromMergeEngine} maps DEDUPLICATE to {@code DELETE}, PARTIAL_UPDATE and AGGREGATE to
+ * {@code USE_OLD}, FIRST_ROW to {@code SKIP_NEW}, and throws
+ * {@link UnsupportedOperationException} for any other engine.
+ */
+public class GlobalIndexAssigner<T> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final AbstractFileStoreTable table;
+ private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
extractorFunction;
+ private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
+ keyPartExtractorFunction;
+ private final SerBiFunction<T, BinaryRow, T> setPartition;
+ private final SerBiFunction<T, RowKind, T> setRowKind;
+
+ private transient int targetBucketRowNumber;
+ private transient int assignId;
+ private transient BiConsumer<T, Integer> collector;
+ private transient int numAssigners;
+ private transient PartitionKeyExtractor<T> extractor;
+ private transient PartitionKeyExtractor<T> keyPartExtractor;
+ private transient File path;
+ private transient RocksDBStateFactory stateFactory;
+ private transient RocksDBValueState<InternalRow, PositiveIntInt> keyIndex;
+
+ private transient IDMapping<BinaryRow> partMapping;
+ private transient BucketAssigner bucketAssigner;
+ private transient ExistsAction existsAction;
+
+ public GlobalIndexAssigner(
+ Table table,
+ SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
extractorFunction,
+ SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
keyPartExtractorFunction,
+ SerBiFunction<T, BinaryRow, T> setPartition,
+ SerBiFunction<T, RowKind, T> setRowKind) {
+ this.table = (AbstractFileStoreTable) table; // unchecked downcast: throws ClassCastException for any other Table implementation
+ this.extractorFunction = extractorFunction;
+ this.keyPartExtractorFunction = keyPartExtractorFunction;
+ this.setPartition = setPartition;
+ this.setRowKind = setRowKind;
+ }
+
+ public void open(File tmpDir, int numAssigners, int assignId,
BiConsumer<T, Integer> collector)
+ throws Exception {
+ this.numAssigners = numAssigners;
+ this.assignId = assignId;
+ this.collector = collector;
+
+ CoreOptions coreOptions = table.coreOptions();
+ this.targetBucketRowNumber = (int)
coreOptions.dynamicBucketTargetRowNum(); // NOTE(review): narrowing cast; a value above Integer.MAX_VALUE would wrap - confirm the option's range
+ this.extractor = extractorFunction.apply(table.schema());
+ this.keyPartExtractor = keyPartExtractorFunction.apply(table.schema());
+
+ // state
+ Options options = coreOptions.toConfiguration();
+ this.path = new File(tmpDir, "lookup-" + UUID.randomUUID());
+ this.stateFactory = new RocksDBStateFactory(path.toString(), options);
+ long cacheSize =
options.get(CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE).getBytes();
+ RowType keyType = table.schema().logicalTrimmedPrimaryKeysType();
+ this.keyIndex =
+ stateFactory.valueState(
+ "keyIndex",
+ new RowCompactedSerializer(keyType),
+ new PositiveIntIntSerializer(),
+ cacheSize);
+
+ this.partMapping = new IDMapping<>(BinaryRow::copy);
+ this.bucketAssigner = new BucketAssigner();
+ this.existsAction = fromMergeEngine(coreOptions.mergeEngine());
+ }
+
+ public void process(T value) throws Exception {
+ BinaryRow partition = extractor.partition(value);
+ BinaryRow key = extractor.trimmedPrimaryKey(value);
+
+ int partId = partMapping.index(partition);
+
+ PositiveIntInt partitionBucket = keyIndex.get(key);
+ if (partitionBucket != null) {
+ int previousPartId = partitionBucket.i1();
+ int previousBucket = partitionBucket.i2();
+ if (previousPartId == partId) { // key already lives in this partition: reuse its stored bucket
+ collect(value, previousBucket);
+ } else {
+ switch (existsAction) {
+ case DELETE:
+ {
+ // retract old record
+ BinaryRow previousPart =
partMapping.get(previousPartId);
+ T retract = setPartition.apply(value,
previousPart);
+ retract = setRowKind.apply(retract,
RowKind.DELETE);
+ collect(retract, previousBucket);
+ bucketAssigner.decrement(previousPart,
previousBucket);
+
+ // new record
+ processNewRecord(partition, partId, key, value);
+ break;
+ }
+ case USE_OLD:
+ {
+ BinaryRow previousPart =
partMapping.get(previousPartId);
+ T newValue = setPartition.apply(value,
previousPart);
+ collect(newValue, previousBucket);
+ break;
+ }
+ case SKIP_NEW:
+ // do nothing
+ break;
+ }
+ }
+ } else {
+ // new record
+ processNewRecord(partition, partId, key, value);
+ }
+ }
+
+ public void bootstrap(T value) throws IOException {
+ BinaryRow partition = keyPartExtractor.partition(value);
+ keyIndex.put(
+ keyPartExtractor.trimmedPrimaryKey(value),
+ new PositiveIntInt(partMapping.index(partition),
assignBucket(partition)));
+ }
+
+ private void processNewRecord(BinaryRow partition, int partId, BinaryRow
key, T value)
+ throws IOException {
+ int bucket = assignBucket(partition);
+ keyIndex.put(key, new PositiveIntInt(partId, bucket));
+ collect(value, bucket);
+ }
+
+ private int assignBucket(BinaryRow partition) {
+ return bucketAssigner.assignBucket(partition, this::isAssignBucket,
targetBucketRowNumber);
+ }
+
+ private boolean isAssignBucket(int bucket) {
+ return computeAssignId(bucket) == assignId; // this instance only hands out buckets that map to its own assigner id
+ }
+
+ private int computeAssignId(int hash) {
+ return Math.abs(hash % numAssigners);
+ }
+
+ private void collect(T value, int bucket) {
+ collector.accept(value, bucket);
+ }
+
+ public void close() throws IOException {
+ if (stateFactory != null) {
+ stateFactory.close();
+ stateFactory = null;
+ }
+
+ if (path != null) {
+ FileIOUtils.deleteDirectoryQuietly(path);
+ }
+ }
+
+ private static class BucketAssigner {
+
+ private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new
HashMap<>();
+
+ public int assignBucket(BinaryRow part, Filter<Integer> filter, int
maxCount) {
+ TreeMap<Integer, Integer> bucketMap = bucketMap(part);
+ for (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) { // ascending bucket order (TreeMap)
+ int bucket = entry.getKey();
+ int count = entry.getValue();
+ if (filter.test(bucket) && count < maxCount) { // first accepted bucket that still has room
+ bucketMap.put(bucket, count + 1);
+ return bucket;
+ }
+ }
+
+ for (int i = 0; ; i++) { // otherwise open the smallest accepted bucket id not yet tracked for this partition
+ if (filter.test(i) && !bucketMap.containsKey(i)) {
+ bucketMap.put(i, 1);
+ return i;
+ }
+ }
+ }
+
+ public void decrement(BinaryRow part, int bucket) {
+ bucketMap(part).compute(bucket, (k, v) -> v == null ? 0 : v - 1); // an untracked bucket is recorded with count 0
+ }
+
+ private TreeMap<Integer, Integer> bucketMap(BinaryRow part) {
+ TreeMap<Integer, Integer> map = stats.get(part);
+ if (map == null) {
+ map = new TreeMap<>();
+ stats.put(part.copy(), map); // the key is a copy - presumably because callers may reuse the BinaryRow instance; verify
+ }
+ return map;
+ }
+ }
+
+ private ExistsAction fromMergeEngine(MergeEngine mergeEngine) {
+ switch (mergeEngine) {
+ case DEDUPLICATE:
+ return ExistsAction.DELETE;
+ case PARTIAL_UPDATE:
+ case AGGREGATE:
+ return ExistsAction.USE_OLD;
+ case FIRST_ROW:
+ return ExistsAction.SKIP_NEW;
+ default:
+ throw new UnsupportedOperationException("Unsupported engine: "
+ mergeEngine);
+ }
+ }
+
+ private enum ExistsAction {
+ DELETE,
+ USE_OLD,
+ SKIP_NEW
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
new file mode 100644
index 000000000..5d5acd203
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcRecordPartitionKeyExtractor;
+import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
+import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind;
+
+/**
+ * A {@link OneInputStreamOperator} for {@link GlobalIndexAssigner}.
+ *
+ * <p>Input elements are {@code Tuple2<KeyPartOrRow, T>}: {@code KEY_PART} elements are passed to
+ * {@code GlobalIndexAssigner.bootstrap}, {@code ROW} elements to
+ * {@code GlobalIndexAssigner.process}. Every (value, bucket) pair the assigner collects is
+ * emitted as a {@code Tuple2<T, Integer>}.
+ *
+ * <p>{@code initializeState} opens the assigner with one of the task's spilling directories
+ * (chosen at random), the number of parallel subtasks and this subtask's index.
+ *
+ * <p>{@code close} closes the assigner. NOTE(review): it does not call {@code super.close()} -
+ * confirm this is intended for the supported Flink versions.
+ *
+ * <p>Factories:
+ *
+ * <ul>
+ *   <li>{@code forRowData} / {@code createRowDataAssigner}: partition replacement goes through
+ *       {@link ProjectToRowDataFunction}; the row kind is set on the given {@link RowData}
+ *       instance itself, which is then returned.
+ *   <li>{@code forCdcRecord}: partition replacement builds a new {@link CdcRecord} with the
+ *       original kind whose fields are the original fields overlaid with the partition fields;
+ *       the row kind is set through {@code CdcRecord::setRowKind}. The table is cast to
+ *       {@link FileStoreTable}.
+ * </ul>
+ */
+public class GlobalIndexAssignerOperator<T> extends
AbstractStreamOperator<Tuple2<T, Integer>>
+ implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T,
Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final GlobalIndexAssigner<T> assigner;
+
+ public GlobalIndexAssignerOperator(GlobalIndexAssigner<T> assigner) {
+ this.assigner = assigner;
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ super.initializeState(context);
+ File[] tmpDirs =
+
getContainingTask().getEnvironment().getIOManager().getSpillingDirectories();
+ File tmpDir =
tmpDirs[ThreadLocalRandom.current().nextInt(tmpDirs.length)]; // pick one spilling directory at random
+ assigner.open(
+ tmpDir,
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ getRuntimeContext().getIndexOfThisSubtask(),
+ this::collect);
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<KeyPartOrRow, T>>
streamRecord)
+ throws Exception {
+ Tuple2<KeyPartOrRow, T> tuple2 = streamRecord.getValue();
+ T value = tuple2.f1;
+ switch (tuple2.f0) { // no default branch: only the two enum constants are handled
+ case KEY_PART:
+ assigner.bootstrap(value);
+ break;
+ case ROW:
+ assigner.process(value);
+ break;
+ }
+ }
+
+ private void collect(T value, int bucket) {
+ output.collect(new StreamRecord<>(new Tuple2<>(value, bucket)));
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.assigner.close();
+ }
+
+ public static GlobalIndexAssignerOperator<RowData> forRowData(Table table)
{
+ return new GlobalIndexAssignerOperator<>(createRowDataAssigner(table));
+ }
+
+ public static GlobalIndexAssigner<RowData> createRowDataAssigner(Table t) {
+ return new GlobalIndexAssigner<>(
+ t,
+ RowDataPartitionKeyExtractor::new,
+ KeyPartPartitionKeyExtractor::new,
+ new ProjectToRowDataFunction(t.rowType(), t.partitionKeys()),
+ (rowData, rowKind) -> {
+ rowData.setRowKind(toFlinkRowKind(rowKind)); // mutates the passed row in place
+ return rowData;
+ });
+ }
+
+ public static GlobalIndexAssignerOperator<CdcRecord> forCdcRecord(Table
table) {
+ RowType partitionType = ((FileStoreTable)
table).schema().logicalPartitionType();
+ List<String> partitionNames = partitionType.getFieldNames();
+ RowDataToObjectArrayConverter converter = new
RowDataToObjectArrayConverter(partitionType);
+ GlobalIndexAssigner<CdcRecord> assigner =
+ new GlobalIndexAssigner<>(
+ table,
+ CdcRecordPartitionKeyExtractor::new,
+ CdcRecordPartitionKeyExtractor::new,
+ (record, part) -> {
+ CdcRecord partCdc =
+ CdcRecordUtils.fromGenericRow(
+
GenericRow.of(converter.convert(part)), partitionNames);
+ Map<String, String> fields = new
HashMap<>(record.fields());
+ fields.putAll(partCdc.fields()); // partition values overwrite the record's own values for those fields
+ return new CdcRecord(record.kind(), fields);
+ },
+ CdcRecord::setRowKind);
+ return new GlobalIndexAssignerOperator<>(assigner);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
new file mode 100644
index 000000000..2df812007
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.AbstractInnerTableScan;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Bootstrap key index from Paimon table.
+ *
+ * <p>{@code bootstrap(numAssigners, assignId, collector)} reads the table projected onto its
+ * primary-key fields followed by its partition fields and hands every row to {@code collector}.
+ * The scan is restricted to buckets for which {@code bucket % numAssigners == assignId}, so
+ * each caller only reads its own share of the buckets.
+ *
+ * <p>The scan returned by the read builder is cast to {@link AbstractInnerTableScan} in order to
+ * apply the bucket filter. The reader is closed when the method returns, including on failure
+ * (try-with-resources).
+ *
+ * @throws IOException declared by {@code bootstrap}; propagated from creating or draining the
+ *     reader
+ */
+public class IndexBootstrap implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Table table;
+
+ public IndexBootstrap(Table table) {
+ this.table = table;
+ }
+
+ public void bootstrap(int numAssigners, int assignId,
Consumer<InternalRow> collector)
+ throws IOException {
+ RowType rowType = table.rowType();
+ List<String> fieldNames = rowType.getFieldNames();
+ List<String> keyPartFields =
+ Stream.concat(table.primaryKeys().stream(),
table.partitionKeys().stream())
+ .collect(Collectors.toList()); // primary-key fields first, then partition fields
+ int[] projection =
+ keyPartFields.stream()
+ .map(fieldNames::indexOf)
+ .mapToInt(Integer::intValue)
+ .toArray(); // positions of those fields in the table's row type
+ ReadBuilder readBuilder =
table.newReadBuilder().withProjection(projection);
+
+ AbstractInnerTableScan tableScan = (AbstractInnerTableScan)
readBuilder.newScan();
+ TableScan.Plan plan =
+ tableScan.withBucketFilter(bucket -> bucket % numAssigners ==
assignId).plan();
+
+ try (RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(plan)) {
+ reader.forEachRemaining(collector);
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
new file mode 100644
index 000000000..d5f0683a7
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Operator for {@link IndexBootstrap}.
+ *
+ * <p>During {@code initializeState} it runs {@code IndexBootstrap.bootstrap} with the number of
+ * parallel subtasks and this subtask's index; every bootstrapped {@link InternalRow} is
+ * converted with {@code converter} and emitted tagged as {@link KeyPartOrRow#KEY_PART}.
+ *
+ * <p>Every regular input element is forwarded unchanged, tagged as {@link KeyPartOrRow#ROW}.
+ *
+ * <p>The constructor sets the chaining strategy to {@link ChainingStrategy#ALWAYS}.
+ */
+public class IndexBootstrapOperator<T> extends
AbstractStreamOperator<Tuple2<KeyPartOrRow, T>>
+ implements OneInputStreamOperator<T, Tuple2<KeyPartOrRow, T>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final IndexBootstrap bootstrap;
+ private final SerializableFunction<InternalRow, T> converter;
+
+ public IndexBootstrapOperator(
+ IndexBootstrap bootstrap, SerializableFunction<InternalRow, T>
converter) {
+ this.bootstrap = bootstrap;
+ this.converter = converter;
+ setChainingStrategy(ChainingStrategy.ALWAYS);
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ super.initializeState(context);
+ bootstrap.bootstrap(
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ getRuntimeContext().getIndexOfThisSubtask(),
+ this::collect); // bootstrap rows are emitted from here, before any processElement call on this operator
+ }
+
+ @Override
+ public void processElement(StreamRecord<T> streamRecord) throws Exception {
+ output.collect(new StreamRecord<>(new Tuple2<>(KeyPartOrRow.ROW,
streamRecord.getValue())));
+ }
+
+ private void collect(InternalRow row) {
+ output.collect(
+ new StreamRecord<>(new Tuple2<>(KeyPartOrRow.KEY_PART,
converter.apply(row))));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartOrRow.java
similarity index 51%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartOrRow.java
index 648143d92..10c80c8d3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartOrRow.java
@@ -16,29 +16,33 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.data.BinaryRow;
-
-import java.io.Serializable;
-
-/**
- * A utility class to compute which downstream channel a given record should
be sent to.
- *
- * @param <T> type of record
- */
-public interface ChannelComputer<T> extends Serializable {
-
- void setup(int numChannels);
-
- int channel(T record);
-
- static int select(BinaryRow partition, int bucket, int numChannels) {
- int startChannel = Math.abs(partition.hashCode()) % numChannels;
- return (startChannel + bucket) % numChannels;
+package org.apache.paimon.flink.sink.index;
+
+/** Type of record: a key part (only key and partition fields) or a full row. */
+public enum KeyPartOrRow {
+ KEY_PART,
+ ROW;
+
+ public byte toByteValue() {
+ switch (this) {
+ case KEY_PART:
+ return 0;
+ case ROW:
+ return 1;
+ default:
+ throw new UnsupportedOperationException("Unsupported value: "
+ this);
+ }
}
- static int select(int bucket, int numChannels) {
- return bucket % numChannels;
+ public static KeyPartOrRow fromByteValue(byte value) {
+ switch (value) {
+ case 0:
+ return KEY_PART;
+ case 1:
+ return ROW;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported byte value '" + value + "' for row
kind.");
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
new file mode 100644
index 000000000..6dc3dca42
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** A {@link PartitionKeyExtractor} for {@link RowData} with only key and partition fields. */
+public class KeyPartPartitionKeyExtractor implements
PartitionKeyExtractor<RowData> {
+
+ private final Projection partitionProjection;
+ private final Projection keyProjection;
+
+ public KeyPartPartitionKeyExtractor(TableSchema schema) {
+ List<String> primaryKeys = schema.primaryKeys();
+ List<String> partitionKeys = schema.partitionKeys();
+ RowType keyPartType =
+ schema.projectedLogicalRowType(
+ Stream.concat(primaryKeys.stream(),
partitionKeys.stream())
+ .collect(Collectors.toList()));
+ this.partitionProjection = CodeGenUtils.newProjection(keyPartType,
partitionKeys);
+ this.keyProjection = CodeGenUtils.newProjection(keyPartType,
primaryKeys);
+ }
+
+ @Override
+ public BinaryRow partition(RowData record) {
+ return partitionProjection.apply(new FlinkRowWrapper(record));
+ }
+
+ @Override
+ public BinaryRow trimmedPrimaryKey(RowData record) {
+ return keyProjection.apply(new FlinkRowWrapper(record));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
new file mode 100644
index 000000000..9535b3826
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.sink.ChannelComputer;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+/** A {@link ChannelComputer} for KeyPartOrRow and row. */
+public class KeyPartRowChannelComputer implements
ChannelComputer<Tuple2<KeyPartOrRow, RowData>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final RowType rowType;
+ private final RowType keyPartType;
+ private final List<String> primaryKey;
+
+ private transient int numChannels;
+ private transient Projection rowProject;
+ private transient Projection keyPartProject;
+
+ public KeyPartRowChannelComputer(
+ RowType rowType, RowType keyPartType, List<String> primaryKey) {
+
+ this.rowType = rowType;
+ this.keyPartType = keyPartType;
+ this.primaryKey = primaryKey;
+ }
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ this.rowProject = CodeGenUtils.newProjection(rowType, primaryKey);
+ this.keyPartProject = CodeGenUtils.newProjection(keyPartType,
primaryKey);
+ }
+
+ @Override
+ public int channel(Tuple2<KeyPartOrRow, RowData> record) {
+ BinaryRow key =
+ (record.f0 == KeyPartOrRow.KEY_PART ? keyPartProject :
rowProject)
+ .apply(new FlinkRowWrapper(record.f1));
+ return Math.abs(key.hashCode() % numChannels);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyWithRowSerializer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyWithRowSerializer.java
new file mode 100644
index 000000000..876aa296c
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyWithRowSerializer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.flink.utils.InternalTypeSerializer;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.paimon.flink.sink.index.KeyPartOrRow.KEY_PART;
+
+/** A {@link InternalTypeSerializer} to serialize KeyPartOrRow with T. */
+public class KeyWithRowSerializer<T> extends
InternalTypeSerializer<Tuple2<KeyPartOrRow, T>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TypeSerializer<T> keyPartSerializer;
+ private final TypeSerializer<T> rowSerializer;
+
+ public KeyWithRowSerializer(
+ TypeSerializer<T> keyPartSerializer, TypeSerializer<T>
rowSerializer) {
+ this.keyPartSerializer = keyPartSerializer;
+ this.rowSerializer = rowSerializer;
+ }
+
+ @Override
+ public TypeSerializer<Tuple2<KeyPartOrRow, T>> duplicate() {
+ return new KeyWithRowSerializer<>(keyPartSerializer.duplicate(),
rowSerializer.duplicate());
+ }
+
+ @Override
+ public Tuple2<KeyPartOrRow, T> createInstance() {
+ return new Tuple2<>();
+ }
+
+ private TypeSerializer<T> serializer(KeyPartOrRow keyPartOrRow) {
+ return keyPartOrRow == KEY_PART ? keyPartSerializer : rowSerializer;
+ }
+
+ @Override
+ public Tuple2<KeyPartOrRow, T> copy(Tuple2<KeyPartOrRow, T> from) {
+ return new Tuple2<>(from.f0, serializer(from.f0).copy(from.f1));
+ }
+
+ @Override
+ public void serialize(Tuple2<KeyPartOrRow, T> record, DataOutputView
target)
+ throws IOException {
+ target.writeByte(record.f0.toByteValue());
+ serializer(record.f0).serialize(record.f1, target);
+ }
+
+ @Override
+ public Tuple2<KeyPartOrRow, T> deserialize(DataInputView source) throws
IOException {
+ KeyPartOrRow keyPartOrRow =
KeyPartOrRow.fromByteValue(source.readByte());
+ T row = serializer(keyPartOrRow).deserialize(source);
+ return new Tuple2<>(keyPartOrRow, row);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KeyWithRowSerializer<?> that = (KeyWithRowSerializer<?>) o;
+ return Objects.equals(keyPartSerializer, that.keyPartSerializer)
+ && Objects.equals(rowSerializer, that.rowSerializer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(keyPartSerializer, rowSerializer);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
new file mode 100644
index 000000000..f0b0d0863
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+/** A {@link TypeInformation} for internal serializer. */
+public class InternalTypeInfo<T> extends TypeInformation<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final InternalTypeSerializer<T> serializer;
+
+ public InternalTypeInfo(InternalTypeSerializer<T> serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<T> getTypeClass() {
+ return (Class<T>) serializer.createInstance().getClass();
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+ return serializer.duplicate();
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof InternalTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ InternalTypeInfo<?> that = (InternalTypeInfo<?>) o;
+ return Objects.equals(serializer, that.serializer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(serializer);
+ }
+
+ @Override
+ public String toString() {
+ return "InternalTypeInfo{" + "serializer=" + serializer + '}';
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeSerializer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeSerializer.java
new file mode 100644
index 000000000..33952e6f3
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeSerializer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/** A simple version {@link TypeSerializer}. */
+public abstract class InternalTypeSerializer<T> extends TypeSerializer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public T copy(T from, T reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public T deserialize(T reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws
IOException {
+ serialize(deserialize(source), target);
+ }
+
+ @Override
+ public TypeSerializerSnapshot<T> snapshotConfiguration() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
new file mode 100644
index 000000000..c70d9633c
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SerBiFunction;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/** Project {@link BinaryRow} fields into {@link RowData}. */
+public class ProjectToRowDataFunction implements SerBiFunction<RowData,
BinaryRow, RowData> {
+
+ private final RowData.FieldGetter[] fieldGetters;
+
+ private final Map<Integer, Integer> projectMapping;
+ private final RowData.FieldGetter[] projectGetters;
+
+ public ProjectToRowDataFunction(RowType rowType, List<String>
projectFields) {
+ LogicalType[] types =
+ LogicalTypeConversion.toLogicalType(rowType)
+ .getChildren()
+ .toArray(new LogicalType[0]);
+ this.fieldGetters =
+ IntStream.range(0, types.length)
+ .mapToObj(i -> createFieldGetter(types[i], i))
+ .toArray(RowData.FieldGetter[]::new);
+
+ List<String> fieldNames = rowType.getFieldNames();
+ this.projectMapping =
+ projectFields.stream()
+ .collect(Collectors.toMap(fieldNames::indexOf,
projectFields::indexOf));
+ this.projectGetters =
+ projectFields.stream()
+ .map(
+ field ->
+ createFieldGetter(
+
types[rowType.getFieldIndex(field)],
+ projectFields.indexOf(field)))
+ .toArray(RowData.FieldGetter[]::new);
+ }
+
+ @Override
+ public RowData apply(RowData input, BinaryRow project) {
+ GenericRowData newRow = new GenericRowData(fieldGetters.length);
+ FlinkRowData partRow = new FlinkRowData(project);
+ for (int i = 0; i < fieldGetters.length; i++) {
+ Object field =
+ projectMapping.containsKey(i)
+ ?
projectGetters[projectMapping.get(i)].getFieldOrNull(partRow)
+ : fieldGetters[i].getFieldOrNull(input);
+ newRow.setField(i, field);
+ }
+ return newRow;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
new file mode 100644
index 000000000..65c608ba4
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for global dynamic bucket tables, including cross partition updates. */
+public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
+
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS T ("
+ + "pt INT, "
+ + "pk INT, "
+ + "v INT, "
+ + "PRIMARY KEY (pk) NOT ENFORCED"
+ + ") PARTITIONED BY (pt) WITH ("
+ + " 'bucket'='-1', "
+ + " 'dynamic-bucket.target-row-num'='3' "
+ + ")");
+ }
+
+ @Test
+ public void testWriteRead() {
+ sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4),
(1, 5, 5)");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 1, 1),
+ Row.of(1, 2, 2),
+ Row.of(1, 3, 3),
+ Row.of(1, 4, 4),
+ Row.of(1, 5, 5));
+ sql("INSERT INTO T VALUES (1, 3, 33), (1, 1, 11)");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 1, 11),
+ Row.of(1, 2, 2),
+ Row.of(1, 3, 33),
+ Row.of(1, 4, 4),
+ Row.of(1, 5, 5));
+
+ assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
+ .containsExactlyInAnyOrder(Row.of(0), Row.of(1));
+
+ // change partition
+ sql("INSERT INTO T VALUES (2, 1, 2), (2, 2, 3)");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(2, 1, 2),
+ Row.of(2, 2, 3),
+ Row.of(1, 3, 33),
+ Row.of(1, 4, 4),
+ Row.of(1, 5, 5));
+ }
+
+ @Test
+ public void testWriteWithAssignerParallelism() {
+ sql(
+ "INSERT INTO T /*+
OPTIONS('dynamic-bucket.assigner-parallelism'='3') */ "
+ + "VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4),
(1, 5, 5)");
+ assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
+ .containsExactlyInAnyOrder(Row.of(0), Row.of(1), Row.of(2));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index a16447364..81c9d1495 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -67,16 +67,22 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
@Test
@Timeout(120)
public void testRandomCdcEvents() throws Exception {
- innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1);
+ innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1,
false);
}
@Test
@Timeout(120)
public void testRandomCdcEventsDynamicBucket() throws Exception {
- innerTestRandomCdcEvents(-1);
+ innerTestRandomCdcEvents(-1, false);
}
- private void innerTestRandomCdcEvents(int numBucket) throws Exception {
+ @Test
+ @Timeout(120)
+ public void testRandomCdcEventsGlobalDynamicBucket() throws Exception {
+ innerTestRandomCdcEvents(-1, true);
+ }
+
+ private void innerTestRandomCdcEvents(int numBucket, boolean globalIndex)
throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
int numEvents = random.nextInt(1500) + 1;
@@ -118,7 +124,7 @@ public class FlinkCdcSyncTableSinkITCase extends
AbstractTestBase {
fileIO,
testTable.initialRowType(),
Collections.singletonList("pt"),
- Arrays.asList("pt", "k"),
+ globalIndex ? Collections.singletonList("k") :
Arrays.asList("pt", "k"),
numBucket);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
new file mode 100644
index 000000000..17b120282
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link GlobalIndexAssigner}. */
+public class GlobalIndexAssignerTest extends TableTestBase {
+
+ private GlobalIndexAssigner<RowData> createAssigner(MergeEngine
mergeEngine) throws Exception {
+ Identifier identifier = identifier("T");
+ Options options = new Options();
+ options.set(CoreOptions.MERGE_ENGINE, mergeEngine);
+ if (mergeEngine == MergeEngine.FIRST_ROW) {
+ options.set(CoreOptions.CHANGELOG_PRODUCER,
CoreOptions.ChangelogProducer.LOOKUP);
+ }
+ options.set(CoreOptions.DYNAMIC_BUCKET_TARGET_ROW_NUM, 3L);
+ options.set(CoreOptions.BUCKET, -1);
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk")
+ .options(options.toMap())
+ .build();
+ catalog.createTable(identifier, schema, true);
+ return
GlobalIndexAssignerOperator.createRowDataAssigner(catalog.getTable(identifier));
+ }
+
+ @Test
+ public void testBucketAssign() throws Exception {
+ GlobalIndexAssigner<RowData> assigner =
createAssigner(MergeEngine.DEDUPLICATE);
+ List<Integer> output = new ArrayList<>();
+ assigner.open(new File(warehouse.getPath()), 2, 0, (row, bucket) ->
output.add(bucket));
+
+ // assign
+ assigner.process(GenericRowData.of(1, 1, 1));
+ assigner.process(GenericRowData.of(1, 2, 2));
+ assigner.process(GenericRowData.of(1, 3, 3));
+ assertThat(output).containsExactly(0, 0, 0);
+ output.clear();
+
+ // full
+ assigner.process(GenericRowData.of(1, 4, 4));
+ assertThat(output).containsExactly(2);
+ output.clear();
+
+ // another partition
+ assigner.process(GenericRowData.of(2, 5, 5));
+ assertThat(output).containsExactly(0);
+ output.clear();
+
+ // read assigned
+ assigner.process(GenericRowData.of(1, 4, 4));
+ assigner.process(GenericRowData.of(1, 2, 2));
+ assigner.process(GenericRowData.of(1, 3, 3));
+ assertThat(output).containsExactly(2, 0, 0);
+ output.clear();
+
+ assigner.close();
+ }
+
+ @Test
+ public void testUpsert() throws Exception {
+ GlobalIndexAssigner<RowData> assigner =
createAssigner(MergeEngine.DEDUPLICATE);
+ List<Tuple2<RowData, Integer>> output = new ArrayList<>();
+ assigner.open(
+ new File(warehouse.getPath()),
+ 2,
+ 0,
+ (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+
+ // change partition
+ assigner.process(GenericRowData.of(1, 1, 1));
+ assigner.process(GenericRowData.of(2, 1, 2));
+ assertThat(output)
+ .containsExactly(
+ new Tuple2<>(GenericRowData.of(1, 1, 1), 0),
+ new Tuple2<>(GenericRowData.ofKind(RowKind.DELETE, 1,
1, 2), 0),
+ new Tuple2<>(GenericRowData.of(2, 1, 2), 0));
+ output.clear();
+
+ // test partition 1 deleted
+ assigner.process(GenericRowData.of(1, 2, 2));
+ assigner.process(GenericRowData.of(1, 3, 3));
+ assigner.process(GenericRowData.of(1, 4, 4));
+ assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
+ output.clear();
+
+ // move from full bucket
+ assigner.process(GenericRowData.of(2, 4, 4));
+ assertThat(output)
+ .containsExactly(
+ new Tuple2<>(GenericRowData.ofKind(RowKind.DELETE, 1,
4, 4), 0),
+ new Tuple2<>(GenericRowData.of(2, 4, 4), 0));
+ output.clear();
+
+ // test partition 1 deleted
+ assigner.process(GenericRowData.of(1, 5, 5));
+ assertThat(output.stream().map(t -> t.f1)).containsExactly(0);
+ output.clear();
+
+ assigner.close();
+ }
+
+ /**
+ * With the PARTIAL_UPDATE or AGGREGATE merge engine (picked at random), a record whose primary
+ * key was first seen in another partition is emitted with the old partition value.
+ */
+ @Test
+ public void testUseOldPartition() throws Exception {
+ MergeEngine mergeEngine =
+ ThreadLocalRandom.current().nextBoolean()
+ ? MergeEngine.PARTIAL_UPDATE
+ : MergeEngine.AGGREGATE;
+ GlobalIndexAssigner<RowData> assigner = createAssigner(mergeEngine);
+ List<Tuple2<RowData, Integer>> output = new ArrayList<>();
+ assigner.open(
+ new File(warehouse.getPath()),
+ 2,
+ 0,
+ (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+
+ // change partition: pk=1 arrives for pt=2 but is rewritten to its first partition pt=1
+ assigner.process(GenericRowData.of(1, 1, 1));
+ assigner.process(GenericRowData.of(2, 1, 2));
+ assertThat(output)
+ .containsExactly(
+ new Tuple2<>(GenericRowData.of(1, 1, 1), 0),
+ new Tuple2<>(GenericRowData.of(1, 1, 2), 0));
+ output.clear();
+
+ // test partition 2 no effect
+ assigner.process(GenericRowData.of(2, 2, 2));
+ assigner.process(GenericRowData.of(2, 3, 3));
+ assigner.process(GenericRowData.of(2, 4, 4));
+ assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
+ output.clear();
+
+ // release the assigner, as testBucketAssign and testUpsert do
+ assigner.close();
+ }
+
+ /**
+ * With the FIRST_ROW merge engine, a record whose primary key was already assigned in another
+ * partition produces no output.
+ */
+ @Test
+ public void testFirstRow() throws Exception {
+ GlobalIndexAssigner<RowData> assigner = createAssigner(MergeEngine.FIRST_ROW);
+ List<Tuple2<RowData, Integer>> output = new ArrayList<>();
+ assigner.open(
+ new File(warehouse.getPath()),
+ 2,
+ 0,
+ (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+
+ // change partition: the second record for pk=1 is dropped
+ assigner.process(GenericRowData.of(1, 1, 1));
+ assigner.process(GenericRowData.of(2, 1, 2));
+ assertThat(output).containsExactly(new Tuple2<>(GenericRowData.of(1, 1, 1), 0));
+ output.clear();
+
+ // test partition 2 no effect
+ assigner.process(GenericRowData.of(2, 2, 2));
+ assigner.process(GenericRowData.of(2, 3, 3));
+ assigner.process(GenericRowData.of(2, 4, 4));
+ assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
+ output.clear();
+
+ // release the assigner, as testBucketAssign and testUpsert do
+ assigner.close();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
new file mode 100644
index 000000000..cd185d1de
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests {@link IndexBootstrap} against a small primary-key table. */
+public class IndexBootstrapTest extends TableTestBase {
+
+ @Test
+ public void test() throws Exception {
+ Identifier tableId = identifier("T");
+ Options tableOptions = new Options();
+ tableOptions.set(CoreOptions.BUCKET, 5);
+ Schema tableSchema =
+ Schema.newBuilder()
+ .column("col", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .primaryKey("pk")
+ .options(tableOptions.toMap())
+ .build();
+ catalog.createTable(tableId, tableSchema, true);
+ Table target = catalog.getTable(tableId);
+ // Seed seven rows; both fields of a row carry the same value (1 through 7).
+ write(
+ target,
+ GenericRow.of(1, 1),
+ GenericRow.of(2, 2),
+ GenericRow.of(3, 3),
+ GenericRow.of(4, 4),
+ GenericRow.of(5, 5),
+ GenericRow.of(6, 6),
+ GenericRow.of(7, 7));
+ // Collect field 0 of every row that the bootstrap passes to the consumer.
+ IndexBootstrap bootstrapper = new IndexBootstrap(target);
+ List<Integer> seen = new ArrayList<>();
+ Consumer<InternalRow> collector = row -> seen.add(row.getInt(0));
+ // bootstrap(2, 0, ...) is expected to deliver exactly rows 2 and 3, in any order.
+ bootstrapper.bootstrap(2, 0, collector);
+ assertThat(seen).containsExactlyInAnyOrder(2, 3);
+ seen.clear();
+ // bootstrap(2, 1, ...) is expected to deliver the other rows: 1, 4, 5, 6 and 7.
+ bootstrapper.bootstrap(2, 1, collector);
+ assertThat(seen).containsExactlyInAnyOrder(1, 4, 5, 6, 7);
+ seen.clear();
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 5841570a0..00e30e598 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -382,22 +382,6 @@ public class SparkReadITCase extends SparkReadTestBase {
.contains("[k1,v1]", "[k2,v2]");
}
- @Test
- public void testCreateTableWithInvalidPk() {
- assertThatThrownBy(
- () ->
- spark.sql(
- "CREATE TABLE PartitionedPkTable (\n"
- + "a BIGINT,\n"
- + "b STRING,\n"
- + "c DOUBLE)\n"
- + "PARTITIONED BY (b)\n"
- + "TBLPROPERTIES ('primary-key' = 'a')"))
- .isInstanceOf(IllegalStateException.class)
- .hasMessageContaining(
- "Primary key constraint [a] should include all partition fields [b]");
- }
-
@Test
public void testCreateTableWithNonexistentPk() {
spark.sql("USE paimon");