This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d26ee7f1e [core] Support bulk load mode for partial-update merge-engine (#2362)
d26ee7f1e is described below
commit d26ee7f1e6308aae82930fa42316d1a2055bd82d
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 22 20:02:48 2023 +0800
[core] Support bulk load mode for partial-update merge-engine (#2362)
---
.../paimon/crosspartition/BucketAssigner.java | 73 +++++++
.../crosspartition/DeleteExistingProcessor.java | 71 +++++++
.../paimon/crosspartition/ExistingProcessor.java | 82 ++++++++
.../paimon/crosspartition/GlobalIndexAssigner.java | 211 ++++++++-------------
.../crosspartition/SkipNewExistingProcessor.java | 55 ++++++
.../crosspartition/UseOldExistingProcessor.java | 74 ++++++++
.../org/apache/paimon/schema/SchemaValidation.java | 26 ++-
.../java/org/apache/paimon/utils/RowIterator.java | 30 +++
.../org/apache/paimon/schema/TableSchemaTest.java | 15 +-
.../flink/GlobalDynamicBucketTableITCase.java | 48 ++++-
10 files changed, 541 insertions(+), 144 deletions(-)
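Note on the change: the bulk-load fast path (sort bootstrap records, assign buckets without RocksDB) previously ran only for the deduplicate merge-engine; this commit routes it through a per-engine ExistingProcessor so partial-update, aggregate, and first-row work too. A minimal standalone sketch of the partial-update case, using plain JDK types (illustrative, not Paimon code):

import java.util.Arrays;
import java.util.List;

public class BulkLoadSketch {
    public static void main(String[] args) {
        // Rows as {key, value}, already sorted by primary key.
        List<int[]> sortedByKey = Arrays.asList(
                new int[] {1, 10}, new int[] {1, 11}, new int[] {2, 20});
        Integer currentKey = null;
        int nextBucket = 0;
        int currentBucket = -1;
        for (int[] row : sortedByKey) {
            if (currentKey == null || currentKey != row[0]) {
                currentKey = row[0];
                currentBucket = nextBucket++; // first row of a key picks the bucket
            }
            // later rows of the same key follow the first row's bucket
            System.out.println("key=" + row[0] + " value=" + row[1] + " -> bucket " + currentBucket);
        }
    }
}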
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/BucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/BucketAssigner.java
new file mode 100644
index 000000000..e5be1aed4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/BucketAssigner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.crosspartition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.utils.Filter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+/** Bucket assigner that assigns buckets within a partition. */
+public class BucketAssigner {
+
+ private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new HashMap<>();
+
+ public void bootstrapBucket(BinaryRow part, int bucket) {
+ TreeMap<Integer, Integer> bucketMap = bucketMap(part);
+ Integer count = bucketMap.get(bucket);
+ if (count == null) {
+ count = 0;
+ }
+ bucketMap.put(bucket, count + 1);
+ }
+
+ public int assignBucket(BinaryRow part, Filter<Integer> filter, int maxCount) {
+ TreeMap<Integer, Integer> bucketMap = bucketMap(part);
+ for (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) {
+ int bucket = entry.getKey();
+ int count = entry.getValue();
+ if (filter.test(bucket) && count < maxCount) {
+ bucketMap.put(bucket, count + 1);
+ return bucket;
+ }
+ }
+
+ for (int i = 0; ; i++) {
+ if (filter.test(i) && !bucketMap.containsKey(i)) {
+ bucketMap.put(i, 1);
+ return i;
+ }
+ }
+ }
+
+ public void decrement(BinaryRow part, int bucket) {
+ bucketMap(part).compute(bucket, (k, v) -> v == null ? 0 : v - 1);
+ }
+
+ private TreeMap<Integer, Integer> bucketMap(BinaryRow part) {
+ TreeMap<Integer, Integer> map = stats.get(part);
+ if (map == null) {
+ map = new TreeMap<>();
+ stats.put(part.copy(), map);
+ }
+ return map;
+ }
+}
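The assignBucket policy above: reuse the smallest owned bucket whose row count is below maxCount, otherwise open the first unused owned bucket. A self-contained sketch with plain JDK types (the even-bucket predicate is an assumed stand-in for the Filter the caller passes in):

import java.util.Map;
import java.util.TreeMap;
import java.util.function.IntPredicate;

public class BucketAssignSketch {
    public static void main(String[] args) {
        TreeMap<Integer, Integer> counts = new TreeMap<>(); // bucket -> row count
        IntPredicate owned = b -> b % 2 == 0; // assumption: this task owns even buckets
        for (int i = 0; i < 8; i++) {
            System.out.println("record " + i + " -> bucket " + assign(counts, owned, 3));
        }
    }

    // Mirrors BucketAssigner.assignBucket: reuse the smallest non-full owned
    // bucket, else open the first unused owned bucket.
    static int assign(TreeMap<Integer, Integer> counts, IntPredicate owned, int maxCount) {
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            if (owned.test(e.getKey()) && e.getValue() < maxCount) {
                e.setValue(e.getValue() + 1);
                return e.getKey();
            }
        }
        for (int i = 0; ; i++) {
            if (owned.test(i) && !counts.containsKey(i)) {
                counts.put(i, 1);
                return i;
            }
        }
    }
}

With maxCount = 3 this prints bucket 0 three times, then bucket 2 three times, then bucket 4 twice.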
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/DeleteExistingProcessor.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/DeleteExistingProcessor.java
new file mode 100644
index 000000000..a9388b011
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/DeleteExistingProcessor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.crosspartition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.ProjectToRowFunction;
+import org.apache.paimon.utils.RowIterator;
+
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** An {@link ExistingProcessor} that deletes the old record. */
+public class DeleteExistingProcessor implements ExistingProcessor {
+
+ private final ProjectToRowFunction setPartition;
+ private final BucketAssigner bucketAssigner;
+ private final BiConsumer<InternalRow, Integer> collector;
+
+ public DeleteExistingProcessor(
+ ProjectToRowFunction setPartition,
+ BucketAssigner bucketAssigner,
+ BiConsumer<InternalRow, Integer> collector) {
+ this.setPartition = setPartition;
+ this.bucketAssigner = bucketAssigner;
+ this.collector = collector;
+ }
+
+ @Override
+ public boolean processExists(InternalRow newRow, BinaryRow previousPart, int previousBucket) {
+ // retract old record
+ InternalRow retract = setPartition.apply(newRow, previousPart);
+ retract.setRowKind(RowKind.DELETE);
+ collector.accept(retract, previousBucket);
+ bucketAssigner.decrement(previousPart, previousBucket);
+
+ // new record
+ return true;
+ }
+
+ @Override
+ public void bulkLoadNewRecords(
+ Function<SortOrder, RowIterator> iteratorFunction,
+ Function<InternalRow, BinaryRow> extractPrimary,
+ Function<InternalRow, BinaryRow> extractPartition,
+ Function<BinaryRow, Integer> assignBucket) {
+ ExistingProcessor.bulkLoadCollectFirst(
+ collector,
+ iteratorFunction.apply(SortOrder.DESCENDING),
+ extractPrimary,
+ extractPartition,
+ assignBucket);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/ExistingProcessor.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/ExistingProcessor.java
new file mode 100644
index 000000000..6228a2a98
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/ExistingProcessor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.crosspartition;
+
+import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.ProjectToRowFunction;
+import org.apache.paimon.utils.RowIterator;
+
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** Processor for handling an existing key. */
+public interface ExistingProcessor {
+
+ /** @return whether the new record should be processed. */
+ boolean processExists(InternalRow newRow, BinaryRow previousPart, int previousBucket);
+
+ void bulkLoadNewRecords(
+ Function<SortOrder, RowIterator> iteratorFunction,
+ Function<InternalRow, BinaryRow> extractPrimary,
+ Function<InternalRow, BinaryRow> extractPartition,
+ Function<BinaryRow, Integer> assignBucket);
+
+ static void bulkLoadCollectFirst(
+ BiConsumer<InternalRow, Integer> collector,
+ RowIterator iterator,
+ Function<InternalRow, BinaryRow> extractPrimary,
+ Function<InternalRow, BinaryRow> extractPartition,
+ Function<BinaryRow, Integer> assignBucket) {
+ InternalRow row;
+ BinaryRow currentKey = null;
+ while ((row = iterator.next()) != null) {
+ BinaryRow primaryKey = extractPrimary.apply(row);
+ if (currentKey == null || !currentKey.equals(primaryKey)) {
+ collector.accept(row, assignBucket.apply(extractPartition.apply(row)));
+ currentKey = primaryKey.copy();
+ }
+ }
+ }
+
+ static ExistingProcessor create(
+ MergeEngine mergeEngine,
+ ProjectToRowFunction setPartition,
+ BucketAssigner bucketAssigner,
+ BiConsumer<InternalRow, Integer> collector) {
+ switch (mergeEngine) {
+ case DEDUPLICATE:
+ return new DeleteExistingProcessor(setPartition, bucketAssigner, collector);
+ case PARTIAL_UPDATE:
+ case AGGREGATE:
+ return new UseOldExistingProcessor(setPartition, collector);
+ case FIRST_ROW:
+ return new SkipNewExistingProcessor(collector);
+ default:
+ throw new UnsupportedOperationException("Unsupported engine: " + mergeEngine);
+ }
+ }
+
+ /** Input order for sorting. */
+ enum SortOrder {
+ ASCENDING,
+ DESCENDING,
+ }
+}
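bulkLoadCollectFirst assumes the iterator is sorted by primary key with a tie-breaking id, so "first row per key" means the newest record under DESCENDING (deduplicate) or the oldest under ASCENDING (first-row). A minimal model with int keys (illustrative, not Paimon code):

import java.util.Arrays;
import java.util.List;

public class CollectFirstSketch {
    public static void main(String[] args) {
        // Rows as {key, value}, sorted by key; within key 1 the newest record
        // (value 30) was placed first, as a DESCENDING sort id would do.
        List<int[]> sorted = Arrays.asList(
                new int[] {1, 30}, new int[] {1, 10}, new int[] {2, 20});
        Integer currentKey = null;
        for (int[] row : sorted) {
            if (currentKey == null || currentKey != row[0]) { // first row of this key
                System.out.println("emit key=" + row[0] + " value=" + row[1]);
                currentKey = row[0];
            } // all later rows of the same key are dropped
        }
    }
}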
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 6450d07fc..10821d87e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -19,7 +19,7 @@
package org.apache.paimon.crosspartition;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.crosspartition.ExistingProcessor.SortOrder;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -40,10 +40,8 @@ import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
-import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.IDMapping;
import org.apache.paimon.utils.KeyValueIterator;
import org.apache.paimon.utils.MutableObjectIterator;
@@ -51,21 +49,22 @@ import org.apache.paimon.utils.OffsetRow;
import org.apache.paimon.utils.PositiveIntInt;
import org.apache.paimon.utils.PositiveIntIntSerializer;
import org.apache.paimon.utils.ProjectToRowFunction;
+import org.apache.paimon.utils.RowIterator;
import org.apache.paimon.utils.TypeUtils;
+import javax.annotation.Nullable;
+
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
+import java.util.function.Function;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -98,7 +97,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
private transient IDMapping<BinaryRow> partMapping;
private transient BucketAssigner bucketAssigner;
- private transient ExistsAction existsAction;
+ private transient ExistingProcessor existingProcessor;
public GlobalIndexAssigner(Table table) {
this.table = (AbstractFileStoreTable) table;
@@ -147,7 +146,9 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
this.partMapping = new IDMapping<>(BinaryRow::copy);
this.bucketAssigner = new BucketAssigner();
- this.existsAction = fromMergeEngine(coreOptions.mergeEngine());
+ this.existingProcessor =
+ ExistingProcessor.create(
+ coreOptions.mergeEngine(), setPartition, bucketAssigner, this::collect);
// create bootstrap sort buffer
this.bootstrap = true;
@@ -225,7 +226,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
bootstrapKeys.clear();
bootstrapKeys = null;
- if (isEmpty && isEndInput && table.coreOptions().mergeEngine() == MergeEngine.DEDUPLICATE) {
+ if (isEmpty && isEndInput) {
// optimization: bulk load mode
bulkLoadBootstrapRecords();
} else {
@@ -251,30 +252,11 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
if (previousPartId == partId) {
collect(value, previousBucket);
} else {
- switch (existsAction) {
- case DELETE:
- {
- // retract old record
- BinaryRow previousPart = partMapping.get(previousPartId);
- InternalRow retract = setPartition.apply(value, previousPart);
- retract.setRowKind(RowKind.DELETE);
- collect(retract, previousBucket);
- bucketAssigner.decrement(previousPart, previousBucket);
-
- // new record
- processNewRecord(partition, partId, key, value);
- break;
- }
- case USE_OLD:
- {
- BinaryRow previousPart = partMapping.get(previousPartId);
- InternalRow newValue = setPartition.apply(value, previousPart);
- collect(newValue, previousBucket);
- break;
- }
- case SKIP_NEW:
- // do nothing
- break;
+ BinaryRow previousPart = partMapping.get(previousPartId);
+ boolean processNewRecord =
+ existingProcessor.processExists(value, previousPart, previousBucket);
+ if (processNewRecord) {
+ processNewRecord(partition, partId, key, value);
}
}
} else {
@@ -298,7 +280,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
// ================== End Public API ===================
/** Sort bootstrap records and assign bucket without RocksDB. */
- private void bulkLoadBootstrapRecords() throws Exception {
+ private void bulkLoadBootstrapRecords() {
RowType rowType = table.rowType();
List<DataType> fields =
new ArrayList<>(TypeUtils.project(rowType, table.primaryKeys()).getFieldTypes());
@@ -318,39 +300,70 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
coreOptions.writeBufferSize() / 2,
coreOptions.pageSize(),
coreOptions.localSortMaxNumFileHandles());
- int id = Integer.MAX_VALUE;
- GenericRow idRow = new GenericRow(1);
- JoinedRow keyAndId = new JoinedRow();
- JoinedRow keyAndRow = new JoinedRow();
- try (RowBuffer.RowBufferIterator iterator = bootstrapRecords.newIterator()) {
- while (iterator.advanceNext()) {
- BinaryRow row = iterator.getRow();
- BinaryRow key = extractor.trimmedPrimaryKey(row);
- idRow.setField(0, id);
- keyAndId.replace(key, idRow);
- keyAndRow.replace(keyAndId, row);
- keyIdBuffer.write(keyAndRow);
- id--;
- }
- }
- bootstrapRecords.reset();
- bootstrapRecords = null;
- // 2. loop sorted iterator to assign bucket
- MutableObjectIterator<BinaryRow> iterator = keyIdBuffer.sortedIterator();
- BinaryRow keyWithRow = new BinaryRow(keyWithRowType.getFieldCount());
- OffsetRow row = new OffsetRow(rowType.getFieldCount(), keyWithIdType.getFieldCount());
- BinaryRow currentKey = null;
- while ((keyWithRow = iterator.next(keyWithRow)) != null) {
- row.replace(keyWithRow);
- BinaryRow key = extractor.trimmedPrimaryKey(row);
- if (currentKey == null || !currentKey.equals(key)) {
- // output first record
- BinaryRow partition = extractor.partition(row);
- collect(row, assignBucket(partition));
- currentKey = key.copy();
- }
- }
+ Function<SortOrder, RowIterator> iteratorFunction =
+ sortOrder -> {
+ int id = sortOrder == SortOrder.ASCENDING ? 0 : Integer.MAX_VALUE;
+ GenericRow idRow = new GenericRow(1);
+ JoinedRow keyAndId = new JoinedRow();
+ JoinedRow keyAndRow = new JoinedRow();
+ try (RowBuffer.RowBufferIterator iterator = bootstrapRecords.newIterator()) {
+ while (iterator.advanceNext()) {
+ BinaryRow row = iterator.getRow();
+ BinaryRow key = extractor.trimmedPrimaryKey(row);
+ idRow.setField(0, id);
+ keyAndId.replace(key, idRow);
+ keyAndRow.replace(keyAndId, row);
+ try {
+ keyIdBuffer.write(keyAndRow);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ if (sortOrder == SortOrder.ASCENDING) {
+ id++;
+ } else {
+ id--;
+ }
+ }
+ }
+ bootstrapRecords.reset();
+ bootstrapRecords = null;
+
+ // 2. loop sorted iterator to assign bucket
+ MutableObjectIterator<BinaryRow> iterator;
+ try {
+ iterator = keyIdBuffer.sortedIterator();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ BinaryRow reuseBinaryRow = new BinaryRow(keyWithRowType.getFieldCount());
+ OffsetRow row =
+ new OffsetRow(rowType.getFieldCount(), keyWithIdType.getFieldCount());
+ return new RowIterator() {
+ @Nullable
+ @Override
+ public InternalRow next() {
+ BinaryRow keyWithRow;
+ try {
+ keyWithRow = iterator.next(reuseBinaryRow);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ if (keyWithRow == null) {
+ return null;
+ }
+ row.replace(keyWithRow);
+ return row;
+ }
+ };
+ };
+
+ existingProcessor.bulkLoadNewRecords(
+ iteratorFunction,
+ extractor::trimmedPrimaryKey,
+ extractor::partition,
+ this::assignBucket);
keyIdBuffer.clear();
}
@@ -389,70 +402,4 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
private void collect(InternalRow value, int bucket) {
collector.accept(value, bucket);
}
-
- private static class BucketAssigner {
-
- private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new HashMap<>();
-
- public void bootstrapBucket(BinaryRow part, int bucket) {
- TreeMap<Integer, Integer> bucketMap = bucketMap(part);
- Integer count = bucketMap.get(bucket);
- if (count == null) {
- count = 0;
- }
- bucketMap.put(bucket, count + 1);
- }
-
- public int assignBucket(BinaryRow part, Filter<Integer> filter, int maxCount) {
- TreeMap<Integer, Integer> bucketMap = bucketMap(part);
- for (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) {
- int bucket = entry.getKey();
- int count = entry.getValue();
- if (filter.test(bucket) && count < maxCount) {
- bucketMap.put(bucket, count + 1);
- return bucket;
- }
- }
-
- for (int i = 0; ; i++) {
- if (filter.test(i) && !bucketMap.containsKey(i)) {
- bucketMap.put(i, 1);
- return i;
- }
- }
- }
-
- public void decrement(BinaryRow part, int bucket) {
- bucketMap(part).compute(bucket, (k, v) -> v == null ? 0 : v - 1);
- }
-
- private TreeMap<Integer, Integer> bucketMap(BinaryRow part) {
- TreeMap<Integer, Integer> map = stats.get(part);
- if (map == null) {
- map = new TreeMap<>();
- stats.put(part.copy(), map);
- }
- return map;
- }
- }
-
- private ExistsAction fromMergeEngine(MergeEngine mergeEngine) {
- switch (mergeEngine) {
- case DEDUPLICATE:
- return ExistsAction.DELETE;
- case PARTIAL_UPDATE:
- case AGGREGATE:
- return ExistsAction.USE_OLD;
- case FIRST_ROW:
- return ExistsAction.SKIP_NEW;
- default:
- throw new UnsupportedOperationException("Unsupported engine: " + mergeEngine);
- }
- }
-
- private enum ExistsAction {
- DELETE,
- USE_OLD,
- SKIP_NEW
- }
}
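The refactored bulk load keeps the sort trick from before: each record gets a one-column id row appended after its trimmed primary key, ascending or descending per the processor's requested SortOrder, so the external sort surfaces either the oldest or the newest record of a key first. Sketched standalone (a plain in-memory sort stands in for keyIdBuffer.sortedIterator()):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortIdSketch {
    public static void main(String[] args) {
        int[] arrivalKeys = {1, 2, 1, 1}; // key 1 arrives three times
        boolean descending = true; // deduplicate wants the newest record first
        int id = descending ? Integer.MAX_VALUE : 0;
        List<int[]> keyAndId = new ArrayList<>();
        for (int key : arrivalKeys) {
            keyAndId.add(new int[] {key, id});
            id += descending ? -1 : 1;
        }
        // Sort by (key, id): with descending ids, later arrivals of the same
        // key carry smaller ids and therefore sort first within the key.
        keyAndId.sort(Comparator.<int[]>comparingInt(r -> r[0]).thenComparingInt(r -> r[1]));
        keyAndId.forEach(r -> System.out.println("key=" + r[0] + " id=" + r[1]));
    }
}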
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/SkipNewExistingProcessor.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/SkipNewExistingProcessor.java
new file mode 100644
index 000000000..89af478e5
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/SkipNewExistingProcessor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.crosspartition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.RowIterator;
+
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** An {@link ExistingProcessor} that skips the new record. */
+public class SkipNewExistingProcessor implements ExistingProcessor {
+
+ private final BiConsumer<InternalRow, Integer> collector;
+
+ public SkipNewExistingProcessor(BiConsumer<InternalRow, Integer> collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public boolean processExists(InternalRow newRow, BinaryRow previousPart, int previousBucket) {
+ return false;
+ }
+
+ @Override
+ public void bulkLoadNewRecords(
+ Function<SortOrder, RowIterator> iteratorFunction,
+ Function<InternalRow, BinaryRow> extractPrimary,
+ Function<InternalRow, BinaryRow> extractPartition,
+ Function<BinaryRow, Integer> assignBucket) {
+ ExistingProcessor.bulkLoadCollectFirst(
+ collector,
+ iteratorFunction.apply(SortOrder.ASCENDING),
+ extractPrimary,
+ extractPartition,
+ assignBucket);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/UseOldExistingProcessor.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/UseOldExistingProcessor.java
new file mode 100644
index 000000000..38f493f61
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/UseOldExistingProcessor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.crosspartition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.ProjectToRowFunction;
+import org.apache.paimon.utils.RowIterator;
+
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** An {@link ExistingProcessor} that keeps the old partition and bucket. */
+public class UseOldExistingProcessor implements ExistingProcessor {
+
+ private final ProjectToRowFunction setPartition;
+ private final BiConsumer<InternalRow, Integer> collector;
+
+ public UseOldExistingProcessor(
+ ProjectToRowFunction setPartition, BiConsumer<InternalRow, Integer> collector) {
+ this.setPartition = setPartition;
+ this.collector = collector;
+ }
+
+ @Override
+ public boolean processExists(InternalRow newRow, BinaryRow previousPart, int previousBucket) {
+ InternalRow newValue = setPartition.apply(newRow, previousPart);
+ collector.accept(newValue, previousBucket);
+ return false;
+ }
+
+ @Override
+ public void bulkLoadNewRecords(
+ Function<SortOrder, RowIterator> iteratorFunction,
+ Function<InternalRow, BinaryRow> extractPrimary,
+ Function<InternalRow, BinaryRow> extractPartition,
+ Function<BinaryRow, Integer> assignBucket) {
+ RowIterator iterator = iteratorFunction.apply(SortOrder.ASCENDING);
+ InternalRow row;
+ BinaryRow currentKey = null;
+ BinaryRow currentPartition = null;
+ int currentBucket = -1;
+ while ((row = iterator.next()) != null) {
+ BinaryRow primaryKey = extractPrimary.apply(row);
+ BinaryRow partition = extractPartition.apply(row);
+ if (currentKey == null || !currentKey.equals(primaryKey)) {
+ int bucket = assignBucket.apply(partition);
+ collector.accept(row, bucket);
+ currentKey = primaryKey.copy();
+ currentPartition = partition.copy();
+ currentBucket = bucket;
+ } else {
+ InternalRow newRow = setPartition.apply(row, currentPartition);
+ collector.accept(newRow, currentBucket);
+ }
+ }
+ }
+}
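So for partial-update and aggregate, the first row of each key fixes the partition and bucket, and every later row of that key is rewritten into that partition so the merge happens in one place. A standalone sketch of that loop (Java 16+ records, illustrative names):

import java.util.Arrays;
import java.util.List;

public class UseOldSketch {
    record Row(int key, int partition, int value) {}

    public static void main(String[] args) {
        // Key-sorted bootstrap input: key 1 appears in partitions 1 and 2.
        List<Row> sorted = Arrays.asList(
                new Row(1, 1, 10), new Row(1, 2, 11), new Row(2, 3, 20));
        Integer currentKey = null;
        int currentPartition = -1;
        for (Row r : sorted) {
            if (currentKey == null || currentKey != r.key()) {
                currentKey = r.key();
                currentPartition = r.partition(); // first row of the key wins
                System.out.println("emit " + r);
            } else {
                // later rows keep their values but move to the old partition
                System.out.println("emit Row[key=" + r.key() + ", partition="
                        + currentPartition + ", value=" + r.value() + "]");
            }
        }
    }
}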
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 3b8d6c222..4f4a1b068 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -189,12 +189,22 @@ public class SchemaValidation {
}
}
- if (schema.crossPartitionUpdate() && options.bucket() != -1) {
- throw new IllegalArgumentException(
- String.format(
- "You should use dynamic bucket (bucket = -1) mode
in cross partition update case "
- + "(Primary key constraint %s not include
all partition fields %s).",
- schema.primaryKeys(), schema.partitionKeys()));
+ if (schema.crossPartitionUpdate()) {
+ if (options.bucket() != -1) {
+ throw new IllegalArgumentException(
+ String.format(
+ "You should use dynamic bucket (bucket = -1)
mode in cross partition update case "
+ + "(Primary key constraint %s not
include all partition fields %s).",
+ schema.primaryKeys(), schema.partitionKeys()));
+ }
+
+ if (sequenceField.isPresent()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "You can not use sequence.field in cross
partition update case "
+ + "(Primary key constraint %s not
include all partition fields %s).",
+ schema.primaryKeys(), schema.partitionKeys()));
+ }
}
}
@@ -361,9 +371,7 @@ public class SchemaValidation {
.collect(Collectors.toSet());
if (!illegalGroup.isEmpty()) {
throw new IllegalArgumentException(
- String.format(
- "Should not defined aggregation function on
sequence group: "
- + illegalGroup));
+ "Should not defined aggregation function on sequence
group: " + illegalGroup);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/RowIterator.java b/paimon-core/src/main/java/org/apache/paimon/utils/RowIterator.java
new file mode 100644
index 000000000..e1106cf3c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/RowIterator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.InternalRow;
+
+import javax.annotation.Nullable;
+
+/** A simple iterator that returns the next row, or null when exhausted. */
+public interface RowIterator {
+
+ @Nullable
+ InternalRow next();
+}
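RowIterator collapses the usual hasNext()/next() pair into a single nullable next(), which suits pipelines that reuse row objects. A generic analog showing the consumption pattern (assumed, not Paimon API):

import java.util.Iterator;
import java.util.List;

public class NullTerminatedSketch {
    // Minimal analog of RowIterator: next() returns null when exhausted.
    interface SimpleIterator<T> {
        T next();
    }

    static <T> SimpleIterator<T> from(Iterator<T> it) {
        return () -> it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        SimpleIterator<String> rows = from(List.of("a", "b").iterator());
        String row;
        while ((row = rows.next()) != null) { // loop ends on the null sentinel
            System.out.println(row);
        }
    }
}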
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
index 6171fdbb9..e0fc6b6d9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
@@ -30,6 +30,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
+import static org.apache.paimon.schema.SchemaValidation.validateTableSchema;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -37,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class TableSchemaTest {
@Test
- public void testInvalidPrimaryKeys() {
+ public void testCrossPartition() {
List<DataField> fields =
Arrays.asList(
new DataField(0, "f0", DataTypes.INT()),
@@ -50,6 +53,16 @@ public class TableSchemaTest {
TableSchema schema =
new TableSchema(1, fields, 10, partitionKeys, primaryKeys, options, "");
assertThat(schema.crossPartitionUpdate()).isTrue();
+
+ assertThatThrownBy(() -> validateTableSchema(schema))
+ .hasMessageContaining("You should use dynamic bucket");
+
+ options.put(BUCKET.key(), "-1");
+ validateTableSchema(schema);
+
+ options.put(SEQUENCE_FIELD.key(), "f2");
+ assertThatThrownBy(() -> validateTableSchema(schema))
+ .hasMessageContaining("You can not use sequence.field");
}
@Test
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
index 7d5d70b48..fc6172b7a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@@ -31,7 +31,7 @@ public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
@Override
protected List<String> ddl() {
- return Collections.singletonList(
+ return Arrays.asList(
"CREATE TABLE IF NOT EXISTS T ("
+ "pt INT, "
+ "pk INT, "
@@ -40,9 +40,53 @@ public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
+ ") PARTITIONED BY (pt) WITH ("
+ " 'bucket'='-1', "
+ " 'dynamic-bucket.target-row-num'='3' "
+ + ")",
+ "CREATE TABLE IF NOT EXISTS partial_update_t ("
+ + "pt INT, "
+ + "pk INT, "
+ + "v1 INT, "
+ + "v2 INT, "
+ + "PRIMARY KEY (pk) NOT ENFORCED"
+ + ") PARTITIONED BY (pt) WITH ("
+ + " 'merge-engine'='partial-update', "
+ + " 'bucket'='-1', "
+ + " 'dynamic-bucket.target-row-num'='3' "
+ + ")",
+ "CREATE TABLE IF NOT EXISTS first_row_t ("
+ + "pt INT, "
+ + "pk INT, "
+ + "v INT, "
+ + "PRIMARY KEY (pk) NOT ENFORCED"
+ + ") PARTITIONED BY (pt) WITH ("
+ + " 'merge-engine'='first-row', "
+ + " 'changelog-producer'='lookup', "
+ + " 'bucket'='-1', "
+ + " 'dynamic-bucket.target-row-num'='3' "
+ ")");
}
+ @Test
+ public void testBulkLoad() {
+ sql("INSERT INTO T VALUES (1, 1, 1), (2, 1, 2), (1, 3, 3), (2, 4, 4),
(3, 3, 5)");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(Row.of(2, 1, 2), Row.of(3, 3, 5), Row.of(2, 4, 4));
+
+ sql(
+ "INSERT INTO partial_update_t VALUES"
+ + " (1, 1, 1, 1),"
+ + " (2, 1, 2, 3),"
+ + " (1, 3, 3, 3),"
+ + " (2, 4, 4, 4),"
+ + " (3, 3, CAST(NULL AS INT), 5)");
+ assertThat(sql("SELECT * FROM partial_update_t"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 1, 2, 3), Row.of(2, 4, 4, 4), Row.of(1, 3, 3, 5));
+
+ sql("INSERT INTO first_row_t VALUES (1, 1, 1), (2, 1, 2), (1, 3, 3),
(2, 4, 4), (3, 3, 5)");
+ assertThat(sql("SELECT * FROM first_row_t"))
+ .containsExactlyInAnyOrder(Row.of(1, 1, 1), Row.of(1, 3, 3), Row.of(2, 4, 4));
+ }
+
@Test
public void testWriteRead() {
sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4),
(1, 5, 5)");