This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b0b89be6f [flink] Add local merge operator for Flink (#1845)
b0b89be6f is described below

commit b0b89be6f18ab4481d2d8302851f63e1321b562a
Author: tsreaper <[email protected]>
AuthorDate: Mon Aug 21 10:25:28 2023 +0800

    [flink] Add local merge operator for Flink (#1845)
---
 docs/content/maintenance/write-performance.md      |  12 ++
 .../shortcodes/generated/core_configuration.html   |   7 +
 .../generated/flink_connector_configuration.html   |  24 +--
 .../main/java/org/apache/paimon/CoreOptions.java   |  19 +++
 .../java/org/apache/paimon/schema/TableSchema.java |   4 +
 .../table/ChangelogWithKeyFileStoreTable.java      | 103 +-----------
 .../paimon/table/ChangelogWithKeyTableUtils.java   | 103 ++++++++++++
 .../paimon/table/sink/SequenceGenerator.java       |  33 +++-
 .../paimon/table/sink/SequenceGeneratorTest.java   |  27 +--
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  26 ++-
 .../paimon/flink/sink/LocalMergeOperator.java      | 186 +++++++++++++++++++++
 .../ChangelogWithKeyFileStoreTableITCase.java      |  62 ++++---
 12 files changed, 450 insertions(+), 156 deletions(-)

diff --git a/docs/content/maintenance/write-performance.md 
b/docs/content/maintenance/write-performance.md
index 27d8c7bbd..f25eae4ef 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -163,6 +163,18 @@ One can easily see that too many sorted runs will result 
in poor query performan
 
 Compaction will become less frequent when `num-sorted-run.compaction-trigger` 
becomes larger, thus improving writing performance. However, if this value 
becomes too large, more memory and CPU time will be needed when querying the 
table. This is a trade-off between writing and query performance.
 
+## Local Merging
+
+If your job suffers from primary key data skew
+(for example, you want to count the number of views for each pages in a 
website,
+and some particular pages are very popular among the users),
+you can set `'local-merge-buffer-size'` so that input records will be buffered 
and merged
+before they're shuffled by bucket and written into sink.
+This is particularly useful when the same primary key is updated frequently 
between snapshots.
+
+The buffer will be flushed when it is full. We recommend starting with `64 mb`
+when you are faced with data skew but don't know where to start adjusting 
buffer size.
+
 ## File Format
 
 If you want to achieve ultimate compaction performance, you can consider using 
row storage file format AVRO.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index a54885ef3..a1cd016a9 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -176,6 +176,13 @@ under the License.
             <td>String</td>
             <td>Read incremental changes between start timestamp (exclusive) 
and end timestamp, for example, 't1,t2' means changes between timestamp t1 and 
timestamp t2.</td>
         </tr>
+        <tr>
+            <td><h5>local-merge-buffer-size</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>Local merge will buffer and merge input records before they're 
shuffled by bucket and written into sink. The buffer will be flushed when it is 
full.
+Mainly to resolve data skew on primary keys. We recommend starting with 64 mb 
when trying out this feature.</td>
+        </tr>
         <tr>
             <td><h5>local-sort.max-num-file-handles</h5></td>
             <td style="word-wrap: break-word;">128</td>
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 7c887dab6..db571cf2f 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -38,18 +38,6 @@ under the License.
             <td>String</td>
             <td>The log system used to keep changes of the table.<br /><br 
/>Possible values:<br /><ul><li>"none": No log system, the data is written only 
to file store, and the streaming read will be directly read from the file 
store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written 
to file store and kafka, and the streaming read will be read from kafka. If 
streaming read from file, configures streaming-read-mode to file.</li></ul></td>
         </tr>
-        <tr>
-            <td><h5>lookup.async</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to enable async lookup join.</td>
-        </tr>
-        <tr>
-            <td><h5>lookup.async-thread-number</h5></td>
-            <td style="word-wrap: break-word;">16</td>
-            <td>Integer</td>
-            <td>The thread number for lookup async.</td>
-        </tr>
         <tr>
             <td><h5>log.system.partitions</h5></td>
             <td style="word-wrap: break-word;">1</td>
@@ -62,6 +50,18 @@ under the License.
             <td>Integer</td>
             <td>The number of replication of the log system. If log system is 
kafka, this is kafka replicationFactor.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.async</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable async lookup join.</td>
+        </tr>
+        <tr>
+            <td><h5>lookup.async-thread-number</h5></td>
+            <td style="word-wrap: break-word;">16</td>
+            <td>Integer</td>
+            <td>The thread number for lookup async.</td>
+        </tr>
         <tr>
             <td><h5>scan.infer-parallelism</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 9c516273a..2e5042c06 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -856,6 +856,17 @@ public class CoreOptions implements Serializable {
                                     + " the value should be the user 
configured local time zone. The option value is either a full name"
                                     + " such as 'America/Los_Angeles', or a 
custom timezone id such as 'GMT-08:00'.");
 
+    public static final ConfigOption<MemorySize> LOCAL_MERGE_BUFFER_SIZE =
+            key("local-merge-buffer-size")
+                    .memoryType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Local merge will buffer and merge input records "
+                                    + "before they're shuffled by bucket and 
written into sink. "
+                                    + "The buffer will be flushed when it is 
full.\n"
+                                    + "Mainly to resolve data skew on primary 
keys. "
+                                    + "We recommend starting with 64 mb when 
trying out this feature.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -1262,6 +1273,14 @@ public class CoreOptions implements Serializable {
         return options.getInteger(ORC_WRITE_BATCH_SIZE.key(), 
ORC_WRITE_BATCH_SIZE.defaultValue());
     }
 
+    public boolean localMergeEnabled() {
+        return options.get(LOCAL_MERGE_BUFFER_SIZE) != null;
+    }
+
+    public long localMergeBufferSize() {
+        return options.get(LOCAL_MERGE_BUFFER_SIZE).getBytes();
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index feec6eb50..fe2a7f3ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -229,6 +229,10 @@ public class TableSchema implements Serializable {
         return projectedLogicalRowType(trimmedPrimaryKeys());
     }
 
+    public RowType logicalPrimaryKeysType() {
+        return projectedLogicalRowType(primaryKeys());
+    }
+
     public List<DataField> trimmedPrimaryKeysFields() {
         return projectedDataFields(trimmedPrimaryKeys());
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index a9b7fad2e..3639fc64e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
-import org.apache.paimon.CoreOptions.SequenceAutoPadding;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.WriteMode;
@@ -28,12 +27,8 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestCacheFilter;
-import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.paimon.mergetree.compact.FirstRowMergeFunction;
 import org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
-import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
-import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.KeyValueFileStoreScan;
 import org.apache.paimon.operation.Lock;
@@ -49,17 +44,14 @@ import org.apache.paimon.table.source.KeyValueTableRead;
 import org.apache.paimon.table.source.MergeTreeSplitGenerator;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 
 import java.util.List;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.predicate.PredicateBuilder.and;
 import static 
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
 
 /** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode with 
primary keys. */
 public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
@@ -91,36 +83,11 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
             RowType rowType = tableSchema.logicalRowType();
             Options conf = Options.fromMap(tableSchema.options());
             CoreOptions options = new CoreOptions(conf);
-            CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
-            KeyValueFieldsExtractor extractor = 
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
-
-            MergeFunctionFactory<KeyValue> mfFactory;
-
-            switch (mergeEngine) {
-                case DEDUPLICATE:
-                    mfFactory = DeduplicateMergeFunction.factory();
-                    break;
-                case PARTIAL_UPDATE:
-                    mfFactory = PartialUpdateMergeFunction.factory(conf, 
rowType);
-                    break;
-                case AGGREGATE:
-                    mfFactory =
-                            AggregateMergeFunction.factory(
-                                    conf,
-                                    tableSchema.fieldNames(),
-                                    rowType.getFieldTypes(),
-                                    tableSchema.primaryKeys());
-                    break;
-                case FIRST_ROW:
-                    mfFactory =
-                            FirstRowMergeFunction.factory(
-                                    new 
RowType(extractor.keyFields(tableSchema)), rowType);
-                    break;
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unsupported merge engine: " + mergeEngine);
-            }
+            KeyValueFieldsExtractor extractor =
+                    
ChangelogWithKeyTableUtils.ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
 
+            MergeFunctionFactory<KeyValue> mfFactory =
+                    
ChangelogWithKeyTableUtils.createMergeFunctionFactory(tableSchema);
             if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
                 mfFactory =
                         LookupMergeFunction.wrap(
@@ -135,7 +102,8 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                             tableSchema.crossPartitionUpdate(),
                             options,
                             tableSchema.logicalPartitionType(),
-                            
addKeyNamePrefix(tableSchema.logicalBucketKeyType()),
+                            ChangelogWithKeyTableUtils.addKeyNamePrefix(
+                                    tableSchema.logicalBucketKeyType()),
                             new RowType(extractor.keyFields(tableSchema)),
                             rowType,
                             extractor,
@@ -144,32 +112,6 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
         return lazyStore;
     }
 
-    private static RowType addKeyNamePrefix(RowType type) {
-        // add prefix to avoid conflict with value
-        return new RowType(
-                type.getFields().stream()
-                        .map(
-                                f ->
-                                        new DataField(
-                                                f.id(),
-                                                KEY_FIELD_PREFIX + f.name(),
-                                                f.type(),
-                                                f.description()))
-                        .collect(Collectors.toList()));
-    }
-
-    private static List<DataField> addKeyNamePrefix(List<DataField> keyFields) 
{
-        return keyFields.stream()
-                .map(
-                        f ->
-                                new DataField(
-                                        f.id(),
-                                        KEY_FIELD_PREFIX + f.name(),
-                                        f.type(),
-                                        f.description()))
-                .collect(Collectors.toList());
-    }
-
     @Override
     public SplitGenerator splitGenerator() {
         return new MergeTreeSplitGenerator(
@@ -245,14 +187,7 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
     public TableWriteImpl<KeyValue> newWrite(
             String commitUser, ManifestCacheFilter manifestFilter) {
         final SequenceGenerator sequenceGenerator =
-                store().options()
-                        .sequenceField()
-                        .map(field -> new SequenceGenerator(field, 
schema().logicalRowType()))
-                        .orElse(null);
-        final List<SequenceAutoPadding> sequenceAutoPadding =
-                store().options().sequenceAutoPadding().stream()
-                        .map(SequenceAutoPadding::fromString)
-                        .collect(Collectors.toList());
+                SequenceGenerator.create(schema(), store().options());
         final KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
                 store().newWrite(commitUser, manifestFilter),
@@ -261,10 +196,7 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                     long sequenceNumber =
                             sequenceGenerator == null
                                     ? KeyValue.UNKNOWN_SEQUENCE
-                                    : sequenceAutoPadding.isEmpty()
-                                            ? 
sequenceGenerator.generate(record.row())
-                                            : 
sequenceGenerator.generateWithPadding(
-                                                    record.row(), 
sequenceAutoPadding);
+                                    : sequenceGenerator.generate(record.row());
                     return kv.replace(
                             record.primaryKey(),
                             sequenceNumber,
@@ -272,23 +204,4 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                             record.row());
                 });
     }
-
-    static class ChangelogWithKeyKeyValueFieldsExtractor implements 
KeyValueFieldsExtractor {
-        private static final long serialVersionUID = 1L;
-
-        static final ChangelogWithKeyKeyValueFieldsExtractor EXTRACTOR =
-                new ChangelogWithKeyKeyValueFieldsExtractor();
-
-        private ChangelogWithKeyKeyValueFieldsExtractor() {}
-
-        @Override
-        public List<DataField> keyFields(TableSchema schema) {
-            return addKeyNamePrefix(schema.trimmedPrimaryKeysFields());
-        }
-
-        @Override
-        public List<DataField> valueFields(TableSchema schema) {
-            return schema.fields();
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
new file mode 100644
index 000000000..36770365f
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.mergetree.compact.FirstRowMergeFunction;
+import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
+import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
+import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.KeyValueFieldsExtractor;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
+
+/** Utils for creating changelog table with primary keys. */
+public class ChangelogWithKeyTableUtils {
+
+    public static RowType addKeyNamePrefix(RowType type) {
+        return new RowType(addKeyNamePrefix(type.getFields()));
+    }
+
+    public static List<DataField> addKeyNamePrefix(List<DataField> keyFields) {
+        return keyFields.stream()
+                .map(
+                        f ->
+                                new DataField(
+                                        f.id(),
+                                        KEY_FIELD_PREFIX + f.name(),
+                                        f.type(),
+                                        f.description()))
+                .collect(Collectors.toList());
+    }
+
+    public static MergeFunctionFactory<KeyValue> createMergeFunctionFactory(
+            TableSchema tableSchema) {
+        RowType rowType = tableSchema.logicalRowType();
+        Options conf = Options.fromMap(tableSchema.options());
+        CoreOptions options = new CoreOptions(conf);
+        CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
+        KeyValueFieldsExtractor extractor = 
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
+
+        switch (mergeEngine) {
+            case DEDUPLICATE:
+                return DeduplicateMergeFunction.factory();
+            case PARTIAL_UPDATE:
+                return PartialUpdateMergeFunction.factory(conf, rowType);
+            case AGGREGATE:
+                return AggregateMergeFunction.factory(
+                        conf,
+                        tableSchema.fieldNames(),
+                        rowType.getFieldTypes(),
+                        tableSchema.primaryKeys());
+            case FIRST_ROW:
+                return FirstRowMergeFunction.factory(
+                        new RowType(extractor.keyFields(tableSchema)), 
rowType);
+            default:
+                throw new UnsupportedOperationException("Unsupported merge 
engine: " + mergeEngine);
+        }
+    }
+
+    static class ChangelogWithKeyKeyValueFieldsExtractor implements 
KeyValueFieldsExtractor {
+        private static final long serialVersionUID = 1L;
+
+        static final ChangelogWithKeyKeyValueFieldsExtractor EXTRACTOR =
+                new ChangelogWithKeyKeyValueFieldsExtractor();
+
+        private ChangelogWithKeyKeyValueFieldsExtractor() {}
+
+        @Override
+        public List<DataField> keyFields(TableSchema schema) {
+            return addKeyNamePrefix(schema.trimmedPrimaryKeysFields());
+        }
+
+        @Override
+        public List<DataField> valueFields(TableSchema schema) {
+            return schema.fields();
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
index f39f62c05..293c9df54 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
@@ -18,8 +18,10 @@
 
 package org.apache.paimon.table.sink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.SequenceAutoPadding;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.CharType;
 import org.apache.paimon.types.DataType;
@@ -41,18 +43,28 @@ import org.apache.paimon.utils.InternalRowUtils;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /** Generate sequence number. */
 public class SequenceGenerator {
+
     private final int index;
+    private final List<SequenceAutoPadding> paddings;
 
     private final Generator generator;
     private final DataType fieldType;
 
     public SequenceGenerator(String field, RowType rowType) {
+        this(field, rowType, Collections.emptyList());
+    }
+
+    public SequenceGenerator(String field, RowType rowType, 
List<SequenceAutoPadding> paddings) {
         index = rowType.getFieldNames().indexOf(field);
+        this.paddings = paddings;
+
         if (index == -1) {
             throw new RuntimeException(
                     String.format(
@@ -64,6 +76,8 @@ public class SequenceGenerator {
 
     public SequenceGenerator(int index, DataType dataType) {
         this.index = index;
+        this.paddings = Collections.emptyList();
+
         this.fieldType = dataType;
         if (index == -1) {
             throw new RuntimeException(String.format("Index : %s is invalid", 
index));
@@ -71,6 +85,19 @@ public class SequenceGenerator {
         generator = fieldType.accept(new SequenceGeneratorVisitor());
     }
 
+    public static SequenceGenerator create(TableSchema schema, CoreOptions 
options) {
+        List<SequenceAutoPadding> sequenceAutoPadding =
+                options.sequenceAutoPadding().stream()
+                        .map(SequenceAutoPadding::fromString)
+                        .collect(Collectors.toList());
+        return options.sequenceField()
+                .map(
+                        field ->
+                                new SequenceGenerator(
+                                        field, schema.logicalRowType(), 
sequenceAutoPadding))
+                .orElse(null);
+    }
+
     public int index() {
         return index;
     }
@@ -85,11 +112,7 @@ public class SequenceGenerator {
     }
 
     public long generate(InternalRow row) {
-        return generator.generate(row, index);
-    }
-
-    public long generateWithPadding(InternalRow row, List<SequenceAutoPadding> 
paddings) {
-        long sequence = generate(row);
+        long sequence = generator.generate(row, index);
         for (SequenceAutoPadding padding : paddings) {
             switch (padding) {
                 case ROW_KIND_FLAG:
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
index d656d1e18..b8a5795dd 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.sink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericArray;
@@ -35,6 +36,7 @@ import org.junit.jupiter.api.Test;
 import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.paimon.CoreOptions.SequenceAutoPadding.MILLIS_TO_MICRO;
@@ -237,7 +239,12 @@ public class SequenceGeneratorTest {
     }
 
     private SequenceGenerator getGenerator(String field) {
-        return new SequenceGenerator(field, ALL_DATA_TYPE);
+        return getGenerator(field, Collections.emptyList());
+    }
+
+    private SequenceGenerator getGenerator(
+            String field, List<CoreOptions.SequenceAutoPadding> paddings) {
+        return new SequenceGenerator(field, ALL_DATA_TYPE, paddings);
     }
 
     private void assertUnsupportedDatatype(String field) {
@@ -246,8 +253,7 @@ public class SequenceGeneratorTest {
     }
 
     private long generateWithPaddingOnSecond(String field) {
-        return getGenerator(field)
-                .generateWithPadding(row, 
Collections.singletonList(SECOND_TO_MICRO));
+        return getGenerator(field, 
Collections.singletonList(SECOND_TO_MICRO)).generate(row);
     }
 
     private long getSecondFromGeneratedWithPadding(long generated) {
@@ -255,22 +261,17 @@ public class SequenceGeneratorTest {
     }
 
     private long generateWithPaddingOnMillis(String field) {
-        return getGenerator(field)
-                .generateWithPadding(row, 
Collections.singletonList(MILLIS_TO_MICRO));
+        return getGenerator(field, 
Collections.singletonList(MILLIS_TO_MICRO)).generate(row);
     }
 
     private long generateWithPaddingOnRowKind(long sequence, RowKind rowKind) {
-        return getGenerator("_bigint")
-                .generateWithPadding(
-                        GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, sequence),
-                        Collections.singletonList(ROW_KIND_FLAG));
+        return getGenerator("_bigint", 
Collections.singletonList(ROW_KIND_FLAG))
+                .generate(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, 
sequence));
     }
 
     private long generateWithPaddingOnMicrosAndRowKind(long sequence, RowKind 
rowKind) {
-        return getGenerator("_bigint")
-                .generateWithPadding(
-                        GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, sequence),
-                        Arrays.asList(MILLIS_TO_MICRO, ROW_KIND_FLAG));
+        return getGenerator("_bigint", Arrays.asList(MILLIS_TO_MICRO, 
ROW_KIND_FLAG))
+                .generate(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, 
sequence));
     }
 
     private long getMillisFromGeneratedWithPadding(long generated) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 2c23dfdf9..4102d5ca1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -69,29 +69,41 @@ public class FlinkSinkBuilder {
     }
 
     public DataStreamSink<?> build() {
+        DataStream<RowData> input = this.input;
+        if (table.coreOptions().localMergeEnabled() && 
table.schema().primaryKeys().size() > 0) {
+            input =
+                    input.forward()
+                            .transform(
+                                    "local merge",
+                                    input.getType(),
+                                    new LocalMergeOperator(table.schema()))
+                            .setParallelism(input.getParallelism());
+        }
+
         BucketMode bucketMode = table.bucketMode();
         switch (bucketMode) {
             case FIXED:
-                return buildForFixedBucket();
+                return buildForFixedBucket(input);
             case DYNAMIC:
-                return buildDynamicBucketSink(false);
+                return buildDynamicBucketSink(input, false);
             case GLOBAL_DYNAMIC:
-                return buildDynamicBucketSink(true);
+                return buildDynamicBucketSink(input, true);
             case UNAWARE:
-                return buildUnawareBucketSink();
+                return buildUnawareBucketSink(input);
             default:
                 throw new UnsupportedOperationException("Unsupported bucket 
mode: " + bucketMode);
         }
     }
 
-    private DataStreamSink<?> buildDynamicBucketSink(boolean globalIndex) {
+    private DataStreamSink<?> buildDynamicBucketSink(
+            DataStream<RowData> input, boolean globalIndex) {
         checkArgument(logSinkFunction == null, "Dynamic bucket mode can not 
work with log system.");
         return globalIndex
                 ? new GlobalDynamicBucketSink(table, 
overwritePartition).build(input, parallelism)
                 : new RowDynamicBucketSink(table, 
overwritePartition).build(input, parallelism);
     }
 
-    private DataStreamSink<?> buildForFixedBucket() {
+    private DataStreamSink<?> buildForFixedBucket(DataStream<RowData> input) {
         DataStream<RowData> partitioned =
                 partition(
                         input,
@@ -101,7 +113,7 @@ public class FlinkSinkBuilder {
         return sink.sinkFrom(partitioned);
     }
 
-    private DataStreamSink<?> buildUnawareBucketSink() {
+    private DataStreamSink<?> buildUnawareBucketSink(DataStream<RowData> 
input) {
         checkArgument(
                 table instanceof AppendOnlyFileStoreTable,
                 "Unaware bucket mode only works with append-only table for 
now.");
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
new file mode 100644
index 000000000..52581a1b5
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.mergetree.SortBufferWriteBuffer;
+import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.ChangelogWithKeyTableUtils;
+import org.apache.paimon.table.sink.SequenceGenerator;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyComparatorSupplier;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link AbstractStreamOperator} which buffer input record and apply merge 
function when the buffer
+ * is full. Mainly to resolve data skew on primary keys.
+ */
+public class LocalMergeOperator extends AbstractStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
+
+    private static final long serialVersionUID = 1L;
+
+    TableSchema schema;
+
+    private transient Projection keyProjection;
+    private transient RecordComparator keyComparator;
+
+    private transient long recordCount;
+    private transient SequenceGenerator sequenceGenerator;
+    private transient MergeFunction<KeyValue> mergeFunction;
+
+    private transient SortBufferWriteBuffer buffer;
+    private transient long currentWatermark;
+
+    private transient FlinkRowData reusedRowData;
+    private transient boolean endOfInput;
+
+    public LocalMergeOperator(TableSchema schema) {
+        Preconditions.checkArgument(
+                schema.primaryKeys().size() > 0,
+                "LocalMergeOperator currently only support tables with primary 
keys");
+        this.schema = schema;
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        RowType keyType =
+                
ChangelogWithKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
+        RowType valueType = schema.logicalRowType();
+        CoreOptions options = new CoreOptions(schema.options());
+
+        keyProjection =
+                CodeGenUtils.newProjection(
+                        schema.logicalRowType(), 
schema.projection(schema.primaryKeys()));
+        keyComparator = new KeyComparatorSupplier(keyType).get();
+
+        recordCount = 0;
+        sequenceGenerator = SequenceGenerator.create(schema, options);
+        mergeFunction = 
ChangelogWithKeyTableUtils.createMergeFunctionFactory(schema).create();
+
+        buffer =
+                new SortBufferWriteBuffer(
+                        keyType,
+                        valueType,
+                        new HeapMemorySegmentPool(
+                                options.localMergeBufferSize(), 
options.pageSize()),
+                        false,
+                        options.localSortMaxNumFileHandles(),
+                        null);
+        currentWatermark = Long.MIN_VALUE;
+
+        reusedRowData = new FlinkRowData(null);
+        endOfInput = false;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> record) throws Exception {
+        recordCount++;
+        InternalRow row = new FlinkRowWrapper(record.getValue());
+
+        RowKind rowKind = row.getRowKind();
+        // row kind must be INSERT when it is divided into key and value
+        row.setRowKind(RowKind.INSERT);
+
+        InternalRow key = keyProjection.apply(row);
+        long sequenceNumber =
+                sequenceGenerator == null ? recordCount : 
sequenceGenerator.generate(row);
+        if (!buffer.put(sequenceNumber, rowKind, key, row)) {
+            flushBuffer();
+            if (!buffer.put(sequenceNumber, rowKind, key, row)) {
+                // change row kind back
+                row.setRowKind(rowKind);
+                output.collect(record);
+            }
+        }
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        // don't emit watermark immediately, emit them after flushing buffer
+        currentWatermark = mark.getTimestamp();
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        if (!endOfInput) {
+            flushBuffer();
+        }
+        // no records are expected to emit after endOfInput
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        endOfInput = true;
+        flushBuffer();
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (buffer != null) {
+            buffer.clear();
+        }
+
+        super.close();
+    }
+
+    private void flushBuffer() throws Exception {
+        if (buffer.size() == 0) {
+            return;
+        }
+
+        buffer.forEach(
+                keyComparator,
+                mergeFunction,
+                null,
+                kv -> {
+                    InternalRow row = kv.value();
+                    row.setRowKind(kv.valueKind());
+                    output.collect(new 
StreamRecord<>(reusedRowData.replace(row)));
+                });
+        buffer.clear();
+
+        if (currentWatermark != Long.MIN_VALUE) {
+            super.processWatermark(new Watermark(currentWatermark));
+            // each watermark should only be emitted once
+            currentWatermark = Long.MIN_VALUE;
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
index 924a71e7c..c2898856a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.action.CompactAction;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.utils.FailingFileIO;
 
 import org.apache.flink.api.common.JobStatus;
@@ -49,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -60,10 +63,17 @@ public class ChangelogWithKeyFileStoreTableITCase extends 
AbstractTestBase {
     //  Test Utilities
     // ------------------------------------------------------------------------
     private String path;
+    private Map<String, String> tableDefaultProperties;
 
     @BeforeEach
     public void before() throws IOException {
         path = getTempDirPath();
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        tableDefaultProperties = new HashMap<>();
+        if (random.nextBoolean()) {
+            
tableDefaultProperties.put(CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key(), "256 kb");
+        }
     }
 
     private TableEnvironment createBatchTableEnvironment() {
@@ -88,6 +98,25 @@ public class ChangelogWithKeyFileStoreTableITCase extends 
AbstractTestBase {
         return env;
     }
 
+    private String createCatalogSql(String catalogName, String warehouse) {
+        String defaultPropertyString = "";
+        if (tableDefaultProperties.size() > 0) {
+            defaultPropertyString = ", ";
+            defaultPropertyString +=
+                    tableDefaultProperties.entrySet().stream()
+                            .map(
+                                    e ->
+                                            String.format(
+                                                    "'table-default.%s' = 
'%s'",
+                                                    e.getKey(), e.getValue()))
+                            .collect(Collectors.joining(", "));
+        }
+
+        return String.format(
+                "CREATE CATALOG `%s` WITH ( 'type' = 'paimon', 'warehouse' = 
'%s' %s )",
+                catalogName, warehouse, defaultPropertyString);
+    }
+
     // ------------------------------------------------------------------------
     //  Constructed Tests
     // ------------------------------------------------------------------------
@@ -109,10 +138,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends 
AbstractTestBase {
         bEnv.getConfig()
                 .getConfiguration()
                 
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
-        bEnv.executeSql(
-                String.format(
-                        "CREATE CATALOG testCatalog WITH ('type'='paimon', 
'warehouse'='%s')",
-                        path));
+        bEnv.executeSql(createCatalogSql("testCatalog", path));
         bEnv.executeSql("USE CATALOG testCatalog");
         bEnv.executeSql(
                 "CREATE TABLE T ("
@@ -130,10 +156,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends 
AbstractTestBase {
         sEnv.getConfig()
                 .getConfiguration()
                 
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
-        sEnv.executeSql(
-                String.format(
-                        "CREATE CATALOG testCatalog WITH ('type'='paimon', 
'warehouse'='%s')",
-                        path));
+        sEnv.executeSql(createCatalogSql("testCatalog", path));
         sEnv.executeSql("USE CATALOG testCatalog");
         CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM 
T").collect();
 
@@ -177,10 +200,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends 
AbstractTestBase {
                 .getConfiguration()
                 
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
-        sEnv.executeSql(
-                String.format(
-                        "CREATE CATALOG testCatalog WITH ('type'='paimon', 
'warehouse'='%s/warehouse')",
-                        path));
+        sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
         sEnv.executeSql("USE CATALOG testCatalog");
         sEnv.executeSql(
                 "CREATE TABLE T ( k INT, v STRING, PRIMARY KEY (k) NOT 
ENFORCED ) "
@@ -190,6 +210,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends 
AbstractTestBase {
                         + ")");
 
         Path inputPath = new Path(path, "input");
+        LocalFileIO.create().mkdirs(inputPath);
         sEnv.executeSql(
                 "CREATE TABLE `default_catalog`.`default_database`.`S` ( i 
INT, g STRING ) "
                         + "WITH ( 'connector' = 'filesystem', 'format' = 
'testcsv', 'path' = '"
@@ -237,6 +258,8 @@ public class ChangelogWithKeyFileStoreTableITCase extends 
AbstractTestBase {
                         "-U[4, D]",
                         "+U[4, C]",
                         "+I[5, D]");
+
+        it.close();
     }
 
     // ------------------------------------------------------------------------
@@ -435,10 +458,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends 
AbstractTestBase {
         sEnv.getConfig()
                 .getConfiguration()
                 
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
-        sEnv.executeSql(
-                String.format(
-                        "CREATE CATALOG testCatalog WITH ('type'='paimon', 
'warehouse'='%s')",
-                        path));
+        sEnv.executeSql(createCatalogSql("testCatalog", path));
         sEnv.executeSql("USE CATALOG testCatalog");
 
         ResultChecker checker = new ResultChecker();
@@ -476,10 +496,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends 
AbstractTestBase {
 
         // no failure when creating catalog and table
         FailingFileIO.reset(failingName, 0, 1);
-        tEnv.executeSql(
-                String.format(
-                        "CREATE CATALOG testCatalog WITH ('type'='paimon', 
'warehouse'='%s')",
-                        failingPath));
+        tEnv.executeSql(createCatalogSql("testCatalog", failingPath));
         tEnv.executeSql("USE CATALOG testCatalog");
         tEnv.executeSql(
                 "CREATE TABLE T("
@@ -556,10 +573,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends 
AbstractTestBase {
 
     private void checkBatchResult(int numProducers) throws Exception {
         TableEnvironment bEnv = createBatchTableEnvironment();
-        bEnv.executeSql(
-                String.format(
-                        "CREATE CATALOG testCatalog WITH ('type'='paimon', 
'warehouse'='%s')",
-                        path));
+        bEnv.executeSql(createCatalogSql("testCatalog", path));
         bEnv.executeSql("USE CATALOG testCatalog");
 
         ResultChecker checker = new ResultChecker();


Reply via email to