This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d26ee7f1e [core] Support bulk load mode for partial-update 
merge-engine (#2362)
d26ee7f1e is described below

commit d26ee7f1e6308aae82930fa42316d1a2055bd82d
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 22 20:02:48 2023 +0800

    [core] Support bulk load mode for partial-update merge-engine (#2362)
---
 .../paimon/crosspartition/BucketAssigner.java      |  73 +++++++
 .../crosspartition/DeleteExistingProcessor.java    |  71 +++++++
 .../paimon/crosspartition/ExistingProcessor.java   |  82 ++++++++
 .../paimon/crosspartition/GlobalIndexAssigner.java | 211 ++++++++-------------
 .../crosspartition/SkipNewExistingProcessor.java   |  55 ++++++
 .../crosspartition/UseOldExistingProcessor.java    |  74 ++++++++
 .../org/apache/paimon/schema/SchemaValidation.java |  26 ++-
 .../java/org/apache/paimon/utils/RowIterator.java  |  30 +++
 .../org/apache/paimon/schema/TableSchemaTest.java  |  15 +-
 .../flink/GlobalDynamicBucketTableITCase.java      |  48 ++++-
 10 files changed, 541 insertions(+), 144 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/BucketAssigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/BucketAssigner.java
new file mode 100644
index 000000000..e5be1aed4
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/BucketAssigner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.crosspartition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.utils.Filter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+/** Assigns buckets within each partition, tracking a usage count per bucket. */
+public class BucketAssigner {
+
+    private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new 
HashMap<>();
+
+    public void bootstrapBucket(BinaryRow part, int bucket) {
+        TreeMap<Integer, Integer> bucketMap = bucketMap(part);
+        Integer count = bucketMap.get(bucket);
+        if (count == null) {
+            count = 0;
+        }
+        bucketMap.put(bucket, count + 1);
+    }
+
+    public int assignBucket(BinaryRow part, Filter<Integer> filter, int 
maxCount) {
+        TreeMap<Integer, Integer> bucketMap = bucketMap(part);
+        for (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) {
+            int bucket = entry.getKey();
+            int count = entry.getValue();
+            if (filter.test(bucket) && count < maxCount) {
+                bucketMap.put(bucket, count + 1);
+                return bucket;
+            }
+        }
+
+        for (int i = 0; ; i++) {
+            if (filter.test(i) && !bucketMap.containsKey(i)) {
+                bucketMap.put(i, 1);
+                return i;
+            }
+        }
+    }
+
+    public void decrement(BinaryRow part, int bucket) {
+        bucketMap(part).compute(bucket, (k, v) -> v == null ? 0 : v - 1);
+    }
+
+    private TreeMap<Integer, Integer> bucketMap(BinaryRow part) {
+        TreeMap<Integer, Integer> map = stats.get(part);
+        if (map == null) {
+            map = new TreeMap<>();
+            stats.put(part.copy(), map);
+        }
+        return map;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/DeleteExistingProcessor.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/DeleteExistingProcessor.java
new file mode 100644
index 000000000..a9388b011
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/DeleteExistingProcessor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.crosspartition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.ProjectToRowFunction;
+import org.apache.paimon.utils.RowIterator;
+
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** An {@link ExistingProcessor} that retracts the old record and keeps the new one. */
+public class DeleteExistingProcessor implements ExistingProcessor {
+
+    private final ProjectToRowFunction setPartition;
+    private final BucketAssigner bucketAssigner;
+    private final BiConsumer<InternalRow, Integer> collector;
+
+    public DeleteExistingProcessor(
+            ProjectToRowFunction setPartition,
+            BucketAssigner bucketAssigner,
+            BiConsumer<InternalRow, Integer> collector) {
+        this.setPartition = setPartition;
+        this.bucketAssigner = bucketAssigner;
+        this.collector = collector;
+    }
+
+    @Override
+    public boolean processExists(InternalRow newRow, BinaryRow previousPart, 
int previousBucket) {
+        // retract old record
+        InternalRow retract = setPartition.apply(newRow, previousPart);
+        retract.setRowKind(RowKind.DELETE);
+        collector.accept(retract, previousBucket);
+        bucketAssigner.decrement(previousPart, previousBucket);
+
+        // new record
+        return true;
+    }
+
+    @Override
+    public void bulkLoadNewRecords(
+            Function<SortOrder, RowIterator> iteratorFunction,
+            Function<InternalRow, BinaryRow> extractPrimary,
+            Function<InternalRow, BinaryRow> extractPartition,
+            Function<BinaryRow, Integer> assignBucket) {
+        ExistingProcessor.bulkLoadCollectFirst(
+                collector,
+                iteratorFunction.apply(SortOrder.DESCENDING),
+                extractPrimary,
+                extractPartition,
+                assignBucket);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/ExistingProcessor.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/ExistingProcessor.java
new file mode 100644
index 000000000..6228a2a98
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/ExistingProcessor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.crosspartition;
+
+import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.ProjectToRowFunction;
+import org.apache.paimon.utils.RowIterator;
+
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** Processor that decides how to handle a record whose key already exists. */
+public interface ExistingProcessor {
+
+    /** @return {@code true} if the caller should still process the new record. */
+    boolean processExists(InternalRow newRow, BinaryRow previousPart, int 
previousBucket);
+
+    void bulkLoadNewRecords(
+            Function<SortOrder, RowIterator> iteratorFunction,
+            Function<InternalRow, BinaryRow> extractPrimary,
+            Function<InternalRow, BinaryRow> extractPartition,
+            Function<BinaryRow, Integer> assignBucket);
+
+    static void bulkLoadCollectFirst(
+            BiConsumer<InternalRow, Integer> collector,
+            RowIterator iterator,
+            Function<InternalRow, BinaryRow> extractPrimary,
+            Function<InternalRow, BinaryRow> extractPartition,
+            Function<BinaryRow, Integer> assignBucket) {
+        InternalRow row;
+        BinaryRow currentKey = null;
+        while ((row = iterator.next()) != null) {
+            BinaryRow primaryKey = extractPrimary.apply(row);
+            if (currentKey == null || !currentKey.equals(primaryKey)) {
+                collector.accept(row, 
assignBucket.apply(extractPartition.apply(row)));
+                currentKey = primaryKey.copy();
+            }
+        }
+    }
+
+    static ExistingProcessor create(
+            MergeEngine mergeEngine,
+            ProjectToRowFunction setPartition,
+            BucketAssigner bucketAssigner,
+            BiConsumer<InternalRow, Integer> collector) {
+        switch (mergeEngine) {
+            case DEDUPLICATE:
+                return new DeleteExistingProcessor(setPartition, 
bucketAssigner, collector);
+            case PARTIAL_UPDATE:
+            case AGGREGATE:
+                return new UseOldExistingProcessor(setPartition, collector);
+            case FIRST_ROW:
+                return new SkipNewExistingProcessor(collector);
+            default:
+                throw new UnsupportedOperationException("Unsupported engine: " 
+ mergeEngine);
+        }
+    }
+
+    /** Input Order for sorting. */
+    enum SortOrder {
+        ASCENDING,
+        DESCENDING,
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 6450d07fc..10821d87e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.crosspartition;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.crosspartition.ExistingProcessor.SortOrder;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -40,10 +40,8 @@ import org.apache.paimon.table.sink.PartitionKeyExtractor;
 import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
-import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.IDMapping;
 import org.apache.paimon.utils.KeyValueIterator;
 import org.apache.paimon.utils.MutableObjectIterator;
@@ -51,21 +49,22 @@ import org.apache.paimon.utils.OffsetRow;
 import org.apache.paimon.utils.PositiveIntInt;
 import org.apache.paimon.utils.PositiveIntIntSerializer;
 import org.apache.paimon.utils.ProjectToRowFunction;
+import org.apache.paimon.utils.RowIterator;
 import org.apache.paimon.utils.TypeUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -98,7 +97,7 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
 
     private transient IDMapping<BinaryRow> partMapping;
     private transient BucketAssigner bucketAssigner;
-    private transient ExistsAction existsAction;
+    private transient ExistingProcessor existingProcessor;
 
     public GlobalIndexAssigner(Table table) {
         this.table = (AbstractFileStoreTable) table;
@@ -147,7 +146,9 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
 
         this.partMapping = new IDMapping<>(BinaryRow::copy);
         this.bucketAssigner = new BucketAssigner();
-        this.existsAction = fromMergeEngine(coreOptions.mergeEngine());
+        this.existingProcessor =
+                ExistingProcessor.create(
+                        coreOptions.mergeEngine(), setPartition, 
bucketAssigner, this::collect);
 
         // create bootstrap sort buffer
         this.bootstrap = true;
@@ -225,7 +226,7 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
         bootstrapKeys.clear();
         bootstrapKeys = null;
 
-        if (isEmpty && isEndInput && table.coreOptions().mergeEngine() == 
MergeEngine.DEDUPLICATE) {
+        if (isEmpty && isEndInput) {
             // optimization: bulk load mode
             bulkLoadBootstrapRecords();
         } else {
@@ -251,30 +252,11 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
             if (previousPartId == partId) {
                 collect(value, previousBucket);
             } else {
-                switch (existsAction) {
-                    case DELETE:
-                        {
-                            // retract old record
-                            BinaryRow previousPart = 
partMapping.get(previousPartId);
-                            InternalRow retract = setPartition.apply(value, 
previousPart);
-                            retract.setRowKind(RowKind.DELETE);
-                            collect(retract, previousBucket);
-                            bucketAssigner.decrement(previousPart, 
previousBucket);
-
-                            // new record
-                            processNewRecord(partition, partId, key, value);
-                            break;
-                        }
-                    case USE_OLD:
-                        {
-                            BinaryRow previousPart = 
partMapping.get(previousPartId);
-                            InternalRow newValue = setPartition.apply(value, 
previousPart);
-                            collect(newValue, previousBucket);
-                            break;
-                        }
-                    case SKIP_NEW:
-                        // do nothing
-                        break;
+                BinaryRow previousPart = partMapping.get(previousPartId);
+                boolean processNewRecord =
+                        existingProcessor.processExists(value, previousPart, 
previousBucket);
+                if (processNewRecord) {
+                    processNewRecord(partition, partId, key, value);
                 }
             }
         } else {
@@ -298,7 +280,7 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
     // ================== End Public API ===================
 
     /** Sort bootstrap records and assign bucket without RocksDB. */
-    private void bulkLoadBootstrapRecords() throws Exception {
+    private void bulkLoadBootstrapRecords() {
         RowType rowType = table.rowType();
         List<DataType> fields =
                 new ArrayList<>(TypeUtils.project(rowType, 
table.primaryKeys()).getFieldTypes());
@@ -318,39 +300,70 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
                         coreOptions.writeBufferSize() / 2,
                         coreOptions.pageSize(),
                         coreOptions.localSortMaxNumFileHandles());
-        int id = Integer.MAX_VALUE;
-        GenericRow idRow = new GenericRow(1);
-        JoinedRow keyAndId = new JoinedRow();
-        JoinedRow keyAndRow = new JoinedRow();
-        try (RowBuffer.RowBufferIterator iterator = 
bootstrapRecords.newIterator()) {
-            while (iterator.advanceNext()) {
-                BinaryRow row = iterator.getRow();
-                BinaryRow key = extractor.trimmedPrimaryKey(row);
-                idRow.setField(0, id);
-                keyAndId.replace(key, idRow);
-                keyAndRow.replace(keyAndId, row);
-                keyIdBuffer.write(keyAndRow);
-                id--;
-            }
-        }
-        bootstrapRecords.reset();
-        bootstrapRecords = null;
 
-        // 2. loop sorted iterator to assign bucket
-        MutableObjectIterator<BinaryRow> iterator = 
keyIdBuffer.sortedIterator();
-        BinaryRow keyWithRow = new BinaryRow(keyWithRowType.getFieldCount());
-        OffsetRow row = new OffsetRow(rowType.getFieldCount(), 
keyWithIdType.getFieldCount());
-        BinaryRow currentKey = null;
-        while ((keyWithRow = iterator.next(keyWithRow)) != null) {
-            row.replace(keyWithRow);
-            BinaryRow key = extractor.trimmedPrimaryKey(row);
-            if (currentKey == null || !currentKey.equals(key)) {
-                // output first record
-                BinaryRow partition = extractor.partition(row);
-                collect(row, assignBucket(partition));
-                currentKey = key.copy();
-            }
-        }
+        Function<SortOrder, RowIterator> iteratorFunction =
+                sortOrder -> {
+                    int id = sortOrder == SortOrder.ASCENDING ? 0 : 
Integer.MAX_VALUE;
+                    GenericRow idRow = new GenericRow(1);
+                    JoinedRow keyAndId = new JoinedRow();
+                    JoinedRow keyAndRow = new JoinedRow();
+                    try (RowBuffer.RowBufferIterator iterator = 
bootstrapRecords.newIterator()) {
+                        while (iterator.advanceNext()) {
+                            BinaryRow row = iterator.getRow();
+                            BinaryRow key = extractor.trimmedPrimaryKey(row);
+                            idRow.setField(0, id);
+                            keyAndId.replace(key, idRow);
+                            keyAndRow.replace(keyAndId, row);
+                            try {
+                                keyIdBuffer.write(keyAndRow);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                            if (sortOrder == SortOrder.ASCENDING) {
+                                id++;
+                            } else {
+                                id--;
+                            }
+                        }
+                    }
+                    bootstrapRecords.reset();
+                    bootstrapRecords = null;
+
+                    // 2. loop sorted iterator to assign bucket
+                    MutableObjectIterator<BinaryRow> iterator;
+                    try {
+                        iterator = keyIdBuffer.sortedIterator();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    BinaryRow reuseBinaryRow = new 
BinaryRow(keyWithRowType.getFieldCount());
+                    OffsetRow row =
+                            new OffsetRow(rowType.getFieldCount(), 
keyWithIdType.getFieldCount());
+                    return new RowIterator() {
+                        @Nullable
+                        @Override
+                        public InternalRow next() {
+                            BinaryRow keyWithRow;
+                            try {
+                                keyWithRow = iterator.next(reuseBinaryRow);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                            if (keyWithRow == null) {
+                                return null;
+                            }
+                            row.replace(keyWithRow);
+                            return row;
+                        }
+                    };
+                };
+
+        existingProcessor.bulkLoadNewRecords(
+                iteratorFunction,
+                extractor::trimmedPrimaryKey,
+                extractor::partition,
+                this::assignBucket);
 
         keyIdBuffer.clear();
     }
@@ -389,70 +402,4 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
     private void collect(InternalRow value, int bucket) {
         collector.accept(value, bucket);
     }
-
-    private static class BucketAssigner {
-
-        private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new 
HashMap<>();
-
-        public void bootstrapBucket(BinaryRow part, int bucket) {
-            TreeMap<Integer, Integer> bucketMap = bucketMap(part);
-            Integer count = bucketMap.get(bucket);
-            if (count == null) {
-                count = 0;
-            }
-            bucketMap.put(bucket, count + 1);
-        }
-
-        public int assignBucket(BinaryRow part, Filter<Integer> filter, int 
maxCount) {
-            TreeMap<Integer, Integer> bucketMap = bucketMap(part);
-            for (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) {
-                int bucket = entry.getKey();
-                int count = entry.getValue();
-                if (filter.test(bucket) && count < maxCount) {
-                    bucketMap.put(bucket, count + 1);
-                    return bucket;
-                }
-            }
-
-            for (int i = 0; ; i++) {
-                if (filter.test(i) && !bucketMap.containsKey(i)) {
-                    bucketMap.put(i, 1);
-                    return i;
-                }
-            }
-        }
-
-        public void decrement(BinaryRow part, int bucket) {
-            bucketMap(part).compute(bucket, (k, v) -> v == null ? 0 : v - 1);
-        }
-
-        private TreeMap<Integer, Integer> bucketMap(BinaryRow part) {
-            TreeMap<Integer, Integer> map = stats.get(part);
-            if (map == null) {
-                map = new TreeMap<>();
-                stats.put(part.copy(), map);
-            }
-            return map;
-        }
-    }
-
-    private ExistsAction fromMergeEngine(MergeEngine mergeEngine) {
-        switch (mergeEngine) {
-            case DEDUPLICATE:
-                return ExistsAction.DELETE;
-            case PARTIAL_UPDATE:
-            case AGGREGATE:
-                return ExistsAction.USE_OLD;
-            case FIRST_ROW:
-                return ExistsAction.SKIP_NEW;
-            default:
-                throw new UnsupportedOperationException("Unsupported engine: " 
+ mergeEngine);
-        }
-    }
-
-    private enum ExistsAction {
-        DELETE,
-        USE_OLD,
-        SKIP_NEW
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/SkipNewExistingProcessor.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/SkipNewExistingProcessor.java
new file mode 100644
index 000000000..89af478e5
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/SkipNewExistingProcessor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.crosspartition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.RowIterator;
+
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** An {@link ExistingProcessor} that skips the new record when the key already exists. */
+public class SkipNewExistingProcessor implements ExistingProcessor {
+
+    private final BiConsumer<InternalRow, Integer> collector;
+
+    public SkipNewExistingProcessor(BiConsumer<InternalRow, Integer> 
collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public boolean processExists(InternalRow newRow, BinaryRow previousPart, 
int previousBucket) {
+        return false;
+    }
+
+    @Override
+    public void bulkLoadNewRecords(
+            Function<SortOrder, RowIterator> iteratorFunction,
+            Function<InternalRow, BinaryRow> extractPrimary,
+            Function<InternalRow, BinaryRow> extractPartition,
+            Function<BinaryRow, Integer> assignBucket) {
+        ExistingProcessor.bulkLoadCollectFirst(
+                collector,
+                iteratorFunction.apply(SortOrder.ASCENDING),
+                extractPrimary,
+                extractPartition,
+                assignBucket);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/UseOldExistingProcessor.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/UseOldExistingProcessor.java
new file mode 100644
index 000000000..38f493f61
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/UseOldExistingProcessor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.crosspartition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.ProjectToRowFunction;
+import org.apache.paimon.utils.RowIterator;
+
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** An {@link ExistingProcessor} that writes the new record to the old partition and bucket. */
+public class UseOldExistingProcessor implements ExistingProcessor {
+
+    private final ProjectToRowFunction setPartition;
+    private final BiConsumer<InternalRow, Integer> collector;
+
+    public UseOldExistingProcessor(
+            ProjectToRowFunction setPartition, BiConsumer<InternalRow, 
Integer> collector) {
+        this.setPartition = setPartition;
+        this.collector = collector;
+    }
+
+    @Override
+    public boolean processExists(InternalRow newRow, BinaryRow previousPart, 
int previousBucket) {
+        InternalRow newValue = setPartition.apply(newRow, previousPart);
+        collector.accept(newValue, previousBucket);
+        return false;
+    }
+
+    @Override
+    public void bulkLoadNewRecords(
+            Function<SortOrder, RowIterator> iteratorFunction,
+            Function<InternalRow, BinaryRow> extractPrimary,
+            Function<InternalRow, BinaryRow> extractPartition,
+            Function<BinaryRow, Integer> assignBucket) {
+        RowIterator iterator = iteratorFunction.apply(SortOrder.ASCENDING);
+        InternalRow row;
+        BinaryRow currentKey = null;
+        BinaryRow currentPartition = null;
+        int currentBucket = -1;
+        while ((row = iterator.next()) != null) {
+            BinaryRow primaryKey = extractPrimary.apply(row);
+            BinaryRow partition = extractPartition.apply(row);
+            if (currentKey == null || !currentKey.equals(primaryKey)) {
+                int bucket = assignBucket.apply(partition);
+                collector.accept(row, bucket);
+                currentKey = primaryKey.copy();
+                currentPartition = partition.copy();
+                currentBucket = bucket;
+            } else {
+                InternalRow newRow = setPartition.apply(row, currentPartition);
+                collector.accept(newRow, currentBucket);
+            }
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 3b8d6c222..4f4a1b068 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -189,12 +189,22 @@ public class SchemaValidation {
             }
         }
 
-        if (schema.crossPartitionUpdate() && options.bucket() != -1) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "You should use dynamic bucket (bucket = -1) mode 
in cross partition update case "
-                                    + "(Primary key constraint %s not include 
all partition fields %s).",
-                            schema.primaryKeys(), schema.partitionKeys()));
+        if (schema.crossPartitionUpdate()) {
+            if (options.bucket() != -1) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "You should use dynamic bucket (bucket = -1) 
mode in cross partition update case "
+                                        + "(Primary key constraint %s not 
include all partition fields %s).",
+                                schema.primaryKeys(), schema.partitionKeys()));
+            }
+
+            if (sequenceField.isPresent()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "You can not use sequence.field in cross 
partition update case "
+                                        + "(Primary key constraint %s not 
include all partition fields %s).",
+                                schema.primaryKeys(), schema.partitionKeys()));
+            }
         }
     }
 
@@ -361,9 +371,7 @@ public class SchemaValidation {
                         .collect(Collectors.toSet());
         if (!illegalGroup.isEmpty()) {
             throw new IllegalArgumentException(
-                    String.format(
-                            "Should not defined aggregation function on sequence group: "
-                                    + illegalGroup));
+                    "Should not defined aggregation function on sequence group: " + illegalGroup);
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/RowIterator.java b/paimon-core/src/main/java/org/apache/paimon/utils/RowIterator.java
new file mode 100644
index 000000000..e1106cf3c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/RowIterator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.InternalRow;
+
+import javax.annotation.Nullable;
+
+/** A simple iterator which provides simple next. */
+public interface RowIterator {
+
+    @Nullable
+    InternalRow next();
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
index 6171fdbb9..e0fc6b6d9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
@@ -30,6 +30,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
+import static org.apache.paimon.schema.SchemaValidation.validateTableSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -37,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 public class TableSchemaTest {
 
     @Test
-    public void testInvalidPrimaryKeys() {
+    public void testCrossPartition() {
         List<DataField> fields =
                 Arrays.asList(
                         new DataField(0, "f0", DataTypes.INT()),
@@ -50,6 +53,16 @@ public class TableSchemaTest {
         TableSchema schema =
                 new TableSchema(1, fields, 10, partitionKeys, primaryKeys, options, "");
         assertThat(schema.crossPartitionUpdate()).isTrue();
+
+        assertThatThrownBy(() -> validateTableSchema(schema))
+                .hasMessageContaining("You should use dynamic bucket");
+
+        options.put(BUCKET.key(), "-1");
+        validateTableSchema(schema);
+
+        options.put(SEQUENCE_FIELD.key(), "f2");
+        assertThatThrownBy(() -> validateTableSchema(schema))
+                .hasMessageContaining("You can not use sequence.field");
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
index 7d5d70b48..fc6172b7a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -31,7 +31,7 @@ public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
 
     @Override
     protected List<String> ddl() {
-        return Collections.singletonList(
+        return Arrays.asList(
                 "CREATE TABLE IF NOT EXISTS T ("
                         + "pt INT, "
                         + "pk INT, "
@@ -40,9 +40,53 @@ public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
                         + ") PARTITIONED BY (pt) WITH ("
                         + " 'bucket'='-1', "
                         + " 'dynamic-bucket.target-row-num'='3' "
+                        + ")",
+                "CREATE TABLE IF NOT EXISTS partial_update_t ("
+                        + "pt INT, "
+                        + "pk INT, "
+                        + "v1 INT, "
+                        + "v2 INT, "
+                        + "PRIMARY KEY (pk) NOT ENFORCED"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + " 'merge-engine'='partial-update', "
+                        + " 'bucket'='-1', "
+                        + " 'dynamic-bucket.target-row-num'='3' "
+                        + ")",
+                "CREATE TABLE IF NOT EXISTS first_row_t ("
+                        + "pt INT, "
+                        + "pk INT, "
+                        + "v INT, "
+                        + "PRIMARY KEY (pk) NOT ENFORCED"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + " 'merge-engine'='first-row', "
+                        + " 'changelog-producer'='lookup', "
+                        + " 'bucket'='-1', "
+                        + " 'dynamic-bucket.target-row-num'='3' "
                         + ")");
     }
 
+    @Test
+    public void testBulkLoad() {
+        sql("INSERT INTO T VALUES (1, 1, 1), (2, 1, 2), (1, 3, 3), (2, 4, 4), (3, 3, 5)");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(Row.of(2, 1, 2), Row.of(3, 3, 5), Row.of(2, 4, 4));
+
+        sql(
+                "INSERT INTO partial_update_t VALUES"
+                        + " (1, 1, 1, 1),"
+                        + " (2, 1, 2, 3),"
+                        + " (1, 3, 3, 3),"
+                        + " (2, 4, 4, 4),"
+                        + " (3, 3, CAST(NULL AS INT), 5)");
+        assertThat(sql("SELECT * FROM partial_update_t"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 1, 2, 3), Row.of(2, 4, 4, 4), Row.of(1, 3, 3, 5));
+
+        sql("INSERT INTO first_row_t VALUES (1, 1, 1), (2, 1, 2), (1, 3, 3), (2, 4, 4), (3, 3, 5)");
+        assertThat(sql("SELECT * FROM first_row_t"))
+                .containsExactlyInAnyOrder(Row.of(1, 1, 1), Row.of(1, 3, 3), Row.of(2, 4, 4));
+    }
+
     @Test
     public void testWriteRead() {
         sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)");


Reply via email to