This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c790d3f8 [flink] Remove cross-partition-upsert.bootstrap-min-partition (#2149)
1c790d3f8 is described below

commit 1c790d3f801ec2a70e373409576f5fa7c21ac40f
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Oct 18 16:56:33 2023 +0800

    [flink] Remove cross-partition-upsert.bootstrap-min-partition (#2149)
---
 docs/content/concepts/primary-key-table.md         | 10 ++--
 .../shortcodes/generated/core_configuration.html   |  6 ---
 .../main/java/org/apache/paimon/CoreOptions.java   | 14 ------
 .../paimon/flink/sink/index/IndexBootstrap.java    | 47 ++++++++++++------
 .../flink/sink/index/IndexBootstrapTest.java       | 57 ++++++++++++++++------
 5 files changed, 75 insertions(+), 59 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md b/docs/content/concepts/primary-key-table.md
index 36ef1ba5d..c2a878f80 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -85,13 +85,9 @@ existing keys in the table when starting stream write job. Different merge engin
 Performance: For tables with a large amount of data, there will be a significant loss in performance. Moreover,
 initialization takes a long time.
 
-If your upsert does not rely on too old data, you can consider configuring index TTL and bootstrap-min-partition to
-reduce Index and initialization time:
-- `'cross-partition-upsert.index-ttl'`: The TTL in rocksdb index, this can avoid maintaining too many indexes and lead
-  to worse and worse performance.
-- `'cross-partition-upsert.bootstrap-min-partition'`: The min partition bootstrap of rocksdb index, bootstrap will only
-  read the partitions above it, and the smaller partitions will not be read into the index. This can reduce job startup
-  time and excessive initialization of index.
+If your upsert does not rely on too old data, you can consider configuring index TTL to reduce Index and initialization time:
+- `'cross-partition-upsert.index-ttl'`: The TTL in rocksdb index and initialization, this can avoid maintaining too many
+  indexes and lead to worse and worse performance.
 
 But please note that this may also cause data duplication.
 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 0f777fbe3..3fb153a08 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -116,12 +116,6 @@ under the License.
             <td>Duration</td>
             <td>The discovery interval of continuous reading.</td>
         </tr>
-        <tr>
-            <td><h5>cross-partition-upsert.bootstrap-min-partition</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>The min partition bootstrap of rocksdb index for cross partition upsert (primary keys not contain all partition fields), bootstrap will only read the partitions above it, and the smaller partitions will not be read into the index. This can reduce job startup time and excessive initialization of index, but please note that this may also cause data duplication.</td>
-        </tr>
         <tr>
             <td><h5>cross-partition-upsert.index-ttl</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index b74aa1bad..942fa65b0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -880,16 +880,6 @@ public class CoreOptions implements Serializable {
                                     + "this can avoid maintaining too many indexes and lead to worse and worse performance, "
                                     + "but please note that this may also cause data duplication.");
 
-    public static final ConfigOption<String> CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION =
-            key("cross-partition-upsert.bootstrap-min-partition")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The min partition bootstrap of rocksdb index for cross partition upsert (primary keys not contain all partition fields), "
-                                    + "bootstrap will only read the partitions above it, and the smaller partitions will not be read into the index. "
-                                    + "This can reduce job startup time and excessive initialization of index, "
-                                    + "but please note that this may also cause data duplication.");
-
     public static final ConfigOption<Integer> ZORDER_VAR_LENGTH_CONTRIBUTION =
             key("zorder.var-length-contribution")
                     .intType()
@@ -1337,10 +1327,6 @@ public class CoreOptions implements Serializable {
         return options.get(CROSS_PARTITION_UPSERT_INDEX_TTL);
     }
 
-    public String crossPartitionUpsertBootstrapMinPartition() {
-        return options.get(CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION);
-    }
-
     public int varTypeSize() {
         return options.get(ZORDER_VAR_LENGTH_CONTRIBUTION);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
index 66c0dc2dc..db1a3318d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
@@ -19,10 +19,11 @@
 package org.apache.paimon.flink.sink.index;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.JoinedRow;
-import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.Table;
@@ -30,14 +31,13 @@ import org.apache.paimon.table.source.AbstractInnerTableScan;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.TypeUtils;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -80,21 +80,24 @@ public class IndexBootstrap implements Serializable {
                         .newReadBuilder()
                         .withProjection(projection);
 
-        String minPartition =
-                CoreOptions.fromMap(table.options()).crossPartitionUpsertBootstrapMinPartition();
-        if (minPartition != null) {
-            int partIndex = fieldNames.indexOf(table.partitionKeys().get(0));
-            Object minPart = TypeUtils.castFromString(minPartition, rowType.getTypeAt(partIndex));
-            PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
-            readBuilder =
-                    readBuilder.withFilter(predicateBuilder.greaterOrEqual(partIndex, minPart));
-        }
-
         AbstractInnerTableScan tableScan = (AbstractInnerTableScan) readBuilder.newScan();
-        TableScan.Plan plan =
-                tableScan.withBucketFilter(bucket -> bucket % numAssigners == assignId).plan();
+        List<Split> splits =
+                tableScan
+                        .withBucketFilter(bucket -> bucket % numAssigners == assignId)
+                        .plan()
+                        .splits();
+
+        Duration indexTtl = CoreOptions.fromMap(table.options()).crossPartitionUpsertIndexTtl();
+        if (indexTtl != null) {
+            long indexTtlMillis = indexTtl.toMillis();
+            long currentTime = System.currentTimeMillis();
+            splits =
+                    splits.stream()
+                            .filter(split -> filterSplit(split, indexTtlMillis, currentTime))
+                            .collect(Collectors.toList());
+        }
 
-        for (Split split : plan.splits()) {
+        for (Split split : splits) {
             try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(split)) {
                 int bucket = ((DataSplit) split).bucket();
                 GenericRow bucketRow = GenericRow.of(bucket);
@@ -105,6 +108,18 @@ public class IndexBootstrap implements Serializable {
         }
     }
 
+    @VisibleForTesting
+    static boolean filterSplit(Split split, long indexTtl, long currentTime) {
+        List<DataFileMeta> files = ((DataSplit) split).dataFiles();
+        for (DataFileMeta file : files) {
+            long fileTime = file.creationTimeEpochMillis();
+            if (currentTime <= fileTime + indexTtl) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public static RowType bootstrapType(TableSchema schema) {
         List<String> primaryKeys = schema.primaryKeys();
         List<String> partitionKeys = schema.partitionKeys();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
index c5c4bc35c..ca73dfebf 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
@@ -22,21 +22,28 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.sink.DynamicBucketRow;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataTypes;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
 
-import static org.apache.paimon.CoreOptions.CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION;
+import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
+import static org.apache.paimon.flink.sink.index.IndexBootstrap.filterSplit;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link IndexBootstrap}. */
@@ -90,24 +97,42 @@ public class IndexBootstrapTest extends TableTestBase {
                 .containsExactlyInAnyOrder(
                         GenericRow.of(2, 1, 3), GenericRow.of(4, 2, 5), GenericRow.of(6, 3, 7));
         result.clear();
+    }
 
-        // test bootstrap min partition
-        indexBootstrap =
-                new IndexBootstrap(
-                        table.copy(
-                                Collections.singletonMap(
-                                        CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION.key(),
-                                        "2")));
+    @Test
+    public void testFilterSplit() {
+        assertThat(filterSplit(newSplit(newFile(100), newFile(200)), 50, 230)).isTrue();
+        assertThat(filterSplit(newSplit(newFile(100), newFile(200)), 50, 300)).isFalse();
+        assertThat(filterSplit(newSplit(newFile(100), newFile(200)), 200, 230)).isTrue();
+    }
 
-        indexBootstrap.bootstrap(2, 0, consumer);
-        assertThat(result)
-                .containsExactlyInAnyOrder(GenericRow.of(7, 3, 8), GenericRow.of(5, 2, 6));
-        result.clear();
+    private DataSplit newSplit(DataFileMeta... files) {
+        return DataSplit.builder()
+                .withSnapshot(1)
+                .withPartition(EMPTY_ROW)
+                .withBucket(0)
+                .withDataFiles(Arrays.asList(files))
+                .build();
+    }
 
-        indexBootstrap.bootstrap(2, 1, consumer);
-        assertThat(result)
-                .containsExactlyInAnyOrder(GenericRow.of(4, 2, 5), GenericRow.of(6, 3, 7));
-        result.clear();
+    private static DataFileMeta newFile(long timeMillis) {
+        return new DataFileMeta(
+                "",
+                1,
+                1,
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                DataFileMeta.EMPTY_KEY_STATS,
+                null,
+                0,
+                1,
+                0L,
+                DataFileMeta.DUMMY_LEVEL,
+                Collections.emptyList(),
+                Timestamp.fromLocalDateTime(
+                        Instant.ofEpochMilli(timeMillis)
+                                .atZone(ZoneId.systemDefault())
+                                .toLocalDateTime()));
     }
 
     private DynamicBucketRow row(int pt, int col, int pk, int bucket) {

Reply via email to