This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cd8ece259 [core] Introduce index-ttl and bootstrap-min-partition for
cross-partition-upsert (#1975)
cd8ece259 is described below
commit cd8ece259056ebea8550350563d03ec5287ddce4
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Sep 9 15:16:05 2023 +0800
[core] Introduce index-ttl and bootstrap-min-partition for
cross-partition-upsert (#1975)
---
docs/content/concepts/primary-key-table.md | 14 ++++-
.../shortcodes/generated/core_configuration.html | 12 +++++
.../main/java/org/apache/paimon/CoreOptions.java | 27 ++++++++++
.../flink/lookup/FileStoreLookupFunction.java | 2 +-
.../paimon/flink/lookup/RocksDBStateFactory.java | 13 ++++-
.../flink/sink/index/GlobalIndexAssigner.java | 5 +-
.../paimon/flink/sink/index/IndexBootstrap.java | 13 +++++
.../paimon/flink/lookup/LookupTableTest.java | 2 +-
.../paimon/flink/lookup/RocksDBListStateTest.java | 3 +-
.../flink/sink/index/GlobalIndexAssignerTest.java | 20 ++++++-
.../flink/sink/index/IndexBootstrapTest.java | 61 ++++++++++++++++------
11 files changed, 147 insertions(+), 25 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md
b/docs/content/concepts/primary-key-table.md
index cb47455e9..e019ada53 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -64,13 +64,13 @@ Performance:
entries in a partition takes up **1 GB** more memory, partitions that are
no longer active do not take up memory.
2. For tables with low update rates, this mode is recommended to significantly
improve performance.
-#### Cross Partitions Update Dynamic Bucket Mode
+#### Cross Partitions Upsert Dynamic Bucket Mode
{{< hint info >}}
This is an experimental feature.
{{< /hint >}}
-When you need cross partition updates (primary keys not contain all partition
fields), Dynamic Bucket mode directly
+When you need cross partition upsert (primary keys do not contain all partition
fields), Dynamic Bucket mode directly
maintains the mapping of keys to partition and bucket, uses local disks, and
initializes indexes by reading all
existing keys in the table when starting stream write job. Different merge
engines have different behaviors:
@@ -81,6 +81,16 @@ existing keys in the table when starting stream write job.
Different merge engin
Performance: For tables with a large amount of data, there will be a
significant loss in performance. Moreover,
initialization takes a long time.
+If your upsert does not rely on data that is too old, you can consider configuring
index TTL and bootstrap-min-partition to
+reduce index size and initialization time:
+- `'cross-partition-upsert.index-ttl'`: The TTL in rocksdb index, this can
avoid maintaining too many indexes, which would lead
+  to worse and worse performance.
+- `'cross-partition-upsert.bootstrap-min-partition'`: The min partition
bootstrap of rocksdb index, bootstrap will only
+  read the partitions greater than or equal to it, and the smaller partitions will not be read
into the index. This can reduce job startup
+  time and excessive initialization of the index.
+
+But please note that this may also cause data duplication.
+
## Merge Engines
When Paimon sink receives two or more records with the same primary keys, it
will merge them into one record to keep primary keys unique. By specifying the
`merge-engine` table property, users can choose how records are merged together.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 84bf6cb64..c3b877982 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -116,6 +116,18 @@ under the License.
<td>Duration</td>
<td>The discovery interval of continuous reading.</td>
</tr>
+ <tr>
+ <td><h5>cross-partition-upsert.bootstrap-min-partition</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The min partition bootstrap of rocksdb index for cross
partition upsert (primary keys not contain all partition fields), bootstrap
will only read the partitions above it, and the smaller partitions will not be
read into the index. This can reduce job startup time and excessive
initialization of index, but please note that this may also cause data
duplication.</td>
+ </tr>
+ <tr>
+ <td><h5>cross-partition-upsert.index-ttl</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>The TTL in rocksdb index for cross partition upsert (primary
keys not contain all partition fields), this can avoid maintaining too many
indexes and lead to worse and worse performance, but please note that this may
also cause data duplication.</td>
+ </tr>
<tr>
<td><h5>dynamic-bucket.assigner-parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index eb954bd73..7ded97bba 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -849,6 +849,25 @@ public class CoreOptions implements Serializable {
+ "Mainly to resolve data skew on primary
keys. "
+ "We recommend starting with 64 mb when
trying out this feature.");
+ public static final ConfigOption<Duration>
CROSS_PARTITION_UPSERT_INDEX_TTL =
+ key("cross-partition-upsert.index-ttl")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "The TTL in rocksdb index for cross partition
upsert (primary keys not contain all partition fields), "
+ + "this can avoid maintaining too many
indexes and lead to worse and worse performance, "
+ + "but please note that this may also
cause data duplication.");
+
+ public static final ConfigOption<String>
CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION =
+ key("cross-partition-upsert.bootstrap-min-partition")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The min partition bootstrap of rocksdb index for
cross partition upsert (primary keys not contain all partition fields), "
+ + "bootstrap will only read the partitions
above it, and the smaller partitions will not be read into the index. "
+ + "This can reduce job startup time and
excessive initialization of index, "
+ + "but please note that this may also
cause data duplication.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -1263,6 +1282,14 @@ public class CoreOptions implements Serializable {
return options.get(LOCAL_MERGE_BUFFER_SIZE).getBytes();
}
+ public Duration crossPartitionUpsertIndexTtl() {
+ return options.get(CROSS_PARTITION_UPSERT_INDEX_TTL);
+ }
+
+ public String crossPartitionUpsertBootstrapMinPartition() {
+ return options.get(CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 55b2fa8d5..4715c3244 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -124,7 +124,7 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
private void open() throws Exception {
Options options = Options.fromMap(table.options());
this.refreshInterval =
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
- this.stateFactory = new RocksDBStateFactory(path.toString(), options);
+ this.stateFactory = new RocksDBStateFactory(path.toString(), options,
null);
List<String> fieldNames = table.rowType().getFieldNames();
int[] projection =
projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
index af6a7f201..9c930f464 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
@@ -28,10 +28,14 @@ import org.rocksdb.DBOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import org.rocksdb.TtlDB;
+
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
/** Factory to create state. */
public class RocksDBStateFactory implements Closeable {
@@ -42,7 +46,8 @@ public class RocksDBStateFactory implements Closeable {
private final ColumnFamilyOptions columnFamilyOptions;
- public RocksDBStateFactory(String path, org.apache.paimon.options.Options
conf)
+ public RocksDBStateFactory(
+ String path, org.apache.paimon.options.Options conf, @Nullable
Duration ttlSecs)
throws IOException {
DBOptions dbOptions =
RocksDBOptions.createDBOptions(
@@ -55,8 +60,12 @@ public class RocksDBStateFactory implements Closeable {
RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(),
conf)
.setMergeOperatorName(MERGE_OPERATOR_NAME);
+ Options options = new Options(dbOptions, columnFamilyOptions);
try {
- this.db = RocksDB.open(new Options(dbOptions,
columnFamilyOptions), path);
+ this.db =
+ ttlSecs == null
+ ? RocksDB.open(options, path)
+ : TtlDB.open(options, path, (int)
ttlSecs.getSeconds(), false);
} catch (RocksDBException e) {
throw new IOException("Error while opening RocksDB instance.", e);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
index 9662a3bb4..d35ef6ed4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
@@ -106,7 +106,10 @@ public class GlobalIndexAssigner<T> implements
Serializable {
// state
Options options = coreOptions.toConfiguration();
this.path = new File(tmpDir, "lookup-" + UUID.randomUUID());
- this.stateFactory = new RocksDBStateFactory(path.toString(), options);
+
+ this.stateFactory =
+ new RocksDBStateFactory(
+ path.toString(), options,
coreOptions.crossPartitionUpsertIndexTtl());
RowType keyType = table.schema().logicalTrimmedPrimaryKeysType();
this.keyIndex =
stateFactory.valueState(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
index 95b4c1a94..66c0dc2dc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
@@ -18,9 +18,11 @@
package org.apache.paimon.flink.sink.index;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.Table;
@@ -32,6 +34,7 @@ import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TypeUtils;
import java.io.IOException;
import java.io.Serializable;
@@ -77,6 +80,16 @@ public class IndexBootstrap implements Serializable {
.newReadBuilder()
.withProjection(projection);
+ String minPartition =
+
CoreOptions.fromMap(table.options()).crossPartitionUpsertBootstrapMinPartition();
+ if (minPartition != null) {
+ int partIndex = fieldNames.indexOf(table.partitionKeys().get(0));
+ Object minPart = TypeUtils.castFromString(minPartition,
rowType.getTypeAt(partIndex));
+ PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
+ readBuilder =
+
readBuilder.withFilter(predicateBuilder.greaterOrEqual(partIndex, minPart));
+ }
+
AbstractInnerTableScan tableScan = (AbstractInnerTableScan)
readBuilder.newScan();
TableScan.Plan plan =
tableScan.withBucketFilter(bucket -> bucket % numAssigners ==
assignId).plan();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 9c7366be6..dc1932b78 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -50,7 +50,7 @@ public class LookupTableTest {
@BeforeEach
public void before() throws IOException {
- this.stateFactory = new RocksDBStateFactory(tempDir.toString(), new
Options());
+ this.stateFactory = new RocksDBStateFactory(tempDir.toString(), new
Options(), null);
this.rowType = RowType.of(new IntType(), new IntType(), new IntType());
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
index da209d3d9..09c2ea106 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
@@ -47,7 +47,8 @@ public class RocksDBListStateTest {
@Test
void test() throws Exception {
- RocksDBStateFactory factory = new
RocksDBStateFactory(tempDir.toString(), new Options());
+ RocksDBStateFactory factory =
+ new RocksDBStateFactory(tempDir.toString(), new Options(),
null);
RowType keyType = RowType.of(DataTypes.STRING());
RowType valueType = RowType.of(DataTypes.STRING());
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
index 0c508a0cb..67d54f61a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
import java.io.File;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@@ -44,6 +45,11 @@ import static org.assertj.core.api.Assertions.assertThat;
public class GlobalIndexAssignerTest extends TableTestBase {
private GlobalIndexAssigner<RowData> createAssigner(MergeEngine
mergeEngine) throws Exception {
+ return createAssigner(mergeEngine, false);
+ }
+
+ private GlobalIndexAssigner<RowData> createAssigner(MergeEngine
mergeEngine, boolean enableTtl)
+ throws Exception {
Identifier identifier = identifier("T");
Options options = new Options();
options.set(CoreOptions.MERGE_ENGINE, mergeEngine);
@@ -52,6 +58,9 @@ public class GlobalIndexAssignerTest extends TableTestBase {
}
options.set(CoreOptions.DYNAMIC_BUCKET_TARGET_ROW_NUM, 3L);
options.set(CoreOptions.BUCKET, -1);
+ if (enableTtl) {
+ options.set(CoreOptions.CROSS_PARTITION_UPSERT_INDEX_TTL,
Duration.ofSeconds(1000));
+ }
Schema schema =
Schema.newBuilder()
.column("pt", DataTypes.INT())
@@ -67,7 +76,16 @@ public class GlobalIndexAssignerTest extends TableTestBase {
@Test
public void testBucketAssign() throws Exception {
- GlobalIndexAssigner<RowData> assigner =
createAssigner(MergeEngine.DEDUPLICATE);
+ innerTestBucketAssign(false);
+ }
+
+ @Test
+ public void testEnableTtl() throws Exception {
+ innerTestBucketAssign(true);
+ }
+
+ private void innerTestBucketAssign(boolean enableTtl) throws Exception {
+ GlobalIndexAssigner<RowData> assigner =
createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
List<Integer> output = new ArrayList<>();
assigner.open(new File(warehouse.getPath()), 2, 0, (row, bucket) ->
output.add(bucket));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
index aadec017e..c5c4bc35c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
@@ -26,29 +26,34 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.DynamicBucketRow;
import org.apache.paimon.types.DataTypes;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
+import static
org.apache.paimon.CoreOptions.CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link IndexBootstrap}. */
public class IndexBootstrapTest extends TableTestBase {
@Test
- public void test() throws Exception {
+ public void testBoostrap() throws Exception {
Identifier identifier = identifier("T");
Options options = new Options();
- options.set(CoreOptions.BUCKET, 5);
+ options.set(CoreOptions.BUCKET, -1);
Schema schema =
Schema.newBuilder()
+ .column("pt", DataTypes.INT())
.column("col", DataTypes.INT())
.column("pk", DataTypes.INT())
.primaryKey("pk")
+ .partitionKeys("pt")
.options(options.toMap())
.build();
catalog.createTable(identifier, schema, true);
@@ -56,33 +61,57 @@ public class IndexBootstrapTest extends TableTestBase {
write(
table,
- GenericRow.of(1, 1),
- GenericRow.of(2, 2),
- GenericRow.of(3, 3),
- GenericRow.of(4, 4),
- GenericRow.of(5, 5),
- GenericRow.of(6, 6),
- GenericRow.of(7, 7));
+ row(1, 1, 1, 2),
+ row(1, 2, 2, 3),
+ row(1, 3, 3, 4),
+ row(2, 4, 4, 5),
+ row(2, 5, 5, 6),
+ row(3, 6, 6, 7),
+ row(3, 7, 7, 8));
IndexBootstrap indexBootstrap = new IndexBootstrap(table);
List<GenericRow> result = new ArrayList<>();
Consumer<InternalRow> consumer =
- row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1)));
+ row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1),
row.getInt(2)));
// output key and bucket
indexBootstrap.bootstrap(2, 0, consumer);
- assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2, 4),
GenericRow.of(3, 0));
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(7, 3, 8),
+ GenericRow.of(5, 2, 6),
+ GenericRow.of(1, 1, 2),
+ GenericRow.of(3, 1, 4));
result.clear();
indexBootstrap.bootstrap(2, 1, consumer);
assertThat(result)
.containsExactlyInAnyOrder(
- GenericRow.of(1, 3),
- GenericRow.of(4, 1),
- GenericRow.of(5, 1),
- GenericRow.of(6, 3),
- GenericRow.of(7, 1));
+ GenericRow.of(2, 1, 3), GenericRow.of(4, 2, 5),
GenericRow.of(6, 3, 7));
+ result.clear();
+
+ // test bootstrap min partition
+ indexBootstrap =
+ new IndexBootstrap(
+ table.copy(
+ Collections.singletonMap(
+
CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION.key(),
+ "2")));
+
+ indexBootstrap.bootstrap(2, 0, consumer);
+ assertThat(result)
+ .containsExactlyInAnyOrder(GenericRow.of(7, 3, 8),
GenericRow.of(5, 2, 6));
result.clear();
+
+ indexBootstrap.bootstrap(2, 1, consumer);
+ assertThat(result)
+ .containsExactlyInAnyOrder(GenericRow.of(4, 2, 5),
GenericRow.of(6, 3, 7));
+ result.clear();
+ }
+
+ private DynamicBucketRow row(int pt, int col, int pk, int bucket) {
+ GenericRow row = GenericRow.of(pt, col, pk);
+ return new DynamicBucketRow(row, bucket);
}
}