This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b0b89be6f [flink] Add local merge operator for Flink (#1845)
b0b89be6f is described below
commit b0b89be6f18ab4481d2d8302851f63e1321b562a
Author: tsreaper <[email protected]>
AuthorDate: Mon Aug 21 10:25:28 2023 +0800
[flink] Add local merge operator for Flink (#1845)
---
docs/content/maintenance/write-performance.md | 12 ++
.../shortcodes/generated/core_configuration.html | 7 +
.../generated/flink_connector_configuration.html | 24 +--
.../main/java/org/apache/paimon/CoreOptions.java | 19 +++
.../java/org/apache/paimon/schema/TableSchema.java | 4 +
.../table/ChangelogWithKeyFileStoreTable.java | 103 +-----------
.../paimon/table/ChangelogWithKeyTableUtils.java | 103 ++++++++++++
.../paimon/table/sink/SequenceGenerator.java | 33 +++-
.../paimon/table/sink/SequenceGeneratorTest.java | 27 +--
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 26 ++-
.../paimon/flink/sink/LocalMergeOperator.java | 186 +++++++++++++++++++++
.../ChangelogWithKeyFileStoreTableITCase.java | 62 ++++---
12 files changed, 450 insertions(+), 156 deletions(-)
diff --git a/docs/content/maintenance/write-performance.md
b/docs/content/maintenance/write-performance.md
index 27d8c7bbd..f25eae4ef 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -163,6 +163,18 @@ One can easily see that too many sorted runs will result
in poor query performan
Compaction will become less frequent when `num-sorted-run.compaction-trigger`
becomes larger, thus improving writing performance. However, if this value
becomes too large, more memory and CPU time will be needed when querying the
table. This is a trade-off between writing and query performance.
+## Local Merging
+
+If your job suffers from primary key data skew
+(for example, you want to count the number of views for each page in a
website,
+and some particular pages are very popular among the users),
+you can set `'local-merge-buffer-size'` so that input records will be buffered
and merged
+before they're shuffled by bucket and written into sink.
+This is particularly useful when the same primary key is updated frequently
between snapshots.
+
+The buffer will be flushed when it is full. We recommend starting with `64 mb`
+when you are faced with data skew but don't know where to start adjusting
buffer size.
+
## File Format
If you want to achieve ultimate compaction performance, you can consider using
row storage file format AVRO.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index a54885ef3..a1cd016a9 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -176,6 +176,13 @@ under the License.
<td>String</td>
<td>Read incremental changes between start timestamp (exclusive)
and end timestamp, for example, 't1,t2' means changes between timestamp t1 and
timestamp t2.</td>
</tr>
+ <tr>
+ <td><h5>local-merge-buffer-size</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>MemorySize</td>
+ <td>Local merge will buffer and merge input records before they're
shuffled by bucket and written into sink. The buffer will be flushed when it is
full.
+Mainly to resolve data skew on primary keys. We recommend starting with 64 mb
when trying out this feature.</td>
+ </tr>
<tr>
<td><h5>local-sort.max-num-file-handles</h5></td>
<td style="word-wrap: break-word;">128</td>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 7c887dab6..db571cf2f 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -38,18 +38,6 @@ under the License.
<td>String</td>
<td>The log system used to keep changes of the table.<br /><br
/>Possible values:<br /><ul><li>"none": No log system, the data is written only
to file store, and the streaming read will be directly read from the file
store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written
to file store and kafka, and the streaming read will be read from kafka. If
streaming read from file, configures streaming-read-mode to file.</li></ul></td>
</tr>
- <tr>
- <td><h5>lookup.async</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether to enable async lookup join.</td>
- </tr>
- <tr>
- <td><h5>lookup.async-thread-number</h5></td>
- <td style="word-wrap: break-word;">16</td>
- <td>Integer</td>
- <td>The thread number for lookup async.</td>
- </tr>
<tr>
<td><h5>log.system.partitions</h5></td>
<td style="word-wrap: break-word;">1</td>
@@ -62,6 +50,18 @@ under the License.
<td>Integer</td>
<td>The number of replication of the log system. If log system is
kafka, this is kafka replicationFactor.</td>
</tr>
+ <tr>
+ <td><h5>lookup.async</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to enable async lookup join.</td>
+ </tr>
+ <tr>
+ <td><h5>lookup.async-thread-number</h5></td>
+ <td style="word-wrap: break-word;">16</td>
+ <td>Integer</td>
+ <td>The thread number for lookup async.</td>
+ </tr>
<tr>
<td><h5>scan.infer-parallelism</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 9c516273a..2e5042c06 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -856,6 +856,17 @@ public class CoreOptions implements Serializable {
+ " the value should be the user
configured local time zone. The option value is either a full name"
+ " such as 'America/Los_Angeles', or a
custom timezone id such as 'GMT-08:00'.");
+ public static final ConfigOption<MemorySize> LOCAL_MERGE_BUFFER_SIZE =
+ key("local-merge-buffer-size")
+ .memoryType()
+ .noDefaultValue()
+ .withDescription(
+ "Local merge will buffer and merge input records "
+ + "before they're shuffled by bucket and
written into sink. "
+ + "The buffer will be flushed when it is
full.\n"
+ + "Mainly to resolve data skew on primary
keys. "
+ + "We recommend starting with 64 mb when
trying out this feature.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -1262,6 +1273,14 @@ public class CoreOptions implements Serializable {
return options.getInteger(ORC_WRITE_BATCH_SIZE.key(),
ORC_WRITE_BATCH_SIZE.defaultValue());
}
+ public boolean localMergeEnabled() {
+ return options.get(LOCAL_MERGE_BUFFER_SIZE) != null;
+ }
+
+ public long localMergeBufferSize() {
+ return options.get(LOCAL_MERGE_BUFFER_SIZE).getBytes();
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index feec6eb50..fe2a7f3ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -229,6 +229,10 @@ public class TableSchema implements Serializable {
return projectedLogicalRowType(trimmedPrimaryKeys());
}
+ public RowType logicalPrimaryKeysType() {
+ return projectedLogicalRowType(primaryKeys());
+ }
+
public List<DataField> trimmedPrimaryKeysFields() {
return projectedDataFields(trimmedPrimaryKeys());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index a9b7fad2e..3639fc64e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
-import org.apache.paimon.CoreOptions.SequenceAutoPadding;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.WriteMode;
@@ -28,12 +27,8 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
-import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.paimon.mergetree.compact.FirstRowMergeFunction;
import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
-import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
-import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreScan;
import org.apache.paimon.operation.Lock;
@@ -49,17 +44,14 @@ import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.MergeTreeSplitGenerator;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import java.util.List;
import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
/** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode with
primary keys. */
public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
@@ -91,36 +83,11 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
RowType rowType = tableSchema.logicalRowType();
Options conf = Options.fromMap(tableSchema.options());
CoreOptions options = new CoreOptions(conf);
- CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
- KeyValueFieldsExtractor extractor =
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
-
- MergeFunctionFactory<KeyValue> mfFactory;
-
- switch (mergeEngine) {
- case DEDUPLICATE:
- mfFactory = DeduplicateMergeFunction.factory();
- break;
- case PARTIAL_UPDATE:
- mfFactory = PartialUpdateMergeFunction.factory(conf,
rowType);
- break;
- case AGGREGATE:
- mfFactory =
- AggregateMergeFunction.factory(
- conf,
- tableSchema.fieldNames(),
- rowType.getFieldTypes(),
- tableSchema.primaryKeys());
- break;
- case FIRST_ROW:
- mfFactory =
- FirstRowMergeFunction.factory(
- new
RowType(extractor.keyFields(tableSchema)), rowType);
- break;
- default:
- throw new UnsupportedOperationException(
- "Unsupported merge engine: " + mergeEngine);
- }
+ KeyValueFieldsExtractor extractor =
+
ChangelogWithKeyTableUtils.ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
+ MergeFunctionFactory<KeyValue> mfFactory =
+
ChangelogWithKeyTableUtils.createMergeFunctionFactory(tableSchema);
if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
mfFactory =
LookupMergeFunction.wrap(
@@ -135,7 +102,8 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
tableSchema.crossPartitionUpdate(),
options,
tableSchema.logicalPartitionType(),
-
addKeyNamePrefix(tableSchema.logicalBucketKeyType()),
+ ChangelogWithKeyTableUtils.addKeyNamePrefix(
+ tableSchema.logicalBucketKeyType()),
new RowType(extractor.keyFields(tableSchema)),
rowType,
extractor,
@@ -144,32 +112,6 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
return lazyStore;
}
- private static RowType addKeyNamePrefix(RowType type) {
- // add prefix to avoid conflict with value
- return new RowType(
- type.getFields().stream()
- .map(
- f ->
- new DataField(
- f.id(),
- KEY_FIELD_PREFIX + f.name(),
- f.type(),
- f.description()))
- .collect(Collectors.toList()));
- }
-
- private static List<DataField> addKeyNamePrefix(List<DataField> keyFields)
{
- return keyFields.stream()
- .map(
- f ->
- new DataField(
- f.id(),
- KEY_FIELD_PREFIX + f.name(),
- f.type(),
- f.description()))
- .collect(Collectors.toList());
- }
-
@Override
public SplitGenerator splitGenerator() {
return new MergeTreeSplitGenerator(
@@ -245,14 +187,7 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
public TableWriteImpl<KeyValue> newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
final SequenceGenerator sequenceGenerator =
- store().options()
- .sequenceField()
- .map(field -> new SequenceGenerator(field,
schema().logicalRowType()))
- .orElse(null);
- final List<SequenceAutoPadding> sequenceAutoPadding =
- store().options().sequenceAutoPadding().stream()
- .map(SequenceAutoPadding::fromString)
- .collect(Collectors.toList());
+ SequenceGenerator.create(schema(), store().options());
final KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
store().newWrite(commitUser, manifestFilter),
@@ -261,10 +196,7 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
long sequenceNumber =
sequenceGenerator == null
? KeyValue.UNKNOWN_SEQUENCE
- : sequenceAutoPadding.isEmpty()
- ?
sequenceGenerator.generate(record.row())
- :
sequenceGenerator.generateWithPadding(
- record.row(),
sequenceAutoPadding);
+ : sequenceGenerator.generate(record.row());
return kv.replace(
record.primaryKey(),
sequenceNumber,
@@ -272,23 +204,4 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
record.row());
});
}
-
- static class ChangelogWithKeyKeyValueFieldsExtractor implements
KeyValueFieldsExtractor {
- private static final long serialVersionUID = 1L;
-
- static final ChangelogWithKeyKeyValueFieldsExtractor EXTRACTOR =
- new ChangelogWithKeyKeyValueFieldsExtractor();
-
- private ChangelogWithKeyKeyValueFieldsExtractor() {}
-
- @Override
- public List<DataField> keyFields(TableSchema schema) {
- return addKeyNamePrefix(schema.trimmedPrimaryKeysFields());
- }
-
- @Override
- public List<DataField> valueFields(TableSchema schema) {
- return schema.fields();
- }
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
new file mode 100644
index 000000000..36770365f
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.mergetree.compact.FirstRowMergeFunction;
+import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
+import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
+import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.KeyValueFieldsExtractor;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
+
+/** Utils for creating changelog table with primary keys. */
+public class ChangelogWithKeyTableUtils {
+
+ public static RowType addKeyNamePrefix(RowType type) {
+ return new RowType(addKeyNamePrefix(type.getFields()));
+ }
+
+ public static List<DataField> addKeyNamePrefix(List<DataField> keyFields) {
+ return keyFields.stream()
+ .map(
+ f ->
+ new DataField(
+ f.id(),
+ KEY_FIELD_PREFIX + f.name(),
+ f.type(),
+ f.description()))
+ .collect(Collectors.toList());
+ }
+
+ public static MergeFunctionFactory<KeyValue> createMergeFunctionFactory(
+ TableSchema tableSchema) {
+ RowType rowType = tableSchema.logicalRowType();
+ Options conf = Options.fromMap(tableSchema.options());
+ CoreOptions options = new CoreOptions(conf);
+ CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
+ KeyValueFieldsExtractor extractor =
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
+
+ switch (mergeEngine) {
+ case DEDUPLICATE:
+ return DeduplicateMergeFunction.factory();
+ case PARTIAL_UPDATE:
+ return PartialUpdateMergeFunction.factory(conf, rowType);
+ case AGGREGATE:
+ return AggregateMergeFunction.factory(
+ conf,
+ tableSchema.fieldNames(),
+ rowType.getFieldTypes(),
+ tableSchema.primaryKeys());
+ case FIRST_ROW:
+ return FirstRowMergeFunction.factory(
+ new RowType(extractor.keyFields(tableSchema)),
rowType);
+ default:
+ throw new UnsupportedOperationException("Unsupported merge
engine: " + mergeEngine);
+ }
+ }
+
+ static class ChangelogWithKeyKeyValueFieldsExtractor implements
KeyValueFieldsExtractor {
+ private static final long serialVersionUID = 1L;
+
+ static final ChangelogWithKeyKeyValueFieldsExtractor EXTRACTOR =
+ new ChangelogWithKeyKeyValueFieldsExtractor();
+
+ private ChangelogWithKeyKeyValueFieldsExtractor() {}
+
+ @Override
+ public List<DataField> keyFields(TableSchema schema) {
+ return addKeyNamePrefix(schema.trimmedPrimaryKeysFields());
+ }
+
+ @Override
+ public List<DataField> valueFields(TableSchema schema) {
+ return schema.fields();
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
index f39f62c05..293c9df54 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
@@ -18,8 +18,10 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.SequenceAutoPadding;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataType;
@@ -41,18 +43,28 @@ import org.apache.paimon.utils.InternalRowUtils;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/** Generate sequence number. */
public class SequenceGenerator {
+
private final int index;
+ private final List<SequenceAutoPadding> paddings;
private final Generator generator;
private final DataType fieldType;
public SequenceGenerator(String field, RowType rowType) {
+ this(field, rowType, Collections.emptyList());
+ }
+
+ public SequenceGenerator(String field, RowType rowType,
List<SequenceAutoPadding> paddings) {
index = rowType.getFieldNames().indexOf(field);
+ this.paddings = paddings;
+
if (index == -1) {
throw new RuntimeException(
String.format(
@@ -64,6 +76,8 @@ public class SequenceGenerator {
public SequenceGenerator(int index, DataType dataType) {
this.index = index;
+ this.paddings = Collections.emptyList();
+
this.fieldType = dataType;
if (index == -1) {
throw new RuntimeException(String.format("Index : %s is invalid",
index));
@@ -71,6 +85,19 @@ public class SequenceGenerator {
generator = fieldType.accept(new SequenceGeneratorVisitor());
}
+ public static SequenceGenerator create(TableSchema schema, CoreOptions
options) {
+ List<SequenceAutoPadding> sequenceAutoPadding =
+ options.sequenceAutoPadding().stream()
+ .map(SequenceAutoPadding::fromString)
+ .collect(Collectors.toList());
+ return options.sequenceField()
+ .map(
+ field ->
+ new SequenceGenerator(
+ field, schema.logicalRowType(),
sequenceAutoPadding))
+ .orElse(null);
+ }
+
public int index() {
return index;
}
@@ -85,11 +112,7 @@ public class SequenceGenerator {
}
public long generate(InternalRow row) {
- return generator.generate(row, index);
- }
-
- public long generateWithPadding(InternalRow row, List<SequenceAutoPadding>
paddings) {
- long sequence = generate(row);
+ long sequence = generator.generate(row, index);
for (SequenceAutoPadding padding : paddings) {
switch (padding) {
case ROW_KIND_FLAG:
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
index d656d1e18..b8a5795dd 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
@@ -35,6 +36,7 @@ import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import static
org.apache.paimon.CoreOptions.SequenceAutoPadding.MILLIS_TO_MICRO;
@@ -237,7 +239,12 @@ public class SequenceGeneratorTest {
}
private SequenceGenerator getGenerator(String field) {
- return new SequenceGenerator(field, ALL_DATA_TYPE);
+ return getGenerator(field, Collections.emptyList());
+ }
+
+ private SequenceGenerator getGenerator(
+ String field, List<CoreOptions.SequenceAutoPadding> paddings) {
+ return new SequenceGenerator(field, ALL_DATA_TYPE, paddings);
}
private void assertUnsupportedDatatype(String field) {
@@ -246,8 +253,7 @@ public class SequenceGeneratorTest {
}
private long generateWithPaddingOnSecond(String field) {
- return getGenerator(field)
- .generateWithPadding(row,
Collections.singletonList(SECOND_TO_MICRO));
+ return getGenerator(field,
Collections.singletonList(SECOND_TO_MICRO)).generate(row);
}
private long getSecondFromGeneratedWithPadding(long generated) {
@@ -255,22 +261,17 @@ public class SequenceGeneratorTest {
}
private long generateWithPaddingOnMillis(String field) {
- return getGenerator(field)
- .generateWithPadding(row,
Collections.singletonList(MILLIS_TO_MICRO));
+ return getGenerator(field,
Collections.singletonList(MILLIS_TO_MICRO)).generate(row);
}
private long generateWithPaddingOnRowKind(long sequence, RowKind rowKind) {
- return getGenerator("_bigint")
- .generateWithPadding(
- GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, sequence),
- Collections.singletonList(ROW_KIND_FLAG));
+ return getGenerator("_bigint",
Collections.singletonList(ROW_KIND_FLAG))
+ .generate(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0,
sequence));
}
private long generateWithPaddingOnMicrosAndRowKind(long sequence, RowKind
rowKind) {
- return getGenerator("_bigint")
- .generateWithPadding(
- GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, sequence),
- Arrays.asList(MILLIS_TO_MICRO, ROW_KIND_FLAG));
+ return getGenerator("_bigint", Arrays.asList(MILLIS_TO_MICRO,
ROW_KIND_FLAG))
+ .generate(GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0,
sequence));
}
private long getMillisFromGeneratedWithPadding(long generated) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 2c23dfdf9..4102d5ca1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -69,29 +69,41 @@ public class FlinkSinkBuilder {
}
public DataStreamSink<?> build() {
+ DataStream<RowData> input = this.input;
+ if (table.coreOptions().localMergeEnabled() &&
table.schema().primaryKeys().size() > 0) {
+ input =
+ input.forward()
+ .transform(
+ "local merge",
+ input.getType(),
+ new LocalMergeOperator(table.schema()))
+ .setParallelism(input.getParallelism());
+ }
+
BucketMode bucketMode = table.bucketMode();
switch (bucketMode) {
case FIXED:
- return buildForFixedBucket();
+ return buildForFixedBucket(input);
case DYNAMIC:
- return buildDynamicBucketSink(false);
+ return buildDynamicBucketSink(input, false);
case GLOBAL_DYNAMIC:
- return buildDynamicBucketSink(true);
+ return buildDynamicBucketSink(input, true);
case UNAWARE:
- return buildUnawareBucketSink();
+ return buildUnawareBucketSink(input);
default:
throw new UnsupportedOperationException("Unsupported bucket
mode: " + bucketMode);
}
}
- private DataStreamSink<?> buildDynamicBucketSink(boolean globalIndex) {
+ private DataStreamSink<?> buildDynamicBucketSink(
+ DataStream<RowData> input, boolean globalIndex) {
checkArgument(logSinkFunction == null, "Dynamic bucket mode can not
work with log system.");
return globalIndex
? new GlobalDynamicBucketSink(table,
overwritePartition).build(input, parallelism)
: new RowDynamicBucketSink(table,
overwritePartition).build(input, parallelism);
}
- private DataStreamSink<?> buildForFixedBucket() {
+ private DataStreamSink<?> buildForFixedBucket(DataStream<RowData> input) {
DataStream<RowData> partitioned =
partition(
input,
@@ -101,7 +113,7 @@ public class FlinkSinkBuilder {
return sink.sinkFrom(partitioned);
}
- private DataStreamSink<?> buildUnawareBucketSink() {
+ private DataStreamSink<?> buildUnawareBucketSink(DataStream<RowData>
input) {
checkArgument(
table instanceof AppendOnlyFileStoreTable,
"Unaware bucket mode only works with append-only table for
now.");
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
new file mode 100644
index 000000000..52581a1b5
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.mergetree.SortBufferWriteBuffer;
+import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.ChangelogWithKeyTableUtils;
+import org.apache.paimon.table.sink.SequenceGenerator;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyComparatorSupplier;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link AbstractStreamOperator} which buffers input records and applies the merge
function when the buffer
+ * is full. Mainly to resolve data skew on primary keys.
+ */
+public class LocalMergeOperator extends AbstractStreamOperator<RowData>
+ implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
+
+ private static final long serialVersionUID = 1L;
+
+ TableSchema schema;
+
+ private transient Projection keyProjection;
+ private transient RecordComparator keyComparator;
+
+ private transient long recordCount;
+ private transient SequenceGenerator sequenceGenerator;
+ private transient MergeFunction<KeyValue> mergeFunction;
+
+ private transient SortBufferWriteBuffer buffer;
+ private transient long currentWatermark;
+
+ private transient FlinkRowData reusedRowData;
+ private transient boolean endOfInput;
+
+ public LocalMergeOperator(TableSchema schema) {
+ Preconditions.checkArgument(
+ schema.primaryKeys().size() > 0,
+ "LocalMergeOperator currently only support tables with primary
keys");
+ this.schema = schema;
+ setChainingStrategy(ChainingStrategy.ALWAYS);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ RowType keyType =
+
ChangelogWithKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
+ RowType valueType = schema.logicalRowType();
+ CoreOptions options = new CoreOptions(schema.options());
+
+ keyProjection =
+ CodeGenUtils.newProjection(
+ schema.logicalRowType(),
schema.projection(schema.primaryKeys()));
+ keyComparator = new KeyComparatorSupplier(keyType).get();
+
+ recordCount = 0;
+ sequenceGenerator = SequenceGenerator.create(schema, options);
+ mergeFunction =
ChangelogWithKeyTableUtils.createMergeFunctionFactory(schema).create();
+
+ buffer =
+ new SortBufferWriteBuffer(
+ keyType,
+ valueType,
+ new HeapMemorySegmentPool(
+ options.localMergeBufferSize(),
options.pageSize()),
+ false,
+ options.localSortMaxNumFileHandles(),
+ null);
+ currentWatermark = Long.MIN_VALUE;
+
+ reusedRowData = new FlinkRowData(null);
+ endOfInput = false;
+ }
+
+ @Override
+ public void processElement(StreamRecord<RowData> record) throws Exception {
+ recordCount++;
+ InternalRow row = new FlinkRowWrapper(record.getValue());
+
+ RowKind rowKind = row.getRowKind();
+ // row kind must be INSERT when it is divided into key and value
+ row.setRowKind(RowKind.INSERT);
+
+ InternalRow key = keyProjection.apply(row);
+ long sequenceNumber =
+ sequenceGenerator == null ? recordCount :
sequenceGenerator.generate(row);
+ if (!buffer.put(sequenceNumber, rowKind, key, row)) {
+ flushBuffer();
+ if (!buffer.put(sequenceNumber, rowKind, key, row)) {
+ // change row kind back
+ row.setRowKind(rowKind);
+ output.collect(record);
+ }
+ }
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ // don't emit the watermark immediately; emit it after flushing the buffer
+ currentWatermark = mark.getTimestamp();
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ if (!endOfInput) {
+ flushBuffer();
+ }
+ // no records are expected to emit after endOfInput
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ endOfInput = true;
+ flushBuffer();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (buffer != null) {
+ buffer.clear();
+ }
+
+ super.close();
+ }
+
+ private void flushBuffer() throws Exception {
+ if (buffer.size() == 0) {
+ return;
+ }
+
+ buffer.forEach(
+ keyComparator,
+ mergeFunction,
+ null,
+ kv -> {
+ InternalRow row = kv.value();
+ row.setRowKind(kv.valueKind());
+ output.collect(new
StreamRecord<>(reusedRowData.replace(row)));
+ });
+ buffer.clear();
+
+ if (currentWatermark != Long.MIN_VALUE) {
+ super.processWatermark(new Watermark(currentWatermark));
+ // each watermark should only be emitted once
+ currentWatermark = Long.MIN_VALUE;
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
index 924a71e7c..c2898856a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
@@ -18,9 +18,11 @@
package org.apache.paimon.flink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.flink.api.common.JobStatus;
@@ -49,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
@@ -60,10 +63,17 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
// Test Utilities
// ------------------------------------------------------------------------
private String path;
+ private Map<String, String> tableDefaultProperties;
@BeforeEach
public void before() throws IOException { // per-test setup: fresh temp path plus a randomly chosen set of table default options
path = getTempDirPath();
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ tableDefaultProperties = new HashMap<>(); // read by createCatalogSql() when it builds the catalog DDL
+ if (random.nextBoolean()) { // coin flip: some runs set the local merge buffer size, others leave it unset
+
tableDefaultProperties.put(CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key(), "256 kb");
+ }
}
private TableEnvironment createBatchTableEnvironment() {
@@ -88,6 +98,25 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
return env;
}
+ private String createCatalogSql(String catalogName, String warehouse) { // builds the CREATE CATALOG statement used by the tests in this class
+ String defaultPropertyString = ""; // stays empty when no table defaults are configured
+ if (tableDefaultProperties.size() > 0) {
+ defaultPropertyString = ", "; // separates the extra options from the 'warehouse' option in the WITH clause
+ defaultPropertyString +=
+ tableDefaultProperties.entrySet().stream()
+ .map(
+ e ->
+ String.format(
+ "'table-default.%s' =
'%s'",
+ e.getKey(), e.getValue())) // one 'table-default.<key>' = '<value>' pair per map entry
+ .collect(Collectors.joining(", "));
+ }
+
+ return String.format(
+ "CREATE CATALOG `%s` WITH ( 'type' = 'paimon', 'warehouse' =
'%s' %s )",
+ catalogName, warehouse, defaultPropertyString); // fills catalog name, warehouse path and the optional extra options, in that order
+ }
+
// ------------------------------------------------------------------------
// Constructed Tests
// ------------------------------------------------------------------------
@@ -109,10 +138,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
bEnv.getConfig()
.getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
- bEnv.executeSql(
- String.format(
- "CREATE CATALOG testCatalog WITH ('type'='paimon',
'warehouse'='%s')",
- path));
+ bEnv.executeSql(createCatalogSql("testCatalog", path));
bEnv.executeSql("USE CATALOG testCatalog");
bEnv.executeSql(
"CREATE TABLE T ("
@@ -130,10 +156,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
sEnv.getConfig()
.getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
- sEnv.executeSql(
- String.format(
- "CREATE CATALOG testCatalog WITH ('type'='paimon',
'warehouse'='%s')",
- path));
+ sEnv.executeSql(createCatalogSql("testCatalog", path));
sEnv.executeSql("USE CATALOG testCatalog");
CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM
T").collect();
@@ -177,10 +200,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
.getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
- sEnv.executeSql(
- String.format(
- "CREATE CATALOG testCatalog WITH ('type'='paimon',
'warehouse'='%s/warehouse')",
- path));
+ sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
sEnv.executeSql("USE CATALOG testCatalog");
sEnv.executeSql(
"CREATE TABLE T ( k INT, v STRING, PRIMARY KEY (k) NOT
ENFORCED ) "
@@ -190,6 +210,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
+ ")");
Path inputPath = new Path(path, "input");
+ LocalFileIO.create().mkdirs(inputPath);
sEnv.executeSql(
"CREATE TABLE `default_catalog`.`default_database`.`S` ( i
INT, g STRING ) "
+ "WITH ( 'connector' = 'filesystem', 'format' =
'testcsv', 'path' = '"
@@ -237,6 +258,8 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
"-U[4, D]",
"+U[4, C]",
"+I[5, D]");
+
+ it.close();
}
// ------------------------------------------------------------------------
@@ -435,10 +458,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
sEnv.getConfig()
.getConfiguration()
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
- sEnv.executeSql(
- String.format(
- "CREATE CATALOG testCatalog WITH ('type'='paimon',
'warehouse'='%s')",
- path));
+ sEnv.executeSql(createCatalogSql("testCatalog", path));
sEnv.executeSql("USE CATALOG testCatalog");
ResultChecker checker = new ResultChecker();
@@ -476,10 +496,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
// no failure when creating catalog and table
FailingFileIO.reset(failingName, 0, 1);
- tEnv.executeSql(
- String.format(
- "CREATE CATALOG testCatalog WITH ('type'='paimon',
'warehouse'='%s')",
- failingPath));
+ tEnv.executeSql(createCatalogSql("testCatalog", failingPath));
tEnv.executeSql("USE CATALOG testCatalog");
tEnv.executeSql(
"CREATE TABLE T("
@@ -556,10 +573,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends
AbstractTestBase {
private void checkBatchResult(int numProducers) throws Exception {
TableEnvironment bEnv = createBatchTableEnvironment();
- bEnv.executeSql(
- String.format(
- "CREATE CATALOG testCatalog WITH ('type'='paimon',
'warehouse'='%s')",
- path));
+ bEnv.executeSql(createCatalogSql("testCatalog", path));
bEnv.executeSql("USE CATALOG testCatalog");
ResultChecker checker = new ResultChecker();