This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch cross_partition_upsert_ddl in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 41b79e663e0b80010911e5c1736e6fba8ec76a9a Author: Jingsong <[email protected]> AuthorDate: Fri Sep 8 11:32:17 2023 +0800 [core] Introduce index-ttl and bootstrap-min-partition for cross-partition-upsert --- docs/content/concepts/primary-key-table.md | 14 ++++- .../shortcodes/generated/core_configuration.html | 12 +++++ .../main/java/org/apache/paimon/CoreOptions.java | 27 ++++++++++ .../flink/lookup/FileStoreLookupFunction.java | 2 +- .../paimon/flink/lookup/RocksDBStateFactory.java | 13 ++++- .../flink/sink/index/GlobalIndexAssigner.java | 5 +- .../paimon/flink/sink/index/IndexBootstrap.java | 13 +++++ .../paimon/flink/lookup/LookupTableTest.java | 2 +- .../paimon/flink/lookup/RocksDBListStateTest.java | 3 +- .../flink/sink/index/IndexBootstrapTest.java | 61 ++++++++++++++++------ 10 files changed, 128 insertions(+), 24 deletions(-) diff --git a/docs/content/concepts/primary-key-table.md b/docs/content/concepts/primary-key-table.md index cb47455e9..e019ada53 100644 --- a/docs/content/concepts/primary-key-table.md +++ b/docs/content/concepts/primary-key-table.md @@ -64,13 +64,13 @@ Performance: entries in a partition takes up **1 GB** more memory, partitions that are no longer active do not take up memory. 2. For tables with low update rates, this mode is recommended to significantly improve performance. -#### Cross Partitions Update Dynamic Bucket Mode +#### Cross Partitions Upsert Dynamic Bucket Mode {{< hint info >}} This is an experimental feature. {{< /hint >}} -When you need cross partition updates (primary keys not contain all partition fields), Dynamic Bucket mode directly +When you need cross partition upsert (primary keys not contain all partition fields), Dynamic Bucket mode directly maintains the mapping of keys to partition and bucket, uses local disks, and initializes indexes by reading all existing keys in the table when starting stream write job. 
Different merge engines have different behaviors: @@ -81,6 +81,16 @@ existing keys in the table when starting stream write job. Different merge engin Performance: For tables with a large amount of data, there will be a significant loss in performance. Moreover, initialization takes a long time. +If your upserts do not depend on very old data, consider configuring the index TTL and bootstrap-min-partition to +reduce index size and initialization time: +- `'cross-partition-upsert.index-ttl'`: The TTL of entries in the RocksDB index. It avoids maintaining too many index + entries, which would otherwise lead to worse and worse performance. +- `'cross-partition-upsert.bootstrap-min-partition'`: The minimum partition to read when bootstrapping the RocksDB index. + Bootstrap only reads partitions greater than or equal to it; smaller partitions are not read into the index. This + reduces job startup time and avoids excessive index initialization. + +Please note that both options may also cause data duplication. + ## Merge Engines When Paimon sink receives two or more records with the same primary keys, it will merge them into one record to keep primary keys unique. By specifying the `merge-engine` table property, users can choose how records are merged together. diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 84bf6cb64..c3b877982 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -116,6 +116,18 @@ under the License. 
<td>Duration</td> <td>The discovery interval of continuous reading.</td> </tr> + <tr> + <td><h5>cross-partition-upsert.bootstrap-min-partition</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The min partition bootstrap of rocksdb index for cross partition upsert (primary keys not contain all partition fields), bootstrap will only read the partitions above it, and the smaller partitions will not be read into the index. This can reduce job startup time and excessive initialization of index, but please note that this may also cause data duplication.</td> + </tr> + <tr> + <td><h5>cross-partition-upsert.index-ttl</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>Duration</td> + <td>The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication.</td> + </tr> <tr> <td><h5>dynamic-bucket.assigner-parallelism</h5></td> <td style="word-wrap: break-word;">(none)</td> diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index eb954bd73..7ded97bba 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -849,6 +849,25 @@ public class CoreOptions implements Serializable { + "Mainly to resolve data skew on primary keys. 
" + "We recommend starting with 64 mb when trying out this feature."); + public static final ConfigOption<Duration> CROSS_PARTITION_UPSERT_INDEX_TTL = + key("cross-partition-upsert.index-ttl") + .durationType() + .noDefaultValue() + .withDescription( + "The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), " + + "this can avoid maintaining too many indexes and lead to worse and worse performance, " + + "but please note that this may also cause data duplication."); + + public static final ConfigOption<String> CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION = + key("cross-partition-upsert.bootstrap-min-partition") + .stringType() + .noDefaultValue() + .withDescription( + "The min partition bootstrap of rocksdb index for cross partition upsert (primary keys not contain all partition fields), " + + "bootstrap will only read the partitions above it, and the smaller partitions will not be read into the index. " + + "This can reduce job startup time and excessive initialization of index, " + + "but please note that this may also cause data duplication."); + private final Options options; public CoreOptions(Map<String, String> options) { @@ -1263,6 +1282,14 @@ public class CoreOptions implements Serializable { return options.get(LOCAL_MERGE_BUFFER_SIZE).getBytes(); } + public Duration crossPartitionUpsertIndexTtl() { + return options.get(CROSS_PARTITION_UPSERT_INDEX_TTL); + } + + public String crossPartitionUpsertBootstrapMinPartition() { + return options.get(CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION); + } + /** Specifies the merge engine for table with primary key. 
*/ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 55b2fa8d5..4715c3244 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -124,7 +124,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable { private void open() throws Exception { Options options = Options.fromMap(table.options()); this.refreshInterval = options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL); - this.stateFactory = new RocksDBStateFactory(path.toString(), options); + this.stateFactory = new RocksDBStateFactory(path.toString(), options, null); List<String> fieldNames = table.rowType().getFieldNames(); int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java index af6a7f201..9c930f464 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java @@ -28,10 +28,14 @@ import org.rocksdb.DBOptions; import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import org.rocksdb.TtlDB; + +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Duration; /** Factory to create 
state. */ public class RocksDBStateFactory implements Closeable { @@ -42,7 +46,8 @@ public class RocksDBStateFactory implements Closeable { private final ColumnFamilyOptions columnFamilyOptions; - public RocksDBStateFactory(String path, org.apache.paimon.options.Options conf) + public RocksDBStateFactory( + String path, org.apache.paimon.options.Options conf, @Nullable Duration ttlSecs) throws IOException { DBOptions dbOptions = RocksDBOptions.createDBOptions( @@ -55,8 +60,12 @@ public class RocksDBStateFactory implements Closeable { RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(), conf) .setMergeOperatorName(MERGE_OPERATOR_NAME); + Options options = new Options(dbOptions, columnFamilyOptions); try { - this.db = RocksDB.open(new Options(dbOptions, columnFamilyOptions), path); + this.db = + ttlSecs == null + ? RocksDB.open(options, path) + : TtlDB.open(options, path, (int) ttlSecs.getSeconds(), false); } catch (RocksDBException e) { throw new IOException("Error while opening RocksDB instance.", e); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java index 9662a3bb4..d35ef6ed4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java @@ -106,7 +106,10 @@ public class GlobalIndexAssigner<T> implements Serializable { // state Options options = coreOptions.toConfiguration(); this.path = new File(tmpDir, "lookup-" + UUID.randomUUID()); - this.stateFactory = new RocksDBStateFactory(path.toString(), options); + + this.stateFactory = + new RocksDBStateFactory( + path.toString(), options, coreOptions.crossPartitionUpsertIndexTtl()); RowType keyType = table.schema().logicalTrimmedPrimaryKeysType(); this.keyIndex = 
stateFactory.valueState( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java index 95b4c1a94..66c0dc2dc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java @@ -18,9 +18,11 @@ package org.apache.paimon.flink.sink.index; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.JoinedRow; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.Table; @@ -32,6 +34,7 @@ import org.apache.paimon.table.source.TableScan; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.TypeUtils; import java.io.IOException; import java.io.Serializable; @@ -77,6 +80,16 @@ public class IndexBootstrap implements Serializable { .newReadBuilder() .withProjection(projection); + String minPartition = + CoreOptions.fromMap(table.options()).crossPartitionUpsertBootstrapMinPartition(); + if (minPartition != null) { + int partIndex = fieldNames.indexOf(table.partitionKeys().get(0)); + Object minPart = TypeUtils.castFromString(minPartition, rowType.getTypeAt(partIndex)); + PredicateBuilder predicateBuilder = new PredicateBuilder(rowType); + readBuilder = + readBuilder.withFilter(predicateBuilder.greaterOrEqual(partIndex, minPart)); + } + AbstractInnerTableScan tableScan = (AbstractInnerTableScan) readBuilder.newScan(); TableScan.Plan plan = tableScan.withBucketFilter(bucket -> bucket % numAssigners == assignId).plan(); diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 9c7366be6..dc1932b78 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -50,7 +50,7 @@ public class LookupTableTest { @BeforeEach public void before() throws IOException { - this.stateFactory = new RocksDBStateFactory(tempDir.toString(), new Options()); + this.stateFactory = new RocksDBStateFactory(tempDir.toString(), new Options(), null); this.rowType = RowType.of(new IntType(), new IntType(), new IntType()); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java index da209d3d9..09c2ea106 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java @@ -47,7 +47,8 @@ public class RocksDBListStateTest { @Test void test() throws Exception { - RocksDBStateFactory factory = new RocksDBStateFactory(tempDir.toString(), new Options()); + RocksDBStateFactory factory = + new RocksDBStateFactory(tempDir.toString(), new Options(), null); RowType keyType = RowType.of(DataTypes.STRING()); RowType valueType = RowType.of(DataTypes.STRING()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java index aadec017e..c5c4bc35c 100644 --- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java @@ -26,29 +26,34 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.Table; import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.sink.DynamicBucketRow; import org.apache.paimon.types.DataTypes; import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.function.Consumer; +import static org.apache.paimon.CoreOptions.CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link IndexBootstrap}. */ public class IndexBootstrapTest extends TableTestBase { @Test - public void test() throws Exception { + public void testBoostrap() throws Exception { Identifier identifier = identifier("T"); Options options = new Options(); - options.set(CoreOptions.BUCKET, 5); + options.set(CoreOptions.BUCKET, -1); Schema schema = Schema.newBuilder() + .column("pt", DataTypes.INT()) .column("col", DataTypes.INT()) .column("pk", DataTypes.INT()) .primaryKey("pk") + .partitionKeys("pt") .options(options.toMap()) .build(); catalog.createTable(identifier, schema, true); @@ -56,33 +61,57 @@ public class IndexBootstrapTest extends TableTestBase { write( table, - GenericRow.of(1, 1), - GenericRow.of(2, 2), - GenericRow.of(3, 3), - GenericRow.of(4, 4), - GenericRow.of(5, 5), - GenericRow.of(6, 6), - GenericRow.of(7, 7)); + row(1, 1, 1, 2), + row(1, 2, 2, 3), + row(1, 3, 3, 4), + row(2, 4, 4, 5), + row(2, 5, 5, 6), + row(3, 6, 6, 7), + row(3, 7, 7, 8)); IndexBootstrap indexBootstrap = new IndexBootstrap(table); List<GenericRow> result = new ArrayList<>(); Consumer<InternalRow> consumer = - row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1))); + row 
-> result.add(GenericRow.of(row.getInt(0), row.getInt(1), row.getInt(2))); // output key and bucket indexBootstrap.bootstrap(2, 0, consumer); - assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2, 4), GenericRow.of(3, 0)); + assertThat(result) + .containsExactlyInAnyOrder( + GenericRow.of(7, 3, 8), + GenericRow.of(5, 2, 6), + GenericRow.of(1, 1, 2), + GenericRow.of(3, 1, 4)); result.clear(); indexBootstrap.bootstrap(2, 1, consumer); assertThat(result) .containsExactlyInAnyOrder( - GenericRow.of(1, 3), - GenericRow.of(4, 1), - GenericRow.of(5, 1), - GenericRow.of(6, 3), - GenericRow.of(7, 1)); + GenericRow.of(2, 1, 3), GenericRow.of(4, 2, 5), GenericRow.of(6, 3, 7)); + result.clear(); + + // test bootstrap min partition + indexBootstrap = + new IndexBootstrap( + table.copy( + Collections.singletonMap( + CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION.key(), + "2"))); + + indexBootstrap.bootstrap(2, 0, consumer); + assertThat(result) + .containsExactlyInAnyOrder(GenericRow.of(7, 3, 8), GenericRow.of(5, 2, 6)); result.clear(); + + indexBootstrap.bootstrap(2, 1, consumer); + assertThat(result) + .containsExactlyInAnyOrder(GenericRow.of(4, 2, 5), GenericRow.of(6, 3, 7)); + result.clear(); + } + + private DynamicBucketRow row(int pt, int col, int pk, int bucket) { + GenericRow row = GenericRow.of(pt, col, pk); + return new DynamicBucketRow(row, bucket); } }
