This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cd8ece259 [core] Introduce index-ttl and bootstrap-min-partition for 
cross-partition-upsert (#1975)
cd8ece259 is described below

commit cd8ece259056ebea8550350563d03ec5287ddce4
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Sep 9 15:16:05 2023 +0800

    [core] Introduce index-ttl and bootstrap-min-partition for 
cross-partition-upsert (#1975)
---
 docs/content/concepts/primary-key-table.md         | 14 ++++-
 .../shortcodes/generated/core_configuration.html   | 12 +++++
 .../main/java/org/apache/paimon/CoreOptions.java   | 27 ++++++++++
 .../flink/lookup/FileStoreLookupFunction.java      |  2 +-
 .../paimon/flink/lookup/RocksDBStateFactory.java   | 13 ++++-
 .../flink/sink/index/GlobalIndexAssigner.java      |  5 +-
 .../paimon/flink/sink/index/IndexBootstrap.java    | 13 +++++
 .../paimon/flink/lookup/LookupTableTest.java       |  2 +-
 .../paimon/flink/lookup/RocksDBListStateTest.java  |  3 +-
 .../flink/sink/index/GlobalIndexAssignerTest.java  | 20 ++++++-
 .../flink/sink/index/IndexBootstrapTest.java       | 61 ++++++++++++++++------
 11 files changed, 147 insertions(+), 25 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md 
b/docs/content/concepts/primary-key-table.md
index cb47455e9..e019ada53 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -64,13 +64,13 @@ Performance:
    entries in a partition takes up **1 GB** more memory, partitions that are 
no longer active do not take up memory.
 2. For tables with low update rates, this mode is recommended to significantly 
improve performance.
 
-#### Cross Partitions Update Dynamic Bucket Mode
+#### Cross Partitions Upsert Dynamic Bucket Mode
 
 {{< hint info >}}
 This is an experimental feature.
 {{< /hint >}}
 
-When you need cross partition updates (primary keys not contain all partition 
fields), Dynamic Bucket mode directly
+When you need cross partition upsert (primary keys not contain all partition 
fields), Dynamic Bucket mode directly
 maintains the mapping of keys to partition and bucket, uses local disks, and 
initializes indexes by reading all 
 existing keys in the table when starting stream write job. Different merge 
engines have different behaviors:
 
@@ -81,6 +81,16 @@ existing keys in the table when starting stream write job. 
Different merge engin
 Performance: For tables with a large amount of data, there will be a 
significant loss in performance. Moreover,
 initialization takes a long time.
 
+If your upsert does not rely on too old data, you can consider configuring 
index TTL and bootstrap-min-partition to
+reduce Index and initialization time:
+- `'cross-partition-upsert.index-ttl'`: The TTL in rocksdb index, this can 
avoid maintaining too many indexes and lead
+  to worse and worse performance.
+- `'cross-partition-upsert.bootstrap-min-partition'`: The min partition 
bootstrap of rocksdb index, bootstrap will only
+  read the partitions above it, and the smaller partitions will not be read 
into the index. This can reduce job startup
+  time and excessive initialization of index.
+
+But please note that this may also cause data duplication.
+
 ## Merge Engines
 
 When Paimon sink receives two or more records with the same primary keys, it 
will merge them into one record to keep primary keys unique. By specifying the 
`merge-engine` table property, users can choose how records are merged together.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 84bf6cb64..c3b877982 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -116,6 +116,18 @@ under the License.
             <td>Duration</td>
             <td>The discovery interval of continuous reading.</td>
         </tr>
+        <tr>
+            <td><h5>cross-partition-upsert.bootstrap-min-partition</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The min partition bootstrap of rocksdb index for cross 
partition upsert (primary keys not contain all partition fields), bootstrap 
will only read the partitions above it, and the smaller partitions will not be 
read into the index. This can reduce job startup time and excessive 
initialization of index, but please note that this may also cause data 
duplication.</td>
+        </tr>
+        <tr>
+            <td><h5>cross-partition-upsert.index-ttl</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>The TTL in rocksdb index for cross partition upsert (primary 
keys not contain all partition fields), this can avoid maintaining too many 
indexes and lead to worse and worse performance, but please note that this may 
also cause data duplication.</td>
+        </tr>
         <tr>
             <td><h5>dynamic-bucket.assigner-parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index eb954bd73..7ded97bba 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -849,6 +849,25 @@ public class CoreOptions implements Serializable {
                                     + "Mainly to resolve data skew on primary 
keys. "
                                     + "We recommend starting with 64 mb when 
trying out this feature.");
 
+    public static final ConfigOption<Duration> 
CROSS_PARTITION_UPSERT_INDEX_TTL =
+            key("cross-partition-upsert.index-ttl")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The TTL in rocksdb index for cross partition 
upsert (primary keys not contain all partition fields), "
+                                    + "this can avoid maintaining too many 
indexes and lead to worse and worse performance, "
+                                    + "but please note that this may also 
cause data duplication.");
+
+    public static final ConfigOption<String> 
CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION =
+            key("cross-partition-upsert.bootstrap-min-partition")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The min partition bootstrap of rocksdb index for 
cross partition upsert (primary keys not contain all partition fields), "
+                                    + "bootstrap will only read the partitions 
above it, and the smaller partitions will not be read into the index. "
+                                    + "This can reduce job startup time and 
excessive initialization of index, "
+                                    + "but please note that this may also 
cause data duplication.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -1263,6 +1282,14 @@ public class CoreOptions implements Serializable {
         return options.get(LOCAL_MERGE_BUFFER_SIZE).getBytes();
     }
 
+    public Duration crossPartitionUpsertIndexTtl() {
+        return options.get(CROSS_PARTITION_UPSERT_INDEX_TTL);
+    }
+
+    public String crossPartitionUpsertBootstrapMinPartition() {
+        return options.get(CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 55b2fa8d5..4715c3244 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -124,7 +124,7 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
     private void open() throws Exception {
         Options options = Options.fromMap(table.options());
         this.refreshInterval = 
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
-        this.stateFactory = new RocksDBStateFactory(path.toString(), options);
+        this.stateFactory = new RocksDBStateFactory(path.toString(), options, 
null);
 
         List<String> fieldNames = table.rowType().getFieldNames();
         int[] projection = 
projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
index af6a7f201..9c930f464 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
@@ -28,10 +28,14 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.TtlDB;
+
+import javax.annotation.Nullable;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 
 /** Factory to create state. */
 public class RocksDBStateFactory implements Closeable {
@@ -42,7 +46,8 @@ public class RocksDBStateFactory implements Closeable {
 
     private final ColumnFamilyOptions columnFamilyOptions;
 
-    public RocksDBStateFactory(String path, org.apache.paimon.options.Options 
conf)
+    public RocksDBStateFactory(
+            String path, org.apache.paimon.options.Options conf, @Nullable 
Duration ttlSecs)
             throws IOException {
         DBOptions dbOptions =
                 RocksDBOptions.createDBOptions(
@@ -55,8 +60,12 @@ public class RocksDBStateFactory implements Closeable {
                 RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(), 
conf)
                         .setMergeOperatorName(MERGE_OPERATOR_NAME);
 
+        Options options = new Options(dbOptions, columnFamilyOptions);
         try {
-            this.db = RocksDB.open(new Options(dbOptions, 
columnFamilyOptions), path);
+            this.db =
+                    ttlSecs == null
+                            ? RocksDB.open(options, path)
+                            : TtlDB.open(options, path, (int) 
ttlSecs.getSeconds(), false);
         } catch (RocksDBException e) {
             throw new IOException("Error while opening RocksDB instance.", e);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
index 9662a3bb4..d35ef6ed4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
@@ -106,7 +106,10 @@ public class GlobalIndexAssigner<T> implements 
Serializable {
         // state
         Options options = coreOptions.toConfiguration();
         this.path = new File(tmpDir, "lookup-" + UUID.randomUUID());
-        this.stateFactory = new RocksDBStateFactory(path.toString(), options);
+
+        this.stateFactory =
+                new RocksDBStateFactory(
+                        path.toString(), options, 
coreOptions.crossPartitionUpsertIndexTtl());
         RowType keyType = table.schema().logicalTrimmedPrimaryKeysType();
         this.keyIndex =
                 stateFactory.valueState(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
index 95b4c1a94..66c0dc2dc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.flink.sink.index;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.Table;
@@ -32,6 +34,7 @@ import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TypeUtils;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -77,6 +80,16 @@ public class IndexBootstrap implements Serializable {
                         .newReadBuilder()
                         .withProjection(projection);
 
+        String minPartition =
+                
CoreOptions.fromMap(table.options()).crossPartitionUpsertBootstrapMinPartition();
+        if (minPartition != null) {
+            int partIndex = fieldNames.indexOf(table.partitionKeys().get(0));
+            Object minPart = TypeUtils.castFromString(minPartition, 
rowType.getTypeAt(partIndex));
+            PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
+            readBuilder =
+                    
readBuilder.withFilter(predicateBuilder.greaterOrEqual(partIndex, minPart));
+        }
+
         AbstractInnerTableScan tableScan = (AbstractInnerTableScan) 
readBuilder.newScan();
         TableScan.Plan plan =
                 tableScan.withBucketFilter(bucket -> bucket % numAssigners == 
assignId).plan();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 9c7366be6..dc1932b78 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -50,7 +50,7 @@ public class LookupTableTest {
 
     @BeforeEach
     public void before() throws IOException {
-        this.stateFactory = new RocksDBStateFactory(tempDir.toString(), new 
Options());
+        this.stateFactory = new RocksDBStateFactory(tempDir.toString(), new 
Options(), null);
         this.rowType = RowType.of(new IntType(), new IntType(), new IntType());
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
index da209d3d9..09c2ea106 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
@@ -47,7 +47,8 @@ public class RocksDBListStateTest {
 
     @Test
     void test() throws Exception {
-        RocksDBStateFactory factory = new 
RocksDBStateFactory(tempDir.toString(), new Options());
+        RocksDBStateFactory factory =
+                new RocksDBStateFactory(tempDir.toString(), new Options(), 
null);
 
         RowType keyType = RowType.of(DataTypes.STRING());
         RowType valueType = RowType.of(DataTypes.STRING());
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
index 0c508a0cb..67d54f61a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.types.RowKind;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
@@ -44,6 +45,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class GlobalIndexAssignerTest extends TableTestBase {
 
     private GlobalIndexAssigner<RowData> createAssigner(MergeEngine 
mergeEngine) throws Exception {
+        return createAssigner(mergeEngine, false);
+    }
+
+    private GlobalIndexAssigner<RowData> createAssigner(MergeEngine 
mergeEngine, boolean enableTtl)
+            throws Exception {
         Identifier identifier = identifier("T");
         Options options = new Options();
         options.set(CoreOptions.MERGE_ENGINE, mergeEngine);
@@ -52,6 +58,9 @@ public class GlobalIndexAssignerTest extends TableTestBase {
         }
         options.set(CoreOptions.DYNAMIC_BUCKET_TARGET_ROW_NUM, 3L);
         options.set(CoreOptions.BUCKET, -1);
+        if (enableTtl) {
+            options.set(CoreOptions.CROSS_PARTITION_UPSERT_INDEX_TTL, 
Duration.ofSeconds(1000));
+        }
         Schema schema =
                 Schema.newBuilder()
                         .column("pt", DataTypes.INT())
@@ -67,7 +76,16 @@ public class GlobalIndexAssignerTest extends TableTestBase {
 
     @Test
     public void testBucketAssign() throws Exception {
-        GlobalIndexAssigner<RowData> assigner = 
createAssigner(MergeEngine.DEDUPLICATE);
+        innerTestBucketAssign(false);
+    }
+
+    @Test
+    public void testEnableTtl() throws Exception {
+        innerTestBucketAssign(true);
+    }
+
+    private void innerTestBucketAssign(boolean enableTtl) throws Exception {
+        GlobalIndexAssigner<RowData> assigner = 
createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
         List<Integer> output = new ArrayList<>();
         assigner.open(new File(warehouse.getPath()), 2, 0, (row, bucket) -> 
output.add(bucket));
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
index aadec017e..c5c4bc35c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
@@ -26,29 +26,34 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.DynamicBucketRow;
 import org.apache.paimon.types.DataTypes;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
 
+import static 
org.apache.paimon.CoreOptions.CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link IndexBootstrap}. */
 public class IndexBootstrapTest extends TableTestBase {
 
     @Test
-    public void test() throws Exception {
+    public void testBoostrap() throws Exception {
         Identifier identifier = identifier("T");
         Options options = new Options();
-        options.set(CoreOptions.BUCKET, 5);
+        options.set(CoreOptions.BUCKET, -1);
         Schema schema =
                 Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
                         .column("col", DataTypes.INT())
                         .column("pk", DataTypes.INT())
                         .primaryKey("pk")
+                        .partitionKeys("pt")
                         .options(options.toMap())
                         .build();
         catalog.createTable(identifier, schema, true);
@@ -56,33 +61,57 @@ public class IndexBootstrapTest extends TableTestBase {
 
         write(
                 table,
-                GenericRow.of(1, 1),
-                GenericRow.of(2, 2),
-                GenericRow.of(3, 3),
-                GenericRow.of(4, 4),
-                GenericRow.of(5, 5),
-                GenericRow.of(6, 6),
-                GenericRow.of(7, 7));
+                row(1, 1, 1, 2),
+                row(1, 2, 2, 3),
+                row(1, 3, 3, 4),
+                row(2, 4, 4, 5),
+                row(2, 5, 5, 6),
+                row(3, 6, 6, 7),
+                row(3, 7, 7, 8));
 
         IndexBootstrap indexBootstrap = new IndexBootstrap(table);
         List<GenericRow> result = new ArrayList<>();
         Consumer<InternalRow> consumer =
-                row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1)));
+                row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1), 
row.getInt(2)));
 
         // output key and bucket
 
         indexBootstrap.bootstrap(2, 0, consumer);
-        assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2, 4), 
GenericRow.of(3, 0));
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(7, 3, 8),
+                        GenericRow.of(5, 2, 6),
+                        GenericRow.of(1, 1, 2),
+                        GenericRow.of(3, 1, 4));
         result.clear();
 
         indexBootstrap.bootstrap(2, 1, consumer);
         assertThat(result)
                 .containsExactlyInAnyOrder(
-                        GenericRow.of(1, 3),
-                        GenericRow.of(4, 1),
-                        GenericRow.of(5, 1),
-                        GenericRow.of(6, 3),
-                        GenericRow.of(7, 1));
+                        GenericRow.of(2, 1, 3), GenericRow.of(4, 2, 5), 
GenericRow.of(6, 3, 7));
+        result.clear();
+
+        // test bootstrap min partition
+        indexBootstrap =
+                new IndexBootstrap(
+                        table.copy(
+                                Collections.singletonMap(
+                                        
CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION.key(),
+                                        "2")));
+
+        indexBootstrap.bootstrap(2, 0, consumer);
+        assertThat(result)
+                .containsExactlyInAnyOrder(GenericRow.of(7, 3, 8), 
GenericRow.of(5, 2, 6));
         result.clear();
+
+        indexBootstrap.bootstrap(2, 1, consumer);
+        assertThat(result)
+                .containsExactlyInAnyOrder(GenericRow.of(4, 2, 5), 
GenericRow.of(6, 3, 7));
+        result.clear();
+    }
+
+    private DynamicBucketRow row(int pt, int col, int pk, int bucket) {
+        GenericRow row = GenericRow.of(pt, col, pk);
+        return new DynamicBucketRow(row, bucket);
     }
 }

Reply via email to