This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 29fb04ab6 [core] Supports cross partition update (#1719)
29fb04ab6 is described below

commit 29fb04ab6e46d7466506f9ba1982c7401de1d282
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Aug 7 14:43:22 2023 +0800

    [core] Supports cross partition update (#1719)
---
 docs/content/concepts/primary-key-table.md         |  47 +++-
 .../java/org/apache/paimon/utils/IDMapping.java    |  56 +++++
 .../org/apache/paimon/utils/PositiveIntInt.java    |  52 ++--
 .../paimon/utils/PositiveIntIntSerializer.java     |  50 ++++
 .../utils/RowDataToObjectArrayConverter.java       |   5 +-
 .../org/apache/paimon/utils/SerBiFunction.java     |  28 +--
 .../paimon/utils/PositiveIntIntSerializerTest.java |  53 +++++
 .../java/org/apache/paimon/KeyValueFileStore.java  |  11 +-
 .../org/apache/paimon/codegen/CodeGenUtils.java    |   6 +
 .../paimon/operation/AbstractFileStoreScan.java    |  24 +-
 .../paimon/operation/AppendOnlyFileStoreScan.java  |   2 +-
 .../org/apache/paimon/operation/FileStoreScan.java |   2 +
 .../paimon/operation/KeyValueFileStoreScan.java    |   2 +-
 .../main/java/org/apache/paimon/schema/Schema.java |   7 +-
 .../org/apache/paimon/schema/SchemaValidation.java |   8 +
 .../java/org/apache/paimon/schema/TableSchema.java |  15 +-
 .../paimon/table/AbstractFileStoreTable.java       |   1 +
 .../java/org/apache/paimon/table/BucketMode.java   |   7 +
 .../table/ChangelogValueCountFileStoreTable.java   |   1 +
 .../table/ChangelogWithKeyFileStoreTable.java      |   1 +
 .../table/source/AbstractInnerTableScan.java       |   6 +
 .../table/source/snapshot/SnapshotReader.java      |   2 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |   6 +
 .../apache/paimon/table/system/AuditLogTable.java  |   6 +
 .../test/java/org/apache/paimon/TestFileStore.java |   1 +
 .../org/apache/paimon/schema/TableSchemaTest.java  |   9 +-
 .../flink/lookup/NoPrimaryKeyLookupTable.java      |   2 +-
 .../paimon/flink/lookup/PrimaryKeyLookupTable.java |   2 +-
 .../paimon/flink/lookup/RocksDBListState.java      |  20 +-
 .../paimon/flink/lookup/RocksDBSetState.java       |  17 +-
 .../apache/paimon/flink/lookup/RocksDBState.java   |  13 +-
 .../paimon/flink/lookup/RocksDBStateFactory.java   |  25 +-
 .../paimon/flink/lookup/RocksDBValueState.java     |  17 +-
 .../flink/lookup/SecondaryIndexLookupTable.java    |   2 +-
 .../apache/paimon/flink/sink/ChannelComputer.java  |  16 ++
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  11 +-
 .../apache/paimon/flink/sink/cdc/CdcRecord.java    |   8 +-
 .../CdcRecordPartitionKeyExtractor.java}           |  34 +--
 .../paimon/flink/sink/cdc/CdcRecordUtils.java      |  13 +
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java      |  10 +-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |  10 +-
 .../flink/sink/index/GlobalDynamicBucketSink.java  | 126 ++++++++++
 .../sink/index/GlobalDynamicCdcBucketSink.java     | 133 +++++++++++
 .../flink/sink/index/GlobalIndexAssigner.java      | 262 +++++++++++++++++++++
 .../sink/index/GlobalIndexAssignerOperator.java    | 133 +++++++++++
 .../paimon/flink/sink/index/IndexBootstrap.java    |  69 ++++++
 .../flink/sink/index/IndexBootstrapOperator.java   |  65 +++++
 .../KeyPartOrRow.java}                             |  48 ++--
 .../sink/index/KeyPartPartitionKeyExtractor.java   |  61 +++++
 .../sink/index/KeyPartRowChannelComputer.java      |  68 ++++++
 .../flink/sink/index/KeyWithRowSerializer.java     |  97 ++++++++
 .../paimon/flink/utils/InternalTypeInfo.java       | 100 ++++++++
 .../paimon/flink/utils/InternalTypeSerializer.java |  62 +++++
 .../flink/utils/ProjectToRowDataFunction.java      |  83 +++++++
 .../flink/GlobalDynamicBucketTableITCase.java      |  87 +++++++
 .../sink/cdc/FlinkCdcSyncTableSinkITCase.java      |  14 +-
 .../flink/sink/index/GlobalIndexAssignerTest.java  | 198 ++++++++++++++++
 .../flink/sink/index/IndexBootstrapTest.java       |  79 +++++++
 .../org/apache/paimon/spark/SparkReadITCase.java   |  16 --
 59 files changed, 2090 insertions(+), 219 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md 
b/docs/content/concepts/primary-key-table.md
index 9218178be..ab4a9ed16 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -36,16 +36,41 @@ By [defining primary keys]({{< ref 
"how-to/creating-tables#tables-with-primary-k
 
 A bucket is the smallest storage unit for reads and writes, each bucket 
directory contains an [LSM tree]({{< ref "concepts/file-layouts#lsm-trees" >}}).
 
-Primary Key Table supports two bucket mode:
-1. Fixed Bucket mode: configure a bucket greater than 0, rescaling buckets can 
only be done through offline processes, 
-   see [Rescale Bucket]({{< ref "/maintenance/rescale-bucket" >}}). A too 
large number of buckets leads to too many
-   small files, and a too small number of buckets leads to poor write 
performance.
-2. Dynamic Bucket mode: configure `'bucket' = '-1'`, Paimon dynamically 
maintains the index, automatic expansion of
-   the number of buckets. (This is an experimental feature)
-   - Option1: `'dynamic-bucket.target-row-num'`: controls the target row 
number for one bucket.
-   - Option2: `'dynamic-bucket.assigner-parallelism'`: Parallelism of assigner 
operator, controls the number of initialized bucket.
-   - Dynamic Bucket requires more memory, 100 million entries in a partition 
takes up 1 GB more memory, partitions that are no longer active do not take up 
memory.
-   - Dynamic Bucket only support single write job. Please do not start 
multiple jobs to write to the same partition.
+### Fixed Bucket
+
+Configure `'bucket'` to a value greater than 0. Rescaling buckets can only be done through 
offline processes,
+see [Rescale Bucket]({{< ref "/maintenance/rescale-bucket" >}}). Too many 
buckets lead to too many
+small files, and too few buckets lead to poor write performance.
+
+### Dynamic Bucket
+
+{{< hint info >}}
+This is an experimental feature.
+{{< /hint >}}
+
+Configure `'bucket' = '-1'`. Paimon dynamically maintains the index and automatically 
expands the number of buckets.
+
+- Option1: `'dynamic-bucket.target-row-num'`: controls the target row number 
for one bucket.
+- Option2: `'dynamic-bucket.assigner-parallelism'`: Parallelism of the assigner 
operator; controls the number of initialized buckets.
+
+{{< hint info >}}
+Dynamic Bucket only supports a single write job. Please do not start multiple 
jobs to write to the same partition.
+{{< /hint >}}
+
+**Normal Dynamic Bucket Mode**:
+
+When your updates do not cross partitions (no partitions, or primary keys 
contain all partition fields), Dynamic
+Bucket mode uses a HASH index to maintain the mapping from key to bucket. It 
requires more memory than fixed bucket mode:
+100 million entries in a partition take up 1 GB more memory. Partitions that 
are no longer active do not take up memory.
+
+**Cross Partitions Update Dynamic Bucket Mode**:
+
+When you need cross partition updates (primary keys do not contain all partition 
fields), Dynamic Bucket mode directly
+maintains the mapping of keys to partition and bucket, uses local disks, and 
initializes indexes by reading all 
+existing keys in the table when starting a stream write job. Different merge 
engines have different behaviors:
+1. Deduplicate: Delete data from the old partition and insert new data into 
the new partition.
+2. PartialUpdate & Aggregation: Insert new data into the old partition.
+3. FirstRow: Ignore new data if there is an old value.
 
 ## Merge Engines
 
@@ -165,8 +190,6 @@ INSERT INTO T VALUES (1, null,null,1);
 SELECT * FROM T; -- output 1, 1, 0, 1
 ```
 
-
-
 ### Aggregation
 
 {{< hint info >}}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IDMapping.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/IDMapping.java
new file mode 100644
index 000000000..4c5d1d7a8
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IDMapping.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** Incremental id generation. */
+public class IDMapping<T> {
+
+    private final Function<T, T> copy;
+
+    private final List<T> values = new ArrayList<>();
+    private final Map<T, Integer> indexMap = new HashMap<>();
+
+    private int nextIndex = 0;
+
+    public IDMapping(Function<T, T> copy) {
+        this.copy = copy;
+    }
+
+    public int index(T t) {
+        Integer index = indexMap.get(t);
+        if (index == null) {
+            index = nextIndex;
+            nextIndex++;
+            T copied = copy.apply(t);
+            indexMap.put(copied, index);
+            values.add(copied);
+        }
+        return index;
+    }
+
+    public T get(int index) {
+        return values.get(index);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
 b/paimon-common/src/main/java/org/apache/paimon/utils/PositiveIntInt.java
similarity index 53%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
copy to paimon-common/src/main/java/org/apache/paimon/utils/PositiveIntInt.java
index 363c5490e..1042bee3d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/PositiveIntInt.java
@@ -16,59 +16,55 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.cdc;
-
-import org.apache.paimon.annotation.Experimental;
-import org.apache.paimon.types.RowKind;
+package org.apache.paimon.utils;
 
 import java.io.Serializable;
-import java.util.Collections;
-import java.util.Map;
 import java.util.Objects;
 
-/** A data change message from the CDC source. */
-@Experimental
-public class CdcRecord implements Serializable {
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-    private static final long serialVersionUID = 1L;
+/** IntInt pojo class. */
+public class PositiveIntInt implements Serializable {
 
-    private final RowKind kind;
-    private final Map<String, String> fields;
+    private static final long serialVersionUID = 1L;
 
-    public CdcRecord(RowKind kind, Map<String, String> fields) {
-        this.kind = kind;
-        this.fields = fields;
-    }
+    private final int i1;
+    private final int i2;
 
-    public static CdcRecord emptyRecord() {
-        return new CdcRecord(RowKind.INSERT, Collections.emptyMap());
+    public PositiveIntInt(int i1, int i2) {
+        checkArgument(i1 >= 0);
+        checkArgument(i2 >= 0);
+        this.i1 = i1;
+        this.i2 = i2;
     }
 
-    public RowKind kind() {
-        return kind;
+    public int i1() {
+        return i1;
     }
 
-    public Map<String, String> fields() {
-        return fields;
+    public int i2() {
+        return i2;
     }
 
     @Override
     public boolean equals(Object o) {
-        if (!(o instanceof CdcRecord)) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
             return false;
         }
-
-        CdcRecord that = (CdcRecord) o;
-        return Objects.equals(kind, that.kind) && Objects.equals(fields, 
that.fields);
+        PositiveIntInt intInt = (PositiveIntInt) o;
+        return i1 == intInt.i1 && i2 == intInt.i2;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(kind, fields);
+        return Objects.hash(i1, i2);
     }
 
     @Override
     public String toString() {
-        return kind.shortString() + " " + fields;
+        return "IntInt{" + "i1=" + i1 + ", i2=" + i2 + '}';
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/PositiveIntIntSerializer.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/PositiveIntIntSerializer.java
new file mode 100644
index 000000000..a6c59f273
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/PositiveIntIntSerializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.serializer.SerializerSingleton;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
+
+import java.io.IOException;
+
+import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
+import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
+
+/** A {@link SerializerSingleton} for {@link PositiveIntInt}. */
+public class PositiveIntIntSerializer extends 
SerializerSingleton<PositiveIntInt> {
+
+    @Override
+    public PositiveIntInt copy(PositiveIntInt from) {
+        return from;
+    }
+
+    @Override
+    public void serialize(PositiveIntInt record, DataOutputView target) throws 
IOException {
+        encodeInt(target, record.i1());
+        encodeInt(target, record.i2());
+    }
+
+    @Override
+    public PositiveIntInt deserialize(DataInputView source) throws IOException 
{
+        int i1 = decodeInt(source);
+        int i2 = decodeInt(source);
+        return new PositiveIntInt(i1, i2);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
index 6b710fcc7..502067566 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
@@ -24,12 +24,15 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.types.RowType;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.IntStream;
 
 /** Convert {@link InternalRow} to object array. */
-public class RowDataToObjectArrayConverter {
+public class RowDataToObjectArrayConverter implements Serializable {
+
+    private static final long serialVersionUID = 1L;
 
     private final RowType rowType;
     private final InternalRow.FieldGetter[] fieldGetters;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
 b/paimon-common/src/main/java/org/apache/paimon/utils/SerBiFunction.java
similarity index 56%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
copy to paimon-common/src/main/java/org/apache/paimon/utils/SerBiFunction.java
index 648143d92..e90465934 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/SerBiFunction.java
@@ -16,29 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.data.BinaryRow;
+package org.apache.paimon.utils;
 
 import java.io.Serializable;
+import java.util.function.BiFunction;
 
-/**
- * A utility class to compute which downstream channel a given record should 
be sent to.
- *
- * @param <T> type of record
- */
-public interface ChannelComputer<T> extends Serializable {
-
-    void setup(int numChannels);
-
-    int channel(T record);
-
-    static int select(BinaryRow partition, int bucket, int numChannels) {
-        int startChannel = Math.abs(partition.hashCode()) % numChannels;
-        return (startChannel + bucket) % numChannels;
-    }
-
-    static int select(int bucket, int numChannels) {
-        return bucket % numChannels;
-    }
-}
+/** A {@link BiFunction} that is also {@link Serializable}. */
+@FunctionalInterface
+public interface SerBiFunction<T, U, R> extends BiFunction<T, U, R>, 
Serializable {}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/PositiveIntIntSerializerTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/utils/PositiveIntIntSerializerTest.java
new file mode 100644
index 000000000..1d184634e
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/utils/PositiveIntIntSerializerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.data.serializer.SerializerTestBase;
+
+import java.util.Random;
+
+/** Test for {@link PositiveIntIntSerializer}. */
+public class PositiveIntIntSerializerTest extends 
SerializerTestBase<PositiveIntInt> {
+
+    @Override
+    protected Serializer<PositiveIntInt> createSerializer() {
+        return new PositiveIntIntSerializer();
+    }
+
+    @Override
+    protected boolean deepEquals(PositiveIntInt t1, PositiveIntInt t2) {
+        return t1.equals(t2);
+    }
+
+    @Override
+    protected PositiveIntInt[] getTestData() {
+        Random rnd = new Random();
+        int rndInt = Math.abs(rnd.nextInt());
+
+        int[] ints = new int[] {0, 1, Integer.MAX_VALUE, rndInt};
+        PositiveIntInt[] intInts = new PositiveIntInt[ints.length];
+        for (int i = 0; i < ints.length; i++) {
+            intInts[i] =
+                    new PositiveIntInt(
+                            ints[rnd.nextInt(ints.length)], 
ints[rnd.nextInt(ints.length)]);
+        }
+        return intInts;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index ba1ef81db..a6b22e7c3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -50,12 +50,14 @@ import java.util.function.Supplier;
 import static org.apache.paimon.predicate.PredicateBuilder.and;
 import static 
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** {@link FileStore} for querying and updating {@link KeyValue}s. */
 public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
 
     private static final long serialVersionUID = 1L;
 
+    private final boolean crossPartitionUpdate;
     private final RowType bucketKeyType;
     private final RowType keyType;
     private final RowType valueType;
@@ -68,6 +70,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
             FileIO fileIO,
             SchemaManager schemaManager,
             long schemaId,
+            boolean crossPartitionUpdate,
             CoreOptions options,
             RowType partitionType,
             RowType bucketKeyType,
@@ -76,6 +79,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
             KeyValueFieldsExtractor keyValueFieldsExtractor,
             MergeFunctionFactory<KeyValue> mfFactory) {
         super(fileIO, schemaManager, schemaId, options, partitionType);
+        this.crossPartitionUpdate = crossPartitionUpdate;
         this.bucketKeyType = bucketKeyType;
         this.keyType = keyType;
         this.valueType = valueType;
@@ -87,7 +91,12 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
 
     @Override
     public BucketMode bucketMode() {
-        return options.bucket() == -1 ? BucketMode.DYNAMIC : BucketMode.FIXED;
+        if (options.bucket() == -1) {
+            return crossPartitionUpdate ? BucketMode.GLOBAL_DYNAMIC : 
BucketMode.DYNAMIC;
+        } else {
+            checkArgument(!crossPartitionUpdate);
+            return BucketMode.FIXED;
+        }
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
index df9104a61..868734b0f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
@@ -29,6 +29,12 @@ public class CodeGenUtils {
 
     public static final Projection EMPTY_PROJECTION = input -> 
BinaryRow.EMPTY_ROW;
 
+    public static Projection newProjection(RowType inputType, List<String> 
fields) {
+        List<String> fieldNames = inputType.getFieldNames();
+        int[] mapping = 
fields.stream().mapToInt(fieldNames::indexOf).toArray();
+        return newProjection(inputType, mapping);
+    }
+
     public static Projection newProjection(RowType inputType, int[] mapping) {
         if (mapping.length == 0) {
             return EMPTY_PROJECTION;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 836e386ca..45f6bd153 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -66,11 +66,11 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     private final ConcurrentMap<Long, TableSchema> tableSchemas;
     private final SchemaManager schemaManager;
-    protected final ScanBucketFilter bucketFilter;
+    protected final ScanBucketFilter bucketKeyFilter;
 
     private Predicate partitionFilter;
     private Snapshot specifiedSnapshot = null;
-    private Integer specifiedBucket = null;
+    private Filter<Integer> bucketFilter = null;
     private List<ManifestFileMeta> specifiedManifests = null;
     private ScanKind scanKind = ScanKind.ALL;
     private Filter<Integer> levelFilter = null;
@@ -80,7 +80,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     public AbstractFileStoreScan(
             RowType partitionType,
-            ScanBucketFilter bucketFilter,
+            ScanBucketFilter bucketKeyFilter,
             SnapshotManager snapshotManager,
             SchemaManager schemaManager,
             ManifestFile.Factory manifestFileFactory,
@@ -90,7 +90,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             Integer scanManifestParallelism) {
         this.partitionStatsConverter = new 
FieldStatsArraySerializer(partitionType);
         this.partitionConverter = new 
RowDataToObjectArrayConverter(partitionType);
-        this.bucketFilter = bucketFilter;
+        this.bucketKeyFilter = bucketKeyFilter;
         this.snapshotManager = snapshotManager;
         this.schemaManager = schemaManager;
         this.manifestFileFactory = manifestFileFactory;
@@ -123,7 +123,13 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     @Override
     public FileStoreScan withBucket(int bucket) {
-        this.specifiedBucket = bucket;
+        this.bucketFilter = i -> i == bucket;
+        return this;
+    }
+
+    @Override
+    public FileStoreScan withBucketFilter(Filter<Integer> bucketFilter) {
+        this.bucketFilter = bucketFilter;
         return this;
     }
 
@@ -312,12 +318,12 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     /** Note: Keep this thread-safe. */
     private boolean filterByBucket(ManifestEntry entry) {
-        return (specifiedBucket == null || entry.bucket() == specifiedBucket);
+        return (bucketFilter == null || bucketFilter.test(entry.bucket()));
     }
 
     /** Note: Keep this thread-safe. */
     private boolean filterByBucketSelector(ManifestEntry entry) {
-        return bucketFilter.select(entry.bucket(), entry.totalBuckets());
+        return bucketKeyFilter.select(entry.bucket(), entry.totalBuckets());
     }
 
     /** Note: Keep this thread-safe. */
@@ -349,8 +355,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                 return false;
             }
 
-            if (specifiedBucket != null && numOfBuckets == 
totalBucketGetter.apply(row)) {
-                return specifiedBucket.intValue() == bucketGetter.apply(row);
+            if (bucketFilter != null && numOfBuckets == 
totalBucketGetter.apply(row)) {
+                return bucketFilter.test(bucketGetter.apply(row));
             }
 
             return true;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 118c9f269..e0df67766 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -62,7 +62,7 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
 
     public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
         this.filter = predicate;
-        this.bucketFilter.pushdown(predicate);
+        this.bucketKeyFilter.pushdown(predicate);
         return this;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 457d9a0bf..441b6eef7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -45,6 +45,8 @@ public interface FileStoreScan {
 
     FileStoreScan withBucket(int bucket);
 
+    FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);
+
     FileStoreScan withPartitionBucket(BinaryRow partition, int bucket);
 
     FileStoreScan withSnapshot(long snapshotId);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index dc480074f..d953f9c06 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -65,7 +65,7 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
 
     public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
         this.keyFilter = predicate;
-        this.bucketFilter.pushdown(predicate);
+        this.bucketKeyFilter.pushdown(predicate);
         return this;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index 85fee3d56..7f464b586 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -133,14 +133,9 @@ public class Schema {
                 "Table column %s should include all primary key constraint %s",
                 fieldNames,
                 primaryKeys);
-        Set<String> pkSet = new HashSet<>(primaryKeys);
-        Preconditions.checkState(
-                pkSet.containsAll(partitionKeys),
-                "Primary key constraint %s should include all partition fields 
%s",
-                primaryKeys,
-                partitionKeys);
 
         // primary key should not nullable
+        Set<String> pkSet = new HashSet<>(primaryKeys);
         List<DataField> newFields = new ArrayList<>();
         for (DataField field : fields) {
             if (pkSet.contains(field.name()) && field.type().isNullable()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 37f779d35..95f7689e7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -185,6 +185,14 @@ public class SchemaValidation {
                         "Only support 'lookup' changelog-producer on 
FIRST_MERGE merge engine");
             }
         }
+
+        if (schema.crossPartitionUpdate() && options.bucket() != -1) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "You should use dynamic bucket (bucket = -1) mode 
in cross partition update case "
+                                    + "(Primary key constraint %s not include 
all partition fields %s).",
+                            schema.primaryKeys(), schema.partitionKeys()));
+        }
     }
 
     private static void validatePrimaryKeysType(List<DataField> fields, 
List<String> primaryKeys) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index f173f0f31..dd1d6bb6e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -107,11 +107,6 @@ public class TableSchema implements Serializable {
 
     public List<String> trimmedPrimaryKeys() {
         if (primaryKeys.size() > 0) {
-            Preconditions.checkState(
-                    primaryKeys.containsAll(partitionKeys),
-                    String.format(
-                            "Primary key constraint %s should include all 
partition fields %s",
-                            primaryKeys, partitionKeys));
             List<String> adjusted =
                     primaryKeys.stream()
                             .filter(pk -> !partitionKeys.contains(pk))
@@ -145,6 +140,14 @@ public class TableSchema implements Serializable {
         return bucketKeys;
     }
 
+    public boolean crossPartitionUpdate() {
+        if (primaryKeys.isEmpty() || partitionKeys.isEmpty()) {
+            return false;
+        }
+
+        return !primaryKeys.containsAll(partitionKeys);
+    }
+
     /** Original bucket keys, maybe empty. */
     private List<String> originalBucketKeys() {
         String key = options.get(BUCKET_KEY.key());
@@ -215,7 +218,7 @@ public class TableSchema implements Serializable {
                 .collect(Collectors.toList());
     }
 
-    private RowType projectedLogicalRowType(List<String> projectedFieldNames) {
+    public RowType projectedLogicalRowType(List<String> projectedFieldNames) {
         return new RowType(projectedDataFields(projectedFieldNames));
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 597943b20..8309d0f80 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -110,6 +110,7 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
             case FIXED:
                 return new FixedBucketRowKeyExtractor(schema());
             case DYNAMIC:
+            case GLOBAL_DYNAMIC:
                 return new DynamicBucketRowKeyExtractor(schema());
             case UNAWARE:
                 return new UnawareBucketRowKeyExtractor(schema());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java 
b/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
index bd31644c0..02dbea24e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
@@ -39,6 +39,13 @@ public enum BucketMode {
      */
     DYNAMIC,
 
+    /**
+     * Compared with the DYNAMIC mode, this mode not only dynamically 
allocates buckets for
+     * Partition table, but also updates data across partitions. The primary 
key does not contain
+     * partition fields.
+     */
+    GLOBAL_DYNAMIC,
+
     /**
      * Ignoring buckets can be equivalent to understanding that all data 
enters the global bucket,
      * and data is randomly written to the table. The data in the bucket has 
no order relationship
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index 035816e1c..bb7a9a9b4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -88,6 +88,7 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
                             fileIO,
                             schemaManager(),
                             tableSchema.id(),
+                            false,
                             new CoreOptions(tableSchema.options()),
                             tableSchema.logicalPartitionType(),
                             tableSchema.logicalBucketKeyType(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index 624d9fc41..9e0157103 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -131,6 +131,7 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                             fileIO(),
                             schemaManager(),
                             tableSchema.id(),
+                            tableSchema.crossPartitionUpdate(),
                             options,
                             tableSchema.logicalPartitionType(),
                             
addKeyNamePrefix(tableSchema.logicalBucketKeyType()),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 9f119363d..034a333b9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -41,6 +41,7 @@ import 
org.apache.paimon.table.source.snapshot.StartingScanner;
 import 
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
 import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
+import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Pair;
 
 import java.util.List;
@@ -66,6 +67,11 @@ public abstract class AbstractInnerTableScan implements 
InnerTableScan {
         return this;
     }
 
+    public AbstractInnerTableScan withBucketFilter(Filter<Integer> 
bucketFilter) {
+        snapshotReader.withBucketFilter(bucketFilter);
+        return this;
+    }
+
     public CoreOptions options() {
         return options;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index ae547ed97..0cee60086 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -55,6 +55,8 @@ public interface SnapshotReader {
 
     SnapshotReader withBucket(int bucket);
 
+    SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);
+
     /** Get splits plan from snapshot. */
     Plan read();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 848c40175..99d2b7291 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -167,6 +167,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
+        scan.withBucketFilter(bucketFilter);
+        return this;
+    }
+
     /** Get splits from {@link FileKind#ADD} files. */
     @Override
     public Plan read() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index cc1f98f4b..2063859a3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -234,6 +234,12 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
+            snapshotReader.withBucketFilter(bucketFilter);
+            return this;
+        }
+
         @Override
         public Plan read() {
             return snapshotReader.read();
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 2d8b98a60..9b5441fd5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -105,6 +105,7 @@ public class TestFileStore extends KeyValueFileStore {
                 FileIOFinder.find(new Path(root)),
                 new SchemaManager(FileIOFinder.find(new Path(root)), 
options.path()),
                 0L,
+                false,
                 options,
                 partitionType,
                 keyType,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
index 2d39a4c57..6171fdbb9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
@@ -47,12 +47,9 @@ public class TableSchemaTest {
         List<String> primaryKeys = Collections.singletonList("f1");
         Map<String, String> options = new HashMap<>();
 
-        assertThatThrownBy(
-                        () ->
-                                new TableSchema(
-                                        1, fields, 10, partitionKeys, 
primaryKeys, options, ""))
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessage("Primary key constraint [f1] should include all 
partition fields [f0]");
+        TableSchema schema =
+                new TableSchema(1, fields, 10, partitionKeys, primaryKeys, 
options, "");
+        assertThat(schema.crossPartitionUpdate()).isTrue();
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index c413d8a3e..caa809062 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -35,7 +35,7 @@ import java.util.function.Predicate;
 /** A {@link LookupTable} for table without primary key. */
 public class NoPrimaryKeyLookupTable implements LookupTable {
 
-    private final RocksDBListState state;
+    private final RocksDBListState<InternalRow, InternalRow> state;
 
     private final Predicate<InternalRow> recordFilter;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 13e04089d..0928bf047 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -34,7 +34,7 @@ import java.util.function.Predicate;
 /** A {@link LookupTable} for primary key table. */
 public class PrimaryKeyLookupTable implements LookupTable {
 
-    protected final RocksDBValueState tableState;
+    protected final RocksDBValueState<InternalRow, InternalRow> tableState;
 
     protected final Predicate<InternalRow> recordFilter;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
index f2ee7ee43..72ea7e9a0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.lookup;
 
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.Serializer;
 
 import org.rocksdb.ColumnFamilyHandle;
@@ -30,22 +29,20 @@ import java.util.Collections;
 import java.util.List;
 
 /** RocksDB state for key -> List of value. */
-public class RocksDBListState extends RocksDBState<List<InternalRow>> {
+public class RocksDBListState<K, V> extends RocksDBState<K, V, List<V>> {
 
     private final ListDelimitedSerializer listSerializer = new 
ListDelimitedSerializer();
 
-    private static final List<InternalRow> EMPTY = Collections.emptyList();
-
     public RocksDBListState(
             RocksDB db,
             ColumnFamilyHandle columnFamily,
-            Serializer<InternalRow> keySerializer,
-            Serializer<InternalRow> valueSerializer,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
             long lruCacheSize) {
         super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
     }
 
-    public void add(InternalRow key, InternalRow value) throws IOException {
+    public void add(K key, V value) throws IOException {
         byte[] keyBytes = serializeKey(key);
         byte[] valueBytes = serializeValue(value);
         try {
@@ -56,7 +53,7 @@ public class RocksDBListState extends 
RocksDBState<List<InternalRow>> {
         cache.invalidate(wrap(keyBytes));
     }
 
-    public List<InternalRow> get(InternalRow key) throws IOException {
+    public List<V> get(K key) throws IOException {
         byte[] keyBytes = serializeKey(key);
         return cache.get(
                 wrap(keyBytes),
@@ -67,16 +64,15 @@ public class RocksDBListState extends 
RocksDBState<List<InternalRow>> {
                     } catch (RocksDBException e) {
                         throw new RuntimeException(e);
                     }
-                    List<InternalRow> rows =
-                            listSerializer.deserializeList(valueBytes, 
valueSerializer);
+                    List<V> rows = listSerializer.deserializeList(valueBytes, 
valueSerializer);
                     if (rows == null) {
-                        return EMPTY;
+                        return Collections.emptyList();
                     }
                     return rows;
                 });
     }
 
-    private byte[] serializeValue(InternalRow value) throws IOException {
+    private byte[] serializeValue(V value) throws IOException {
         valueOutputView.clear();
         valueSerializer.serialize(value, valueOutputView);
         return valueOutputView.getCopyOfBuffer();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java
index feb7e667b..a27ff44a4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.lookup;
 
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.Serializer;
 
 import org.rocksdb.ColumnFamilyHandle;
@@ -34,20 +33,20 @@ import java.util.List;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Rocksdb state for key -> Set values. */
-public class RocksDBSetState extends RocksDBState<List<byte[]>> {
+public class RocksDBSetState<K, V> extends RocksDBState<K, V, List<byte[]>> {
 
     private static final byte[] EMPTY = new byte[0];
 
     public RocksDBSetState(
             RocksDB db,
             ColumnFamilyHandle columnFamily,
-            Serializer<InternalRow> keySerializer,
-            Serializer<InternalRow> valueSerializer,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
             long lruCacheSize) {
         super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
     }
 
-    public List<InternalRow> get(InternalRow key) throws IOException {
+    public List<V> get(K key) throws IOException {
         ByteArray keyBytes = wrap(serializeKey(key));
         List<byte[]> valueBytes = cache.getIfPresent(keyBytes);
         if (valueBytes == null) {
@@ -67,7 +66,7 @@ public class RocksDBSetState extends 
RocksDBState<List<byte[]>> {
             cache.put(keyBytes, valueBytes);
         }
 
-        List<InternalRow> values = new ArrayList<>(valueBytes.size());
+        List<V> values = new ArrayList<>(valueBytes.size());
         for (byte[] value : valueBytes) {
             valueInputView.setBuffer(value);
             values.add(valueSerializer.deserialize(valueInputView));
@@ -75,7 +74,7 @@ public class RocksDBSetState extends 
RocksDBState<List<byte[]>> {
         return values;
     }
 
-    public void retract(InternalRow key, InternalRow value) throws IOException 
{
+    public void retract(K key, V value) throws IOException {
         try {
             byte[] bytes = invalidKeyAndGetKVBytes(key, value);
             if (db.get(columnFamily, bytes) != null) {
@@ -86,7 +85,7 @@ public class RocksDBSetState extends 
RocksDBState<List<byte[]>> {
         }
     }
 
-    public void add(InternalRow key, InternalRow value) throws IOException {
+    public void add(K key, V value) throws IOException {
         try {
             byte[] bytes = invalidKeyAndGetKVBytes(key, value);
             db.put(columnFamily, writeOptions, bytes, EMPTY);
@@ -95,7 +94,7 @@ public class RocksDBSetState extends 
RocksDBState<List<byte[]>> {
         }
     }
 
-    private byte[] invalidKeyAndGetKVBytes(InternalRow key, InternalRow value) 
throws IOException {
+    private byte[] invalidKeyAndGetKVBytes(K key, V value) throws IOException {
         checkArgument(value != null);
 
         keyOutView.clear();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
index 7269d1234..784b61301 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.lookup;
 
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.Serializer;
 import org.apache.paimon.io.DataInputDeserializer;
 import org.apache.paimon.io.DataOutputSerializer;
@@ -37,7 +36,7 @@ import java.io.IOException;
 import java.util.Arrays;
 
 /** Rocksdb state for key value. */
-public abstract class RocksDBState<CacheV> {
+public abstract class RocksDBState<K, V, CacheV> {
 
     protected final RocksDB db;
 
@@ -45,9 +44,9 @@ public abstract class RocksDBState<CacheV> {
 
     protected final ColumnFamilyHandle columnFamily;
 
-    protected final Serializer<InternalRow> keySerializer;
+    protected final Serializer<K> keySerializer;
 
-    protected final Serializer<InternalRow> valueSerializer;
+    protected final Serializer<V> valueSerializer;
 
     protected final DataOutputSerializer keyOutView;
 
@@ -60,8 +59,8 @@ public abstract class RocksDBState<CacheV> {
     public RocksDBState(
             RocksDB db,
             ColumnFamilyHandle columnFamily,
-            Serializer<InternalRow> keySerializer,
-            Serializer<InternalRow> valueSerializer,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
             long lruCacheSize) {
         this.db = db;
         this.columnFamily = columnFamily;
@@ -78,7 +77,7 @@ public abstract class RocksDBState<CacheV> {
                         .build();
     }
 
-    protected byte[] serializeKey(InternalRow key) throws IOException {
+    protected byte[] serializeKey(K key) throws IOException {
         keyOutView.clear();
         keySerializer.serialize(key, keyOutView);
         return keyOutView.getCopyOfBuffer();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
index c32703af7..af6a7f201 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.lookup;
 
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.Serializer;
 import org.apache.paimon.flink.RocksDBOptions;
 
@@ -63,34 +62,34 @@ public class RocksDBStateFactory implements Closeable {
         }
     }
 
-    public RocksDBValueState valueState(
+    public <K, V> RocksDBValueState<K, V> valueState(
             String name,
-            Serializer<InternalRow> keySerializer,
-            Serializer<InternalRow> valueSerializer,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
             long lruCacheSize)
             throws IOException {
-        return new RocksDBValueState(
+        return new RocksDBValueState<>(
                 db, createColumnFamily(name), keySerializer, valueSerializer, 
lruCacheSize);
     }
 
-    public RocksDBSetState setState(
+    public <K, V> RocksDBSetState<K, V> setState(
             String name,
-            Serializer<InternalRow> keySerializer,
-            Serializer<InternalRow> valueSerializer,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
             long lruCacheSize)
             throws IOException {
-        return new RocksDBSetState(
+        return new RocksDBSetState<>(
                 db, createColumnFamily(name), keySerializer, valueSerializer, 
lruCacheSize);
     }
 
-    public RocksDBListState listState(
+    public <K, V> RocksDBListState<K, V> listState(
             String name,
-            Serializer<InternalRow> keySerializer,
-            Serializer<InternalRow> valueSerializer,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
             long lruCacheSize)
             throws IOException {
 
-        return new RocksDBListState(
+        return new RocksDBListState<>(
                 db, createColumnFamily(name), keySerializer, valueSerializer, 
lruCacheSize);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
index 624d5ba93..077906403 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.lookup;
 
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.Serializer;
 
 import org.rocksdb.ColumnFamilyHandle;
@@ -32,19 +31,19 @@ import java.io.IOException;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Rocksdb state for key -> a single value. */
-public class RocksDBValueState extends RocksDBState<RocksDBState.Reference> {
+public class RocksDBValueState<K, V> extends RocksDBState<K, V, 
RocksDBState.Reference> {
 
     public RocksDBValueState(
             RocksDB db,
             ColumnFamilyHandle columnFamily,
-            Serializer<InternalRow> keySerializer,
-            Serializer<InternalRow> valueSerializer,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
             long lruCacheSize) {
         super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
     }
 
     @Nullable
-    public InternalRow get(InternalRow key) throws IOException {
+    public V get(K key) throws IOException {
         try {
             Reference valueRef = get(wrap(serializeKey(key)));
             return valueRef.isPresent() ? deserializeValue(valueRef.bytes) : 
null;
@@ -63,7 +62,7 @@ public class RocksDBValueState extends 
RocksDBState<RocksDBState.Reference> {
         return valueRef;
     }
 
-    public void put(InternalRow key, InternalRow value) throws IOException {
+    public void put(K key, V value) throws IOException {
         checkArgument(value != null);
 
         try {
@@ -76,7 +75,7 @@ public class RocksDBValueState extends 
RocksDBState<RocksDBState.Reference> {
         }
     }
 
-    public void delete(InternalRow key) throws IOException {
+    public void delete(K key) throws IOException {
         try {
             byte[] keyBytes = serializeKey(key);
             ByteArray keyByteArray = wrap(keyBytes);
@@ -89,12 +88,12 @@ public class RocksDBValueState extends 
RocksDBState<RocksDBState.Reference> {
         }
     }
 
-    private InternalRow deserializeValue(byte[] valueBytes) throws IOException 
{
+    private V deserializeValue(byte[] valueBytes) throws IOException {
         valueInputView.setBuffer(valueBytes);
         return valueSerializer.deserialize(valueInputView);
     }
 
-    private byte[] serializeValue(InternalRow value) throws IOException {
+    private byte[] serializeValue(V value) throws IOException {
         valueOutputView.clear();
         valueSerializer.serialize(value, valueOutputView);
         return valueOutputView.getCopyOfBuffer();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index 80aeb20ac..61452d681 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -34,7 +34,7 @@ import java.util.function.Predicate;
 /** A {@link LookupTable} for primary key table which provides lookup by 
secondary key. */
 public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
 
-    private final RocksDBSetState indexState;
+    private final RocksDBSetState<InternalRow, InternalRow> indexState;
 
     private final KeyProjectedRow secKeyRow;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
index 648143d92..4a10d4386 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.utils.SerializableFunction;
 
 import java.io.Serializable;
 
@@ -41,4 +42,19 @@ public interface ChannelComputer<T> extends Serializable {
     static int select(int bucket, int numChannels) {
         return bucket % numChannels;
     }
+
+    static <T, R> ChannelComputer<R> transform(
+            ChannelComputer<T> input, SerializableFunction<R, T> converter) {
+        return new ChannelComputer<R>() {
+            @Override
+            public void setup(int numChannels) {
+                input.setup(numChannels);
+            }
+
+            @Override
+            public int channel(R record) {
+                return input.channel(converter.apply(record));
+            }
+        };
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 36c34aee8..2c23dfdf9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
@@ -73,7 +74,9 @@ public class FlinkSinkBuilder {
             case FIXED:
                 return buildForFixedBucket();
             case DYNAMIC:
-                return buildDynamicBucketSink();
+                return buildDynamicBucketSink(false);
+            case GLOBAL_DYNAMIC:
+                return buildDynamicBucketSink(true);
             case UNAWARE:
                 return buildUnawareBucketSink();
             default:
@@ -81,9 +84,11 @@ public class FlinkSinkBuilder {
         }
     }
 
-    private DataStreamSink<?> buildDynamicBucketSink() {
+    private DataStreamSink<?> buildDynamicBucketSink(boolean globalIndex) {
         checkArgument(logSinkFunction == null, "Dynamic bucket mode can not 
work with log system.");
-        return new RowDynamicBucketSink(table, 
overwritePartition).build(input, parallelism);
+        return globalIndex
+                ? new GlobalDynamicBucketSink(table, 
overwritePartition).build(input, parallelism)
+                : new RowDynamicBucketSink(table, 
overwritePartition).build(input, parallelism);
     }
 
     private DataStreamSink<?> buildForFixedBucket() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index 363c5490e..61d4ec7a1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -32,7 +32,8 @@ public class CdcRecord implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private final RowKind kind;
+    private RowKind kind;
+
     private final Map<String, String> fields;
 
     public CdcRecord(RowKind kind, Map<String, String> fields) {
@@ -40,6 +41,11 @@ public class CdcRecord implements Serializable {
         this.fields = fields;
     }
 
+    public CdcRecord setRowKind(RowKind kind) {
+        this.kind = kind;
+        return this;
+    }
+
     public static CdcRecord emptyRecord() {
         return new CdcRecord(RowKind.INSERT, Collections.emptyMap());
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
similarity index 50%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
index 648143d92..c4ceba307 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
@@ -16,29 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink;
+package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
 
-import java.io.Serializable;
+/**
+ * A {@link PartitionKeyExtractor} for {@link CdcRecord}, as opposed to {@link InternalRow}.
+ *
+ * <p>Both methods delegate to a single {@link CdcRecordKeyAndBucketExtractor}, which is handed
+ * the record through {@code setRecord} before every lookup.
+ */
+public class CdcRecordPartitionKeyExtractor implements 
PartitionKeyExtractor<CdcRecord> {
 
-/**
- * A utility class to compute which downstream channel a given record should 
be sent to.
- *
- * @param <T> type of record
- */
-public interface ChannelComputer<T> extends Serializable {
+    // delegate that is re-targeted at the current record on every call
+    private final CdcRecordKeyAndBucketExtractor extractor;
 
-    void setup(int numChannels);
-
-    int channel(T record);
+    /** Creates an extractor whose delegate is built from the given table schema. */
+    public CdcRecordPartitionKeyExtractor(TableSchema schema) {
+        this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
+    }
 
-    static int select(BinaryRow partition, int bucket, int numChannels) {
-        int startChannel = Math.abs(partition.hashCode()) % numChannels;
-        return (startChannel + bucket) % numChannels;
+    /** Returns the partition the delegate computes for {@code record}. */
+    @Override
+    public BinaryRow partition(CdcRecord record) {
+        // hand the record to the delegate before asking it for the partition
+        extractor.setRecord(record);
+        return extractor.partition();
     }
 
-    static int select(int bucket, int numChannels) {
-        return bucket % numChannels;
+    /** Returns the trimmed primary key the delegate computes for {@code record}. */
+    @Override
+    public BinaryRow trimmedPrimaryKey(CdcRecord record) {
+        // hand the record to the delegate before asking it for the key
+        extractor.setRecord(record);
+        return extractor.trimmedPrimaryKey();
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
index a55b788a4..d0c422c08 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
@@ -27,6 +27,7 @@ import org.apache.paimon.utils.TypeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -113,4 +114,16 @@ public class CdcRecordUtils {
         }
         return Optional.of(genericRow);
     }
+
+    /**
+     * Builds a {@link CdcRecord} from a {@link GenericRow}.
+     *
+     * <p>The field at position {@code i} is stored under {@code fieldNames.get(i)} in its {@code
+     * toString()} form. Fields that are {@code null} are left out of the map. The row kind of
+     * the row is carried over to the record.
+     *
+     * @param row row to convert
+     * @param fieldNames names of the row's fields, looked up by position
+     * @return a new record holding the row's kind and its non-null fields
+     */
+    public static CdcRecord fromGenericRow(GenericRow row, List<String> fieldNames) {
+        Map<String, String> nameToValue = new HashMap<>();
+        int fieldCount = row.getFieldCount();
+        for (int pos = 0; pos < fieldCount; pos++) {
+            Object value = row.getField(pos);
+            if (value == null) {
+                // null fields get no entry at all
+                continue;
+            }
+            nameToValue.put(fieldNames.get(pos), value.toString());
+        }
+        return new CdcRecord(row.getRowKind(), nameToValue);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 5fd0f80d3..4f1745406 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink.cdc;
 import org.apache.paimon.annotation.Experimental;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.sink.index.GlobalDynamicCdcBucketSink;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
@@ -117,17 +118,16 @@ public class CdcSinkBuilder<T> {
             case FIXED:
                 return buildForFixedBucket(parsed);
             case DYNAMIC:
-                return buildForDynamicBucket(parsed);
+                return new CdcDynamicBucketSink((FileStoreTable) 
table).build(parsed, parallelism);
+            case GLOBAL_DYNAMIC:
+                return new GlobalDynamicCdcBucketSink((FileStoreTable) table)
+                        .build(parsed, parallelism);
             case UNAWARE:
             default:
                 throw new UnsupportedOperationException("Unsupported bucket 
mode: " + bucketMode);
         }
     }
 
-    private DataStreamSink<?> buildForDynamicBucket(DataStream<CdcRecord> 
parsed) {
-        return new CdcDynamicBucketSink((FileStoreTable) table).build(parsed, 
parallelism);
-    }
-
     private DataStreamSink<?> buildForFixedBucket(DataStream<CdcRecord> 
parsed) {
         FileStoreTable dataTable = (FileStoreTable) table;
         DataStream<CdcRecord> partitioned =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 0070f3ae9..c5b716e45 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.flink.sink.index.GlobalDynamicCdcBucketSink;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
@@ -152,10 +153,6 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         sink.sinkFrom(new DataStream<>(input.getExecutionEnvironment(), 
partitioned));
     }
 
-    private void buildForDynamicBucket(FileStoreTable table, 
DataStream<CdcRecord> parsed) {
-        new CdcDynamicBucketSink(table).build(parsed, parallelism);
-    }
-
     private void buildForFixedBucket(FileStoreTable table, 
DataStream<CdcRecord> parsed) {
         DataStream<CdcRecord> partitioned =
                 partition(parsed, new 
CdcRecordChannelComputer(table.schema()), parallelism);
@@ -196,7 +193,10 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
                     buildForFixedBucket(table, parsedForTable);
                     break;
                 case DYNAMIC:
-                    buildForDynamicBucket(table, parsedForTable);
+                    new CdcDynamicBucketSink(table).build(parsedForTable, 
parallelism);
+                    break;
+                case GLOBAL_DYNAMIC:
+                    new 
GlobalDynamicCdcBucketSink(table).build(parsedForTable, parallelism);
                     break;
                 case UNAWARE:
                 default:
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
new file mode 100644
index 000000000..5f98f3b05
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator;
+import org.apache.paimon.flink.sink.FlinkWriteSink;
+import org.apache.paimon.flink.sink.RowWithBucketChannelComputer;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
+
+/**
+ * Sink for global dynamic bucket table.
+ *
+ * <p>{@link #build} wires the stages in this order: input, index bootstrap, shuffle by key hash,
+ * bucket assigner, shuffle by bucket, writer and committer.
+ */
+public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData, Integer>> {
+
+    private static final long serialVersionUID = 1L;
+
+    public GlobalDynamicBucketSink(
+            FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
+        super(table, overwritePartition);
+    }
+
+    /** Creates the writer that consumes (row, bucket) pairs. */
+    @Override
+    protected OneInputStreamOperator<Tuple2<RowData, Integer>, Committable> createWriteOperator(
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
+        return new DynamicBucketRowWriteOperator(table, writeProvider, commitUser);
+    }
+
+    /**
+     * Attaches the whole sink topology to {@code input}.
+     *
+     * @param input stream of rows to write
+     * @param parallelism parallelism passed to the bucket shuffle; also used for the key-hash
+     *     shuffle when the table options configure no assigner parallelism
+     * @return the sink produced by {@code sinkFrom}
+     */
+    public DataStreamSink<?> build(DataStream<RowData> input, @Nullable Integer parallelism) {
+        String commitUserId = UUID.randomUUID().toString();
+
+        TableSchema tableSchema = table.schema();
+        List<String> pkNames = tableSchema.primaryKeys();
+        List<String> partNames = tableSchema.partitionKeys();
+        RowType fullRowType = tableSchema.logicalRowType();
+
+        // projection over the primary key names followed by the partition key names
+        List<String> keyPartNames =
+                Stream.concat(pkNames.stream(), partNames.stream()).collect(Collectors.toList());
+        RowType keyPartRowType = tableSchema.projectedLogicalRowType(keyPartNames);
+
+        RowDataSerializer fullRowSerializer = new RowDataSerializer(toLogicalType(fullRowType));
+        RowDataSerializer keyPartRowSerializer =
+                new RowDataSerializer(toLogicalType(keyPartRowType));
+
+        // stage 0: index bootstrap, running with the parallelism of the input
+        DataStream<Tuple2<KeyPartOrRow, RowData>> bootstrapped =
+                input.transform(
+                                "INDEX_BOOTSTRAP",
+                                new InternalTypeInfo<>(
+                                        new KeyWithRowSerializer<>(
+                                                keyPartRowSerializer, fullRowSerializer)),
+                                new IndexBootstrapOperator<>(
+                                        new IndexBootstrap(table), FlinkRowData::new))
+                        .setParallelism(input.getParallelism());
+
+        // stage 1: shuffle by key hash; the configured assigner parallelism wins when present
+        Integer configuredParallelism = table.coreOptions().dynamicBucketAssignerParallelism();
+        Integer assignerParallelism =
+                configuredParallelism != null ? configuredParallelism : parallelism;
+        DataStream<Tuple2<KeyPartOrRow, RowData>> shuffledByKey =
+                partition(
+                        bootstrapped,
+                        new KeyPartRowChannelComputer(fullRowType, keyPartRowType, pkNames),
+                        assignerParallelism);
+
+        // stage 2: bucket assigner, keeping the parallelism of the shuffled stream
+        DataStream<Tuple2<RowData, Integer>> withBucket =
+                shuffledByKey
+                        .transform(
+                                "dynamic-bucket-assigner",
+                                new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO),
+                                GlobalIndexAssignerOperator.forRowData(table))
+                        .setParallelism(shuffledByKey.getParallelism());
+
+        // stage 3: shuffle by bucket
+        DataStream<Tuple2<RowData, Integer>> shuffledByBucket =
+                partition(withBucket, new RowWithBucketChannelComputer(tableSchema), parallelism);
+
+        // stage 4: writer and committer
+        return sinkFrom(shuffledByBucket, commitUserId);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java
new file mode 100644
index 000000000..06bb5a147
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.sink.ChannelComputer;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.FlinkWriteSink;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.sink.cdc.CdcDynamicBucketWriteOperator;
+import org.apache.paimon.flink.sink.cdc.CdcHashKeyChannelComputer;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
+import org.apache.paimon.flink.sink.cdc.CdcWithBucketChannelComputer;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
+
+/**
+ * Sink for global dynamic bucket table and {@link CdcRecord} inputs.
+ *
+ * <p>{@link #build} wires the stages in this order: input, index bootstrap, shuffle by key hash,
+ * bucket assigner, shuffle by bucket, writer and committer.
+ */
+public class GlobalDynamicCdcBucketSink extends FlinkWriteSink<Tuple2<CdcRecord, Integer>> {
+
+    private static final long serialVersionUID = 1L;
+
+    public GlobalDynamicCdcBucketSink(FileStoreTable table) {
+        super(table, null);
+    }
+
+    /** Creates the writer that consumes (record, bucket) pairs. */
+    @Override
+    protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable> createWriteOperator(
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
+        return new CdcDynamicBucketWriteOperator(table, writeProvider, commitUser);
+    }
+
+    /**
+     * Attaches the whole sink topology to {@code input}.
+     *
+     * @param input stream of CDC records to write
+     * @param parallelism parallelism passed to the bucket shuffle; also used for the key-hash
+     *     shuffle when the table options configure no assigner parallelism
+     * @return the sink produced by {@code sinkFrom}
+     */
+    public DataStreamSink<?> build(DataStream<CdcRecord> input, @Nullable Integer parallelism) {
+        String commitUserId = UUID.randomUUID().toString();
+
+        TableSchema tableSchema = table.schema();
+
+        // projection over the primary key names followed by the partition key names
+        List<String> projectedNames =
+                Stream.concat(
+                                tableSchema.primaryKeys().stream(),
+                                tableSchema.partitionKeys().stream())
+                        .collect(Collectors.toList());
+        RowType keyPartRowType = tableSchema.projectedLogicalRowType(projectedNames);
+        List<String> keyPartFieldNames = keyPartRowType.getFieldNames();
+        RowDataToObjectArrayConverter toObjects =
+                new RowDataToObjectArrayConverter(keyPartRowType);
+
+        // stage 0: index bootstrap; bootstrapped rows are turned into CdcRecords that keep
+        // the row kind, and the operator runs with the parallelism of the input
+        TupleTypeInfo<Tuple2<KeyPartOrRow, CdcRecord>> bootstrapType =
+                new TupleTypeInfo<>(new EnumTypeInfo<>(KeyPartOrRow.class), input.getType());
+        DataStream<Tuple2<KeyPartOrRow, CdcRecord>> bootstrapped =
+                input.transform(
+                                "INDEX_BOOTSTRAP",
+                                bootstrapType,
+                                new IndexBootstrapOperator<>(
+                                        new IndexBootstrap(table),
+                                        row ->
+                                                CdcRecordUtils.fromGenericRow(
+                                                        GenericRow.ofKind(
+                                                                row.getRowKind(),
+                                                                toObjects.convert(row)),
+                                                        keyPartFieldNames)))
+                        .setParallelism(input.getParallelism());
+
+        // stage 1: shuffle by key hash; the configured assigner parallelism wins when present
+        Integer configuredParallelism = table.coreOptions().dynamicBucketAssignerParallelism();
+        Integer assignerParallelism =
+                configuredParallelism != null ? configuredParallelism : parallelism;
+        ChannelComputer<Tuple2<KeyPartOrRow, CdcRecord>> byKeyHash =
+                ChannelComputer.transform(
+                        new CdcHashKeyChannelComputer(tableSchema), pair -> pair.f1);
+        DataStream<Tuple2<KeyPartOrRow, CdcRecord>> shuffledByKey =
+                partition(bootstrapped, byKeyHash, assignerParallelism);
+
+        // stage 2: bucket assigner, keeping the parallelism of the shuffled stream
+        TupleTypeInfo<Tuple2<CdcRecord, Integer>> recordWithBucketType =
+                new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO);
+        DataStream<Tuple2<CdcRecord, Integer>> withBucket =
+                shuffledByKey
+                        .transform(
+                                "dynamic-bucket-assigner",
+                                recordWithBucketType,
+                                GlobalIndexAssignerOperator.forCdcRecord(table))
+                        .setParallelism(shuffledByKey.getParallelism());
+
+        // stage 3: shuffle by bucket
+        DataStream<Tuple2<CdcRecord, Integer>> shuffledByBucket =
+                partition(withBucket, new CdcWithBucketChannelComputer(tableSchema), parallelism);
+
+        // stage 4: writer and committer
+        return sinkFrom(shuffledByBucket, commitUserId);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
new file mode 100644
index 000000000..2d3be0a12
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.flink.lookup.RocksDBStateFactory;
+import org.apache.paimon.flink.lookup.RocksDBValueState;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileIOUtils;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.IDMapping;
+import org.apache.paimon.utils.PositiveIntInt;
+import org.apache.paimon.utils.PositiveIntIntSerializer;
+import org.apache.paimon.utils.SerBiFunction;
+import org.apache.paimon.utils.SerializableFunction;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+
+/**
+ * Assigns a bucket to every input record and emits the record together with that bucket.
+ *
+ * <p>An index kept in a {@link RocksDBValueState} maps each trimmed primary key to the pair
+ * (partition id, bucket) it was last assigned to. When a known key arrives in a different
+ * partition, the reaction depends on the table's merge engine: a {@link RowKind#DELETE} record is
+ * emitted for the old partition before the new record, the record is redirected to the old
+ * partition, or the record is dropped.
+ *
+ * @param <T> type of the records being assigned
+ */
+public class GlobalIndexAssigner<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final AbstractFileStoreTable table;
+    private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction;
+    private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
+            keyPartExtractorFunction;
+    private final SerBiFunction<T, BinaryRow, T> setPartition;
+    private final SerBiFunction<T, RowKind, T> setRowKind;
+
+    // everything below is created in open() and therefore not serialized
+    private transient int targetBucketRowNumber;
+    private transient int assignId;
+    private transient BiConsumer<T, Integer> collector;
+    private transient int numAssigners;
+    private transient PartitionKeyExtractor<T> extractor;
+    private transient PartitionKeyExtractor<T> keyPartExtractor;
+    private transient File path;
+    private transient RocksDBStateFactory stateFactory;
+    private transient RocksDBValueState<InternalRow, PositiveIntInt> keyIndex;
+
+    private transient IDMapping<BinaryRow> partMapping;
+    private transient BucketAssigner bucketAssigner;
+    private transient ExistsAction existsAction;
+
+    /**
+     * @param table table to assign buckets for; must be an {@link AbstractFileStoreTable}
+     * @param extractorFunction builds the extractor applied to records given to {@link #process}
+     * @param keyPartExtractorFunction builds the extractor applied to records given to {@link
+     *     #bootstrap}
+     * @param setPartition returns a record carrying the given partition
+     * @param setRowKind returns a record carrying the given row kind
+     */
+    public GlobalIndexAssigner(
+            Table table,
+            SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction,
+            SerializableFunction<TableSchema, PartitionKeyExtractor<T>> keyPartExtractorFunction,
+            SerBiFunction<T, BinaryRow, T> setPartition,
+            SerBiFunction<T, RowKind, T> setRowKind) {
+        this.table = (AbstractFileStoreTable) table;
+        this.extractorFunction = extractorFunction;
+        this.keyPartExtractorFunction = keyPartExtractorFunction;
+        this.setPartition = setPartition;
+        this.setRowKind = setRowKind;
+    }
+
+    /**
+     * Initializes the transient state; must run before {@link #bootstrap} and {@link #process}.
+     *
+     * @param tmpDir directory under which a fresh "lookup-&lt;uuid&gt;" state directory is used
+     * @param numAssigners total number of assigners sharing the buckets
+     * @param assignId id of this assigner; it only creates buckets that map to this id
+     * @param collector receives every emitted record together with its bucket
+     */
+    public void open(File tmpDir, int numAssigners, int assignId, BiConsumer<T, Integer> collector)
+            throws Exception {
+        this.numAssigners = numAssigners;
+        this.assignId = assignId;
+        this.collector = collector;
+
+        CoreOptions coreOptions = table.coreOptions();
+        // clamp rather than truncate: a plain (int) cast of a value above Integer.MAX_VALUE
+        // could wrap to a negative limit, and then no bucket would ever count as having room
+        this.targetBucketRowNumber =
+                (int) Math.min(Integer.MAX_VALUE, coreOptions.dynamicBucketTargetRowNum());
+        this.extractor = extractorFunction.apply(table.schema());
+        this.keyPartExtractor = keyPartExtractorFunction.apply(table.schema());
+
+        // state
+        Options options = coreOptions.toConfiguration();
+        this.path = new File(tmpDir, "lookup-" + UUID.randomUUID());
+        this.stateFactory = new RocksDBStateFactory(path.toString(), options);
+        long cacheSize = options.get(CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE).getBytes();
+        RowType keyType = table.schema().logicalTrimmedPrimaryKeysType();
+        this.keyIndex =
+                stateFactory.valueState(
+                        "keyIndex",
+                        new RowCompactedSerializer(keyType),
+                        new PositiveIntIntSerializer(),
+                        cacheSize);
+
+        this.partMapping = new IDMapping<>(BinaryRow::copy);
+        this.bucketAssigner = new BucketAssigner();
+        this.existsAction = fromMergeEngine(coreOptions.mergeEngine());
+    }
+
+    /**
+     * Assigns a bucket to {@code value} and hands the result to the collector.
+     *
+     * <p>A key that is not in the index yet gets a newly assigned bucket. A key already indexed
+     * under the same partition keeps its bucket. A key indexed under another partition is handled
+     * according to the {@code ExistsAction} derived from the merge engine.
+     */
+    public void process(T value) throws Exception {
+        BinaryRow partition = extractor.partition(value);
+        BinaryRow key = extractor.trimmedPrimaryKey(value);
+
+        int partId = partMapping.index(partition);
+
+        PositiveIntInt partitionBucket = keyIndex.get(key);
+        if (partitionBucket == null) {
+            // new record
+            processNewRecord(partition, partId, key, value);
+            return;
+        }
+
+        int previousPartId = partitionBucket.i1();
+        int previousBucket = partitionBucket.i2();
+        if (previousPartId == partId) {
+            // same partition as before: reuse the bucket
+            collect(value, previousBucket);
+            return;
+        }
+
+        // the key was last seen in another partition
+        switch (existsAction) {
+            case DELETE:
+                {
+                    // retract old record
+                    BinaryRow previousPart = partMapping.get(previousPartId);
+                    T retract = setPartition.apply(value, previousPart);
+                    retract = setRowKind.apply(retract, RowKind.DELETE);
+                    collect(retract, previousBucket);
+                    bucketAssigner.decrement(previousPart, previousBucket);
+
+                    // new record
+                    processNewRecord(partition, partId, key, value);
+                    break;
+                }
+            case USE_OLD:
+                {
+                    // send the record to the partition and bucket the key already lives in
+                    BinaryRow previousPart = partMapping.get(previousPartId);
+                    T newValue = setPartition.apply(value, previousPart);
+                    collect(newValue, previousBucket);
+                    break;
+                }
+            case SKIP_NEW:
+                // do nothing
+                break;
+        }
+    }
+
+    /**
+     * Registers the key of {@code value} in the index without emitting anything.
+     *
+     * <p>NOTE(review): the bucket stored here is a freshly assigned one from {@code
+     * assignBucket}, not a bucket read from {@code value} - confirm this is intended.
+     */
+    public void bootstrap(T value) throws IOException {
+        BinaryRow partition = keyPartExtractor.partition(value);
+        keyIndex.put(
+                keyPartExtractor.trimmedPrimaryKey(value),
+                new PositiveIntInt(partMapping.index(partition), assignBucket(partition)));
+    }
+
+    /** Assigns a bucket for a key not indexed yet, indexes the key and emits the record. */
+    private void processNewRecord(BinaryRow partition, int partId, BinaryRow key, T value)
+            throws IOException {
+        int bucket = assignBucket(partition);
+        keyIndex.put(key, new PositiveIntInt(partId, bucket));
+        collect(value, bucket);
+    }
+
+    private int assignBucket(BinaryRow partition) {
+        return bucketAssigner.assignBucket(partition, this::isAssignBucket, targetBucketRowNumber);
+    }
+
+    /** Whether the given bucket maps to this assigner. */
+    private boolean isAssignBucket(int bucket) {
+        return computeAssignId(bucket) == assignId;
+    }
+
+    private int computeAssignId(int hash) {
+        return Math.abs(hash % numAssigners);
+    }
+
+    private void collect(T value, int bucket) {
+        collector.accept(value, bucket);
+    }
+
+    /**
+     * Closes the state factory and removes the state directory.
+     *
+     * <p>The directory is removed even when closing the factory throws, so a failed close does
+     * not leave the files behind.
+     */
+    public void close() throws IOException {
+        try {
+            if (stateFactory != null) {
+                stateFactory.close();
+                stateFactory = null;
+            }
+        } finally {
+            if (path != null) {
+                FileIOUtils.deleteDirectoryQuietly(path);
+            }
+        }
+    }
+
+    /** Tracks, per partition, how many rows have been assigned to each bucket. */
+    private static class BucketAssigner {
+
+        // partition -> (bucket -> number of rows assigned), buckets in ascending order
+        private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new HashMap<>();
+
+        /**
+         * Picks a bucket of {@code part} and counts one more row for it.
+         *
+         * <p>Known buckets are tried first in ascending order: the first one accepted by {@code
+         * filter} with fewer than {@code maxCount} rows wins. Otherwise the smallest
+         * non-negative bucket accepted by {@code filter} and not known yet is created.
+         */
+        public int assignBucket(BinaryRow part, Filter<Integer> filter, int maxCount) {
+            TreeMap<Integer, Integer> bucketMap = bucketMap(part);
+            for (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) {
+                int bucket = entry.getKey();
+                int count = entry.getValue();
+                if (filter.test(bucket) && count < maxCount) {
+                    bucketMap.put(bucket, count + 1);
+                    return bucket;
+                }
+            }
+
+            // no known bucket has room: open the next free one accepted by the filter
+            for (int i = 0; ; i++) {
+                if (filter.test(i) && !bucketMap.containsKey(i)) {
+                    bucketMap.put(i, 1);
+                    return i;
+                }
+            }
+        }
+
+        /** Counts one row less for the bucket; a bucket not known yet is recorded with 0. */
+        public void decrement(BinaryRow part, int bucket) {
+            bucketMap(part).compute(bucket, (k, v) -> v == null ? 0 : v - 1);
+        }
+
+        /** Returns the counters of {@code part}, creating them under a copy of the row. */
+        private TreeMap<Integer, Integer> bucketMap(BinaryRow part) {
+            TreeMap<Integer, Integer> map = stats.get(part);
+            if (map == null) {
+                map = new TreeMap<>();
+                stats.put(part.copy(), map);
+            }
+            return map;
+        }
+    }
+
+    /** Maps the merge engine to the action taken when a key changes partition. */
+    private ExistsAction fromMergeEngine(MergeEngine mergeEngine) {
+        switch (mergeEngine) {
+            case DEDUPLICATE:
+                return ExistsAction.DELETE;
+            case PARTIAL_UPDATE:
+            case AGGREGATE:
+                return ExistsAction.USE_OLD;
+            case FIRST_ROW:
+                return ExistsAction.SKIP_NEW;
+            default:
+                throw new UnsupportedOperationException("Unsupported engine: " + mergeEngine);
+        }
+    }
+
+    /** What to do with a record whose key is already indexed under another partition. */
+    private enum ExistsAction {
+        DELETE,
+        USE_OLD,
+        SKIP_NEW
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
new file mode 100644
index 000000000..5d5acd203
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcRecordPartitionKeyExtractor;
+import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
+import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind;
+
+/**
+ * A {@link OneInputStreamOperator} for {@link GlobalIndexAssigner}.
+ *
+ * <p>Each input is tagged with a {@link KeyPartOrRow}: {@code KEY_PART} values are handed to
+ * {@link GlobalIndexAssigner#bootstrap}, {@code ROW} values to {@link
+ * GlobalIndexAssigner#process}. Whatever the assigner reports through its callback is emitted
+ * downstream as a {@code (value, bucket)} tuple.
+ */
+public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple2<T, Integer>>
+        implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T, Integer>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final GlobalIndexAssigner<T> assigner;
+
+    public GlobalIndexAssignerOperator(GlobalIndexAssigner<T> assigner) {
+        this.assigner = assigner;
+    }
+
+    /**
+     * Opens the assigner with one randomly chosen spilling directory of the task's IO manager,
+     * the operator parallelism, this subtask's index and the emitting callback.
+     */
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+        File[] spillDirs =
+                getContainingTask().getEnvironment().getIOManager().getSpillingDirectories();
+        int chosen = ThreadLocalRandom.current().nextInt(spillDirs.length);
+        File workDir = spillDirs[chosen];
+        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+        assigner.open(workDir, parallelism, subtaskIndex, this::collect);
+    }
+
+    /** Dispatches the payload to the assigner according to the tag in {@code f0}. */
+    @Override
+    public void processElement(StreamRecord<Tuple2<KeyPartOrRow, T>> streamRecord)
+            throws Exception {
+        Tuple2<KeyPartOrRow, T> tagged = streamRecord.getValue();
+        T payload = tagged.f1;
+        switch (tagged.f0) {
+            case ROW:
+                assigner.process(payload);
+                break;
+            case KEY_PART:
+                assigner.bootstrap(payload);
+                break;
+        }
+    }
+
+    /** Callback given to the assigner: emits the value together with its bucket. */
+    private void collect(T value, int bucket) {
+        Tuple2<T, Integer> assigned = new Tuple2<>(value, bucket);
+        output.collect(new StreamRecord<>(assigned));
+    }
+
+    @Override
+    public void close() throws IOException {
+        assigner.close();
+    }
+
+    /** Creates an operator working on Flink {@link RowData}. */
+    public static GlobalIndexAssignerOperator<RowData> forRowData(Table table) {
+        GlobalIndexAssigner<RowData> rowDataAssigner = createRowDataAssigner(table);
+        return new GlobalIndexAssignerOperator<>(rowDataAssigner);
+    }
+
+    /**
+     * Creates a {@link GlobalIndexAssigner} for {@link RowData}. Its last argument sets the row
+     * kind on the given row in place and returns that same row.
+     */
+    public static GlobalIndexAssigner<RowData> createRowDataAssigner(Table t) {
+        ProjectToRowDataFunction partitionSetter =
+                new ProjectToRowDataFunction(t.rowType(), t.partitionKeys());
+        return new GlobalIndexAssigner<>(
+                t,
+                RowDataPartitionKeyExtractor::new,
+                KeyPartPartitionKeyExtractor::new,
+                partitionSetter,
+                (row, kind) -> {
+                    row.setRowKind(toFlinkRowKind(kind));
+                    return row;
+                });
+    }
+
+    /**
+     * Creates an operator working on {@link CdcRecord}. The fourth assigner argument builds a
+     * new record whose fields are a copy of the input record's fields overlaid with the fields
+     * derived from the given partition row; the input record itself is not modified.
+     */
+    public static GlobalIndexAssignerOperator<CdcRecord> forCdcRecord(Table table) {
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        RowType partitionType = fileStoreTable.schema().logicalPartitionType();
+        List<String> partitionNames = partitionType.getFieldNames();
+        RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter(partitionType);
+        GlobalIndexAssigner<CdcRecord> cdcAssigner =
+                new GlobalIndexAssigner<>(
+                        table,
+                        CdcRecordPartitionKeyExtractor::new,
+                        CdcRecordPartitionKeyExtractor::new,
+                        (source, partition) -> {
+                            GenericRow partitionRow = GenericRow.of(converter.convert(partition));
+                            CdcRecord partitionRecord =
+                                    CdcRecordUtils.fromGenericRow(partitionRow, partitionNames);
+                            Map<String, String> merged = new HashMap<>(source.fields());
+                            merged.putAll(partitionRecord.fields());
+                            return new CdcRecord(source.kind(), merged);
+                        },
+                        CdcRecord::setRowKind);
+        return new GlobalIndexAssignerOperator<>(cdcAssigner);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
new file mode 100644
index 000000000..2df812007
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.AbstractInnerTableScan;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Bootstrap key index from Paimon table.
+ *
+ * <p>Reads the primary key and partition key columns of the table and hands every row to a
+ * collector.
+ */
+public class IndexBootstrap implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Table table;
+
+    public IndexBootstrap(Table table) {
+        this.table = table;
+    }
+
+    /**
+     * Scans the table projected to primary key fields followed by partition key fields and
+     * passes each row read to {@code collector}.
+     *
+     * @param numAssigners divisor of the bucket filter
+     * @param assignId only buckets with {@code bucket % numAssigners == assignId} are planned
+     * @param collector receives every row produced by the reader
+     * @throws IOException if creating, reading or closing the reader fails
+     */
+    public void bootstrap(int numAssigners, int assignId, Consumer<InternalRow> collector)
+            throws IOException {
+        RowType rowType = table.rowType();
+        List<String> allFields = rowType.getFieldNames();
+        List<String> keyThenPartition =
+                Stream.concat(table.primaryKeys().stream(), table.partitionKeys().stream())
+                        .collect(Collectors.toList());
+
+        // Translate the selected field names into their positions in the full row type.
+        int[] projection = new int[keyThenPartition.size()];
+        for (int i = 0; i < projection.length; i++) {
+            projection[i] = allFields.indexOf(keyThenPartition.get(i));
+        }
+
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(projection);
+        AbstractInnerTableScan scan = (AbstractInnerTableScan) readBuilder.newScan();
+        TableScan.Plan plan = scan.withBucketFilter(b -> b % numAssigners == assignId).plan();
+
+        try (RecordReader<InternalRow> rows = readBuilder.newRead().createReader(plan)) {
+            rows.forEachRemaining(collector);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
new file mode 100644
index 000000000..d5f0683a7
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Operator for {@link IndexBootstrap}.
+ *
+ * <p>During state initialization it runs the bootstrap and emits each converted bootstrap row
+ * tagged {@link KeyPartOrRow#KEY_PART}; regular input elements are forwarded tagged {@link
+ * KeyPartOrRow#ROW}.
+ */
+public class IndexBootstrapOperator<T> extends AbstractStreamOperator<Tuple2<KeyPartOrRow, T>>
+        implements OneInputStreamOperator<T, Tuple2<KeyPartOrRow, T>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IndexBootstrap bootstrap;
+    private final SerializableFunction<InternalRow, T> converter;
+
+    /**
+     * @param bootstrap source of the bootstrap rows
+     * @param converter turns a bootstrap {@link InternalRow} into the stream's element type
+     */
+    public IndexBootstrapOperator(
+            IndexBootstrap bootstrap, SerializableFunction<InternalRow, T> converter) {
+        this.bootstrap = bootstrap;
+        this.converter = converter;
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    /** Runs the bootstrap for this subtask, emitting every row through {@link #collect}. */
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+        bootstrap.bootstrap(parallelism, subtaskIndex, this::collect);
+    }
+
+    /** Forwards the incoming element tagged as a full row. */
+    @Override
+    public void processElement(StreamRecord<T> streamRecord) throws Exception {
+        T element = streamRecord.getValue();
+        output.collect(new StreamRecord<>(new Tuple2<>(KeyPartOrRow.ROW, element)));
+    }
+
+    /** Emits a converted bootstrap row tagged as key-and-partition data. */
+    private void collect(InternalRow row) {
+        T converted = converter.apply(row);
+        output.collect(new StreamRecord<>(new Tuple2<>(KeyPartOrRow.KEY_PART, converted)));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartOrRow.java
similarity index 51%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartOrRow.java
index 648143d92..10c80c8d3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartOrRow.java
@@ -16,29 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.data.BinaryRow;
-
-import java.io.Serializable;
-
-/**
- * A utility class to compute which downstream channel a given record should 
be sent to.
- *
- * @param <T> type of record
- */
-public interface ChannelComputer<T> extends Serializable {
-
-    void setup(int numChannels);
-
-    int channel(T record);
-
-    static int select(BinaryRow partition, int bucket, int numChannels) {
-        int startChannel = Math.abs(partition.hashCode()) % numChannels;
-        return (startChannel + bucket) % numChannels;
+package org.apache.paimon.flink.sink.index;
+
+/**
+ * Type of record, key or full row.
+ *
+ * <p>Each constant has a stable one-byte encoding: {@code KEY_PART} is 0 and {@code ROW} is 1.
+ */
+public enum KeyPartOrRow {
+    KEY_PART,
+    ROW;
+
+    /**
+     * Returns the byte encoding of this constant.
+     *
+     * @throws UnsupportedOperationException if the constant has no encoding defined here
+     */
+    public byte toByteValue() {
+        switch (this) {
+            case KEY_PART:
+                return 0;
+            case ROW:
+                return 1;
+            default:
+                throw new UnsupportedOperationException("Unsupported value: " + this);
+        }
     }
 
-    static int select(int bucket, int numChannels) {
-        return bucket % numChannels;
+    /**
+     * Decodes a byte produced by {@link #toByteValue()}.
+     *
+     * @throws UnsupportedOperationException if {@code value} is neither 0 nor 1
+     */
+    public static KeyPartOrRow fromByteValue(byte value) {
+        switch (value) {
+            case 0:
+                return KEY_PART;
+            case 1:
+                return ROW;
+            default:
+                // Name this enum in the message; the previous text referred to "row kind".
+                throw new UnsupportedOperationException(
+                        "Unsupported byte value '" + value + "' for KeyPartOrRow.");
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
new file mode 100644
index 000000000..6dc3dca42
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A {@link PartitionKeyExtractor} for {@link RowData} that contains only the primary key fields
+ * followed by the partition fields.
+ */
+public class KeyPartPartitionKeyExtractor implements PartitionKeyExtractor<RowData> {
+
+    private final Projection partitionProjection;
+    private final Projection keyProjection;
+
+    /** Builds both projections against the row type "primary keys, then partition keys". */
+    public KeyPartPartitionKeyExtractor(TableSchema schema) {
+        List<String> primaryKeys = schema.primaryKeys();
+        List<String> partitionKeys = schema.partitionKeys();
+        List<String> keyThenPartition =
+                Stream.concat(primaryKeys.stream(), partitionKeys.stream())
+                        .collect(Collectors.toList());
+        RowType keyPartType = schema.projectedLogicalRowType(keyThenPartition);
+        this.partitionProjection = CodeGenUtils.newProjection(keyPartType, partitionKeys);
+        this.keyProjection = CodeGenUtils.newProjection(keyPartType, primaryKeys);
+    }
+
+    /** Projects the partition fields out of the record. */
+    @Override
+    public BinaryRow partition(RowData record) {
+        FlinkRowWrapper wrapped = new FlinkRowWrapper(record);
+        return partitionProjection.apply(wrapped);
+    }
+
+    /** Projects the primary key fields out of the record. */
+    @Override
+    public BinaryRow trimmedPrimaryKey(RowData record) {
+        FlinkRowWrapper wrapped = new FlinkRowWrapper(record);
+        return keyProjection.apply(wrapped);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
new file mode 100644
index 000000000..9535b3826
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.sink.ChannelComputer;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+/**
+ * A {@link ChannelComputer} for records tagged with {@link KeyPartOrRow}.
+ *
+ * <p>The channel is derived from the hash of the primary key, which is projected with a
+ * different projection depending on whether the record is a key-part row or a full row.
+ */
+public class KeyPartRowChannelComputer implements ChannelComputer<Tuple2<KeyPartOrRow, RowData>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowType rowType;
+    private final RowType keyPartType;
+    private final List<String> primaryKey;
+
+    // Created in setup(int); transient, so not part of the serialized form.
+    private transient int numChannels;
+    private transient Projection rowProject;
+    private transient Projection keyPartProject;
+
+    /**
+     * @param rowType type of records tagged {@code ROW}
+     * @param keyPartType type of records tagged {@code KEY_PART}
+     * @param primaryKey names of the fields projected as the key in both types
+     */
+    public KeyPartRowChannelComputer(
+            RowType rowType, RowType keyPartType, List<String> primaryKey) {
+        this.rowType = rowType;
+        this.keyPartType = keyPartType;
+        this.primaryKey = primaryKey;
+    }
+
+    /** Stores the channel count and builds one key projection per input type. */
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        this.rowProject = CodeGenUtils.newProjection(rowType, primaryKey);
+        this.keyPartProject = CodeGenUtils.newProjection(keyPartType, primaryKey);
+    }
+
+    /** Returns the absolute value of {@code keyHash % numChannels}. */
+    @Override
+    public int channel(Tuple2<KeyPartOrRow, RowData> record) {
+        Projection projection;
+        if (record.f0 == KeyPartOrRow.KEY_PART) {
+            projection = keyPartProject;
+        } else {
+            projection = rowProject;
+        }
+        BinaryRow key = projection.apply(new FlinkRowWrapper(record.f1));
+        int remainder = key.hashCode() % numChannels;
+        return Math.abs(remainder);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyWithRowSerializer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyWithRowSerializer.java
new file mode 100644
index 000000000..876aa296c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyWithRowSerializer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.flink.utils.InternalTypeSerializer;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.paimon.flink.sink.index.KeyPartOrRow.KEY_PART;
+
+/**
+ * A {@link InternalTypeSerializer} to serialize KeyPartOrRow with T.
+ *
+ * <p>Wire format: one byte from {@link KeyPartOrRow#toByteValue()}, followed by the payload
+ * written with the serializer that matches the tag.
+ */
+public class KeyWithRowSerializer<T> extends InternalTypeSerializer<Tuple2<KeyPartOrRow, T>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TypeSerializer<T> keyPartSerializer;
+    private final TypeSerializer<T> rowSerializer;
+
+    /**
+     * @param keyPartSerializer used for payloads tagged {@code KEY_PART}
+     * @param rowSerializer used for payloads with any other tag
+     */
+    public KeyWithRowSerializer(
+            TypeSerializer<T> keyPartSerializer, TypeSerializer<T> rowSerializer) {
+        this.keyPartSerializer = keyPartSerializer;
+        this.rowSerializer = rowSerializer;
+    }
+
+    @Override
+    public TypeSerializer<Tuple2<KeyPartOrRow, T>> duplicate() {
+        TypeSerializer<T> keyPartCopy = keyPartSerializer.duplicate();
+        TypeSerializer<T> rowCopy = rowSerializer.duplicate();
+        return new KeyWithRowSerializer<>(keyPartCopy, rowCopy);
+    }
+
+    @Override
+    public Tuple2<KeyPartOrRow, T> createInstance() {
+        return new Tuple2<>();
+    }
+
+    /** Picks the payload serializer that belongs to the given tag. */
+    private TypeSerializer<T> serializer(KeyPartOrRow keyPartOrRow) {
+        if (keyPartOrRow == KEY_PART) {
+            return keyPartSerializer;
+        }
+        return rowSerializer;
+    }
+
+    @Override
+    public Tuple2<KeyPartOrRow, T> copy(Tuple2<KeyPartOrRow, T> from) {
+        KeyPartOrRow tag = from.f0;
+        T payloadCopy = serializer(tag).copy(from.f1);
+        return new Tuple2<>(tag, payloadCopy);
+    }
+
+    @Override
+    public void serialize(Tuple2<KeyPartOrRow, T> record, DataOutputView target)
+            throws IOException {
+        KeyPartOrRow tag = record.f0;
+        target.writeByte(tag.toByteValue());
+        serializer(tag).serialize(record.f1, target);
+    }
+
+    @Override
+    public Tuple2<KeyPartOrRow, T> deserialize(DataInputView source) throws IOException {
+        KeyPartOrRow tag = KeyPartOrRow.fromByteValue(source.readByte());
+        T payload = serializer(tag).deserialize(source);
+        return new Tuple2<>(tag, payload);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        KeyWithRowSerializer<?> other = (KeyWithRowSerializer<?>) o;
+        if (!Objects.equals(keyPartSerializer, other.keyPartSerializer)) {
+            return false;
+        }
+        return Objects.equals(rowSerializer, other.rowSerializer);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(keyPartSerializer, rowSerializer);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
new file mode 100644
index 000000000..f0b0d0863
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+/**
+ * A {@link TypeInformation} for internal serializer.
+ *
+ * <p>Wraps a fixed {@link InternalTypeSerializer}; it reports itself as a non-basic, non-tuple,
+ * non-key type with a single field.
+ */
+public class InternalTypeInfo<T> extends TypeInformation<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final InternalTypeSerializer<T> serializer;
+
+    public InternalTypeInfo(InternalTypeSerializer<T> serializer) {
+        this.serializer = serializer;
+    }
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 1;
+    }
+
+    @Override
+    public int getTotalFields() {
+        return 1;
+    }
+
+    /** Returns the runtime class of an instance created by the wrapped serializer. */
+    @SuppressWarnings("unchecked")
+    @Override
+    public Class<T> getTypeClass() {
+        T sample = serializer.createInstance();
+        return (Class<T>) sample.getClass();
+    }
+
+    @Override
+    public boolean isKeyType() {
+        return false;
+    }
+
+    /** Returns a duplicate of the wrapped serializer; the config is not consulted. */
+    @Override
+    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+        return serializer.duplicate();
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof InternalTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        InternalTypeInfo<?> other = (InternalTypeInfo<?>) o;
+        return Objects.equals(serializer, other.serializer);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(serializer);
+    }
+
+    @Override
+    public String toString() {
+        return "InternalTypeInfo{serializer=" + serializer + "}";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeSerializer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeSerializer.java
new file mode 100644
index 000000000..33952e6f3
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeSerializer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * A simple version {@link TypeSerializer}.
+ *
+ * <p>Supplies defaults so subclasses only implement the non-reusing variants: the
+ * reuse-accepting overloads ignore the reuse object, and serializer snapshots are not supported.
+ */
+public abstract class InternalTypeSerializer<T> extends TypeSerializer<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    /** Delegates to {@link #copy(Object)}; {@code reuse} is ignored. */
+    @Override
+    public T copy(T from, T reuse) {
+        return copy(from);
+    }
+
+    /** Always -1. */
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    /** Delegates to {@link #deserialize(DataInputView)}; {@code reuse} is ignored. */
+    @Override
+    public T deserialize(T reuse, DataInputView source) throws IOException {
+        return deserialize(source);
+    }
+
+    /** Copies one record by deserializing it from {@code source} and serializing it again. */
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        T record = deserialize(source);
+        serialize(record, target);
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public TypeSerializerSnapshot<T> snapshotConfiguration() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
new file mode 100644
index 000000000..c70d9633c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SerBiFunction;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/** Project {@link BinaryRow} fields into {@link RowData}. */
+public class ProjectToRowDataFunction implements SerBiFunction<RowData, 
BinaryRow, RowData> {
+
+    private final RowData.FieldGetter[] fieldGetters;
+
+    private final Map<Integer, Integer> projectMapping;
+    private final RowData.FieldGetter[] projectGetters;
+
+    public ProjectToRowDataFunction(RowType rowType, List<String> 
projectFields) {
+        LogicalType[] types =
+                LogicalTypeConversion.toLogicalType(rowType)
+                        .getChildren()
+                        .toArray(new LogicalType[0]);
+        this.fieldGetters =
+                IntStream.range(0, types.length)
+                        .mapToObj(i -> createFieldGetter(types[i], i))
+                        .toArray(RowData.FieldGetter[]::new);
+
+        List<String> fieldNames = rowType.getFieldNames();
+        this.projectMapping =
+                projectFields.stream()
+                        .collect(Collectors.toMap(fieldNames::indexOf, 
projectFields::indexOf));
+        this.projectGetters =
+                projectFields.stream()
+                        .map(
+                                field ->
+                                        createFieldGetter(
+                                                
types[rowType.getFieldIndex(field)],
+                                                projectFields.indexOf(field)))
+                        .toArray(RowData.FieldGetter[]::new);
+    }
+
+    @Override
+    public RowData apply(RowData input, BinaryRow project) {
+        GenericRowData newRow = new GenericRowData(fieldGetters.length);
+        FlinkRowData partRow = new FlinkRowData(project);
+        for (int i = 0; i < fieldGetters.length; i++) {
+            Object field =
+                    projectMapping.containsKey(i)
+                            ? 
projectGetters[projectMapping.get(i)].getFieldOrNull(partRow)
+                            : fieldGetters[i].getFieldOrNull(input);
+            newRow.setField(i, field);
+        }
+        return newRow;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
new file mode 100644
index 000000000..65c608ba4
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for batch file store. */
+public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS T ("
+                        + "pt INT, "
+                        + "pk INT, "
+                        + "v INT, "
+                        + "PRIMARY KEY (pk) NOT ENFORCED"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + " 'bucket'='-1', "
+                        + " 'dynamic-bucket.target-row-num'='3' "
+                        + ")");
+    }
+
+    @Test
+    public void testWriteRead() {
+        sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), 
(1, 5, 5)");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 1, 1),
+                        Row.of(1, 2, 2),
+                        Row.of(1, 3, 3),
+                        Row.of(1, 4, 4),
+                        Row.of(1, 5, 5));
+        sql("INSERT INTO T VALUES (1, 3, 33), (1, 1, 11)");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 1, 11),
+                        Row.of(1, 2, 2),
+                        Row.of(1, 3, 33),
+                        Row.of(1, 4, 4),
+                        Row.of(1, 5, 5));
+
+        assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
+                .containsExactlyInAnyOrder(Row.of(0), Row.of(1));
+
+        // change partition
+        sql("INSERT INTO T VALUES (2, 1, 2), (2, 2, 3)");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(2, 1, 2),
+                        Row.of(2, 2, 3),
+                        Row.of(1, 3, 33),
+                        Row.of(1, 4, 4),
+                        Row.of(1, 5, 5));
+    }
+
+    @Test
+    public void testWriteWithAssignerParallelism() {
+        sql(
+                "INSERT INTO T /*+ 
OPTIONS('dynamic-bucket.assigner-parallelism'='3') */ "
+                        + "VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), 
(1, 5, 5)");
+        assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
+                .containsExactlyInAnyOrder(Row.of(0), Row.of(1), Row.of(2));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index a16447364..81c9d1495 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -67,16 +67,22 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
     @Test
     @Timeout(120)
     public void testRandomCdcEvents() throws Exception {
-        innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1);
+        innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1, 
false);
     }
 
     @Test
     @Timeout(120)
     public void testRandomCdcEventsDynamicBucket() throws Exception {
-        innerTestRandomCdcEvents(-1);
+        innerTestRandomCdcEvents(-1, false);
     }
 
-    private void innerTestRandomCdcEvents(int numBucket) throws Exception {
+    @Test
+    @Timeout(120)
+    public void testRandomCdcEventsGlobalDynamicBucket() throws Exception {
+        innerTestRandomCdcEvents(-1, true);
+    }
+
+    private void innerTestRandomCdcEvents(int numBucket, boolean globalIndex) 
throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
         int numEvents = random.nextInt(1500) + 1;
@@ -118,7 +124,7 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
                         fileIO,
                         testTable.initialRowType(),
                         Collections.singletonList("pt"),
-                        Arrays.asList("pt", "k"),
+                        globalIndex ? Collections.singletonList("k") : 
Arrays.asList("pt", "k"),
                         numBucket);
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
new file mode 100644
index 000000000..17b120282
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link GlobalIndexAssigner}. */
+public class GlobalIndexAssignerTest extends TableTestBase {
+
+    /**
+     * Creates table {@code T} (partitioned by {@code pt}, primary key {@code pk}, dynamic bucket
+     * with a target of 3 rows per bucket) and returns a row data assigner for it.
+     *
+     * @param mergeEngine merge engine to configure on the table
+     */
+    private GlobalIndexAssigner<RowData> createAssigner(MergeEngine mergeEngine) throws Exception {
+        Identifier identifier = identifier("T");
+        Options options = new Options();
+        options.set(CoreOptions.MERGE_ENGINE, mergeEngine);
+        if (mergeEngine == MergeEngine.FIRST_ROW) {
+            options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.LOOKUP);
+        }
+        options.set(CoreOptions.DYNAMIC_BUCKET_TARGET_ROW_NUM, 3L);
+        options.set(CoreOptions.BUCKET, -1);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk")
+                        .options(options.toMap())
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        return GlobalIndexAssignerOperator.createRowDataAssigner(catalog.getTable(identifier));
+    }
+
+    /** Checks the buckets assigned to new keys and that known keys keep their bucket. */
+    @Test
+    public void testBucketAssign() throws Exception {
+        GlobalIndexAssigner<RowData> assigner = createAssigner(MergeEngine.DEDUPLICATE);
+        List<Integer> output = new ArrayList<>();
+        assigner.open(new File(warehouse.getPath()), 2, 0, (row, bucket) -> output.add(bucket));
+
+        // assign
+        assigner.process(GenericRowData.of(1, 1, 1));
+        assigner.process(GenericRowData.of(1, 2, 2));
+        assigner.process(GenericRowData.of(1, 3, 3));
+        assertThat(output).containsExactly(0, 0, 0);
+        output.clear();
+
+        // full: bucket 0 reached the target row number, the next bucket handed out is 2
+        // (presumably because this assigner is number 0 of 2 - TODO confirm)
+        assigner.process(GenericRowData.of(1, 4, 4));
+        assertThat(output).containsExactly(2);
+        output.clear();
+
+        // another partition
+        assigner.process(GenericRowData.of(2, 5, 5));
+        assertThat(output).containsExactly(0);
+        output.clear();
+
+        // read assigned
+        assigner.process(GenericRowData.of(1, 4, 4));
+        assigner.process(GenericRowData.of(1, 2, 2));
+        assigner.process(GenericRowData.of(1, 3, 3));
+        assertThat(output).containsExactly(2, 0, 0);
+        output.clear();
+
+        assigner.close();
+    }
+
+    /**
+     * With {@link MergeEngine#DEDUPLICATE}, a key arriving in a new partition is expected to
+     * produce a DELETE row for the old partition followed by the new row.
+     */
+    @Test
+    public void testUpsert() throws Exception {
+        GlobalIndexAssigner<RowData> assigner = createAssigner(MergeEngine.DEDUPLICATE);
+        List<Tuple2<RowData, Integer>> output = new ArrayList<>();
+        assigner.open(
+                new File(warehouse.getPath()),
+                2,
+                0,
+                (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+
+        // change partition
+        assigner.process(GenericRowData.of(1, 1, 1));
+        assigner.process(GenericRowData.of(2, 1, 2));
+        assertThat(output)
+                .containsExactly(
+                        new Tuple2<>(GenericRowData.of(1, 1, 1), 0),
+                        new Tuple2<>(GenericRowData.ofKind(RowKind.DELETE, 1, 1, 2), 0),
+                        new Tuple2<>(GenericRowData.of(2, 1, 2), 0));
+        output.clear();
+
+        // test partition 1 deleted: three new keys still fit into bucket 0 of partition 1
+        assigner.process(GenericRowData.of(1, 2, 2));
+        assigner.process(GenericRowData.of(1, 3, 3));
+        assigner.process(GenericRowData.of(1, 4, 4));
+        assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
+        output.clear();
+
+        // move from full bucket
+        assigner.process(GenericRowData.of(2, 4, 4));
+        assertThat(output)
+                .containsExactly(
+                        new Tuple2<>(GenericRowData.ofKind(RowKind.DELETE, 1, 4, 4), 0),
+                        new Tuple2<>(GenericRowData.of(2, 4, 4), 0));
+        output.clear();
+
+        // test partition 1 deleted: bucket 0 of partition 1 accepts a new key again
+        assigner.process(GenericRowData.of(1, 5, 5));
+        assertThat(output.stream().map(t -> t.f1)).containsExactly(0);
+        output.clear();
+
+        assigner.close();
+    }
+
+    /**
+     * With {@link MergeEngine#PARTIAL_UPDATE} or {@link MergeEngine#AGGREGATE}, a key arriving in
+     * a new partition is expected to be rewritten to its old partition.
+     */
+    @Test
+    public void testUseOldPartition() throws Exception {
+        MergeEngine mergeEngine =
+                ThreadLocalRandom.current().nextBoolean()
+                        ? MergeEngine.PARTIAL_UPDATE
+                        : MergeEngine.AGGREGATE;
+        GlobalIndexAssigner<RowData> assigner = createAssigner(mergeEngine);
+        List<Tuple2<RowData, Integer>> output = new ArrayList<>();
+        assigner.open(
+                new File(warehouse.getPath()),
+                2,
+                0,
+                (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+
+        // change partition
+        assigner.process(GenericRowData.of(1, 1, 1));
+        assigner.process(GenericRowData.of(2, 1, 2));
+        assertThat(output)
+                .containsExactly(
+                        new Tuple2<>(GenericRowData.of(1, 1, 1), 0),
+                        new Tuple2<>(GenericRowData.of(1, 1, 2), 0));
+        output.clear();
+
+        // test partition 2 no effect
+        assigner.process(GenericRowData.of(2, 2, 2));
+        assigner.process(GenericRowData.of(2, 3, 3));
+        assigner.process(GenericRowData.of(2, 4, 4));
+        assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
+        output.clear();
+
+        // release the assigner like the other tests do
+        assigner.close();
+    }
+
+    /**
+     * With {@link MergeEngine#FIRST_ROW}, a key arriving again in a new partition is expected to
+     * produce no output.
+     */
+    @Test
+    public void testFirstRow() throws Exception {
+        GlobalIndexAssigner<RowData> assigner = createAssigner(MergeEngine.FIRST_ROW);
+        List<Tuple2<RowData, Integer>> output = new ArrayList<>();
+        assigner.open(
+                new File(warehouse.getPath()),
+                2,
+                0,
+                (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+
+        // change partition
+        assigner.process(GenericRowData.of(1, 1, 1));
+        assigner.process(GenericRowData.of(2, 1, 2));
+        assertThat(output).containsExactly(new Tuple2<>(GenericRowData.of(1, 1, 1), 0));
+        output.clear();
+
+        // test partition 2 no effect
+        assigner.process(GenericRowData.of(2, 2, 2));
+        assigner.process(GenericRowData.of(2, 3, 3));
+        assigner.process(GenericRowData.of(2, 4, 4));
+        assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
+        output.clear();
+
+        // release the assigner like the other tests do
+        assigner.close();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
new file mode 100644
index 000000000..cd185d1de
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.index;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link IndexBootstrap}. */
+public class IndexBootstrapTest extends TableTestBase {
+
+    /**
+     * Writes seven rows into a table with five buckets and checks the first int field of the rows
+     * handed to the consumer for each of two bootstrap calls.
+     */
+    @Test
+    public void test() throws Exception {
+        Identifier tableId = identifier("T");
+        Options tableOptions = new Options();
+        tableOptions.set(CoreOptions.BUCKET, 5);
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .column("col", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .primaryKey("pk")
+                        .options(tableOptions.toMap())
+                        .build();
+        catalog.createTable(tableId, tableSchema, true);
+        Table table = catalog.getTable(tableId);
+
+        write(
+                table,
+                GenericRow.of(1, 1),
+                GenericRow.of(2, 2),
+                GenericRow.of(3, 3),
+                GenericRow.of(4, 4),
+                GenericRow.of(5, 5),
+                GenericRow.of(6, 6),
+                GenericRow.of(7, 7));
+
+        IndexBootstrap bootstrap = new IndexBootstrap(table);
+        List<Integer> collected = new ArrayList<>();
+        Consumer<InternalRow> collector =
+                row -> {
+                    collected.add(row.getInt(0));
+                };
+
+        // first of two bootstrap calls (second argument 0)
+        bootstrap.bootstrap(2, 0, collector);
+        assertThat(collected).containsExactlyInAnyOrder(2, 3);
+        collected.clear();
+
+        // second of two bootstrap calls (second argument 1)
+        bootstrap.bootstrap(2, 1, collector);
+        assertThat(collected).containsExactlyInAnyOrder(1, 4, 5, 6, 7);
+        collected.clear();
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 5841570a0..00e30e598 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -382,22 +382,6 @@ public class SparkReadITCase extends SparkReadTestBase {
                 .contains("[k1,v1]", "[k2,v2]");
     }
 
-    @Test
-    public void testCreateTableWithInvalidPk() {
-        assertThatThrownBy(
-                        () ->
-                                spark.sql(
-                                        "CREATE TABLE PartitionedPkTable (\n"
-                                                + "a BIGINT,\n"
-                                                + "b STRING,\n"
-                                                + "c DOUBLE)\n"
-                                                + "PARTITIONED BY (b)\n"
-                                                + "TBLPROPERTIES 
('primary-key' = 'a')"))
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining(
-                        "Primary key constraint [a] should include all 
partition fields [b]");
-    }
-
     @Test
     public void testCreateTableWithNonexistentPk() {
         spark.sql("USE paimon");

Reply via email to