This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
    new 9ec3711c9 [core] support check partition expire when batch mode or bounded stream (#3933)
9ec3711c9 is described below

commit 9ec3711c91dec792fc9afb4c4eefaf964bac9572
Author: wangwj <[email protected]>
AuthorDate: Mon Aug 12 14:30:21 2024 +0800

    [core] support check partition expire when batch mode or bounded stream (#3933)
---
 .../shortcodes/generated/core_configuration.html   | 18 ++++--
 .../main/java/org/apache/paimon/CoreOptions.java   | 11 ++++
 .../java/org/apache/paimon/AbstractFileStore.java  |  3 +-
 .../apache/paimon/operation/PartitionExpire.java   | 21 +++++--
 .../paimon/table/AbstractFileStoreTable.java       |  4 +-
 ...ava => FlinkEndInputPartitionExpireITCase.java} | 67 +++++++---------------
 .../paimon/flink/FlinkEndInputWatermarkITCase.java |  8 +--
 7 files changed, 67 insertions(+), 65 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index b0497b345..ee36a01de 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -242,6 +242,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether only overwrite dynamic partition when overwriting a 
partitioned table with dynamic partition columns. Works only when the table has 
partition keys.</td>
         </tr>
+        <tr>
+            <td><h5>end-input.check-partition-expire</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Optional endInput check partition expire used in case of batch 
mode or bounded stream.</td>
+        </tr>
         <tr>
             <td><h5>fields.default-aggregate-function</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -302,6 +308,12 @@ under the License.
             <td>Map</td>
             <td>Define different file format for different level, you can add 
the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file 
format for level is not provided, the default format which set by `file.format` 
will be used.</td>
         </tr>
+        <tr>
+            <td><h5>force-lookup</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to force the use of lookup for compaction.</td>
+        </tr>
         <tr>
             <td><h5>full-compaction.delta-commits</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -345,12 +357,6 @@ Mainly to resolve data skew on primary keys. We recommend 
starting with 64 mb wh
             <td>Integer</td>
             <td>The maximal fan-in for external merge sort. It limits the 
number of file handles. If it is too small, may cause intermediate merging. But 
if it is too large, it will cause too many files opened at the same time, 
consume memory and lead to random reading.</td>
         </tr>
-        <tr>
-            <td><h5>force-lookup</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to force the use of lookup for compaction.</td>
-        </tr>
         <tr>
             <td><h5>lookup-wait</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index c48527d2a..961062209 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -964,6 +964,13 @@ public class CoreOptions implements Serializable {
                             "Read incremental changes between start timestamp 
(exclusive) and end timestamp, "
                                     + "for example, 't1,t2' means changes 
between timestamp t1 and timestamp t2.");
 
+    public static final ConfigOption<Boolean> END_INPUT_CHECK_PARTITION_EXPIRE 
=
+            key("end-input.check-partition-expire")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional endInput check partition expire used in 
case of batch mode or bounded stream.");
+
     public static final String STATS_MODE_SUFFIX = "stats-mode";
 
     public static final ConfigOption<String> METADATA_STATS_MODE =
@@ -1542,6 +1549,10 @@ public class CoreOptions implements Serializable {
                 .orElseGet(() -> Runtime.getRuntime().availableProcessors());
     }
 
+    public boolean endInputCheckPartitionExpire() {
+        return options.get(END_INPUT_CHECK_PARTITION_EXPIRE);
+    }
+
     public ExpireConfig expireConfig() {
         return ExpireConfig.builder()
                 .snapshotRetainMax(snapshotNumRetainMax())
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 5120db295..21fb87562 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -288,7 +288,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 PartitionExpireStrategy.createPartitionExpireStrategy(options, 
partitionType()),
                 newScan(),
                 newCommit(commitUser),
-                metastoreClient);
+                metastoreClient,
+                options.endInputCheckPartitionExpire());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index b769fc899..2f5ca780c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -45,10 +45,9 @@ public class PartitionExpire {
     private final FileStoreScan scan;
     private final FileStoreCommit commit;
     private final MetastoreClient metastoreClient;
-
     private LocalDateTime lastCheck;
-
     private final PartitionExpireStrategy strategy;
+    private final boolean endInputCheckPartitionExpire;
 
     public PartitionExpire(
             Duration expirationTime,
@@ -56,7 +55,8 @@ public class PartitionExpire {
             PartitionExpireStrategy strategy,
             FileStoreScan scan,
             FileStoreCommit commit,
-            @Nullable MetastoreClient metastoreClient) {
+            @Nullable MetastoreClient metastoreClient,
+            boolean endInputCheckPartitionExpire) {
         this.expirationTime = expirationTime;
         this.checkInterval = checkInterval;
         this.strategy = strategy;
@@ -64,6 +64,17 @@ public class PartitionExpire {
         this.commit = commit;
         this.metastoreClient = metastoreClient;
         this.lastCheck = LocalDateTime.now();
+        this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
+    }
+
+    public PartitionExpire(
+            Duration expirationTime,
+            Duration checkInterval,
+            PartitionExpireStrategy strategy,
+            FileStoreScan scan,
+            FileStoreCommit commit,
+            @Nullable MetastoreClient metastoreClient) {
+        this(expirationTime, checkInterval, strategy, scan, commit, 
metastoreClient, false);
     }
 
     public PartitionExpire withLock(Lock lock) {
@@ -82,7 +93,9 @@ public class PartitionExpire {
 
     @VisibleForTesting
     List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier) 
{
-        if (checkInterval.isZero() || 
now.isAfter(lastCheck.plus(checkInterval))) {
+        if (checkInterval.isZero()
+                || now.isAfter(lastCheck.plus(checkInterval))
+                || (endInputCheckPartitionExpire && Long.MAX_VALUE == 
commitIdentifier)) {
             List<Map<String, String>> expired =
                     doExpire(now.minus(expirationTime), commitIdentifier);
             lastCheck = now;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 04f5f6dd6..a6228ab27 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -381,9 +381,9 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 catalogEnvironment.lockFactory().create(),
                 CoreOptions.fromMap(options()).consumerExpireTime(),
                 new ConsumerManager(fileIO, path, snapshotManager().branch()),
-                coreOptions().snapshotExpireExecutionMode(),
+                options.snapshotExpireExecutionMode(),
                 name(),
-                coreOptions().forceCreatingSnapshot());
+                options.forceCreatingSnapshot());
     }
 
     private List<CommitCallback> createCommitCallbacks(String commitUser) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputPartitionExpireITCase.java
similarity index 77%
copy from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
copy to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputPartitionExpireITCase.java
index 19fd139d8..63eebcd7d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputPartitionExpireITCase.java
@@ -19,10 +19,8 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.sink.FixedBucketSink;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
-import org.apache.paimon.flink.source.ContinuousFileStoreSource;
-import org.apache.paimon.flink.source.StaticFileStoreSource;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
@@ -52,24 +50,25 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static 
org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
+import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
+import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;
 import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
 
-/**
- * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource} 
and {@link
- * FixedBucketSink}.
- */
+/** ITCase for partition expire when end input. */
 @ExtendWith(ParameterizedTestExtension.class)
-public class FlinkEndInputWatermarkITCase extends CatalogITCaseBase {
+public class FlinkEndInputPartitionExpireITCase extends CatalogITCaseBase {
 
     private static final RowType TABLE_TYPE =
             new RowType(
@@ -81,23 +80,19 @@ public class FlinkEndInputWatermarkITCase extends 
CatalogITCaseBase {
 
     private static final List<RowData> SOURCE_DATA =
             Arrays.asList(
-                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
-                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 2)),
-                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
-                    wrap(GenericRowData.of(5, StringData.fromString("p1"), 1)),
-                    wrap(GenericRowData.of(6, StringData.fromString("p2"), 1)),
-                    wrap(GenericRowData.of(3, StringData.fromString("p2"), 5)),
-                    wrap(GenericRowData.of(5, StringData.fromString("p2"), 
1)));
+                    wrap(GenericRowData.of(0, 
StringData.fromString("20240101"), 1)),
+                    wrap(GenericRowData.of(0, 
StringData.fromString("20240101"), 2)),
+                    wrap(GenericRowData.of(0, 
StringData.fromString("20240103"), 1)),
+                    wrap(GenericRowData.of(5, 
StringData.fromString("20240103"), 1)),
+                    wrap(GenericRowData.of(6, 
StringData.fromString("20240105"), 1)));
 
     private static SerializableRowData wrap(RowData row) {
         return new SerializableRowData(row, 
InternalSerializers.create(TABLE_TYPE));
     }
 
-    private static final long END_INPUT_WATERMARK = 11111;
-
     private final StreamExecutionEnvironment env;
 
-    public FlinkEndInputWatermarkITCase() {
+    public FlinkEndInputPartitionExpireITCase() {
         this.env = 
streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
     }
 
@@ -106,26 +101,8 @@ public class FlinkEndInputWatermarkITCase extends 
CatalogITCaseBase {
         return Arrays.asList(true, false);
     }
 
-    @Override
-    protected List<String> ddl() {
-        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, 
b INT, c INT)");
-    }
-
-    @TestTemplate
-    public void testEndInputWatermarkBySQL() throws Exception {
-        batchSql(
-                "INSERT INTO T /*+ OPTIONS('end-input.watermark'= '%s') */ 
VALUES (1, 11, 111), (2, 22, 222)",
-                String.valueOf(END_INPUT_WATERMARK));
-
-        FileStoreTable table = paimonTable("T");
-        Assertions.assertEquals(1, table.snapshotManager().snapshotCount());
-        Long waterMark = table.snapshotManager().latestSnapshot().watermark();
-        Assertions.assertNotNull(waterMark);
-        Assertions.assertEquals(END_INPUT_WATERMARK, waterMark);
-    }
-
     @TestTemplate
-    public void testEndInputWatermark() throws Exception {
+    public void testEndInputPartitionExpire() throws Exception {
         FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] 
{1, 2});
 
         // write
@@ -148,10 +125,9 @@ public class FlinkEndInputWatermarkITCase extends 
CatalogITCaseBase {
         new FlinkSinkBuilder(table).forRow(input, inputType).build();
         env.execute();
 
-        Assertions.assertEquals(1, table.snapshotManager().snapshotCount());
-        Long waterMark = table.snapshotManager().latestSnapshot().watermark();
-        Assertions.assertNotNull(waterMark);
-        Assertions.assertEquals(END_INPUT_WATERMARK, waterMark);
+        Assertions.assertEquals(2, table.snapshotManager().snapshotCount());
+        Assertions.assertEquals(
+                Snapshot.CommitKind.OVERWRITE, 
table.snapshotManager().snapshot(2).commitKind());
     }
 
     private FileStoreTable buildFileStoreTable(int[] partitions, int[] 
primaryKey)
@@ -160,9 +136,10 @@ public class FlinkEndInputWatermarkITCase extends 
CatalogITCaseBase {
         options.set(BUCKET, 3);
         options.set(PATH, getTempDirPath());
         options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO);
-        options.set(
-                FlinkConnectorOptions.END_INPUT_WATERMARK.key(),
-                String.valueOf(END_INPUT_WATERMARK));
+        options.set(PARTITION_EXPIRATION_TIME, Duration.ofDays(2));
+        options.set(PARTITION_EXPIRATION_CHECK_INTERVAL, Duration.ofHours(1));
+        options.set(PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd");
+        options.set(END_INPUT_CHECK_PARTITION_EXPIRE, true);
 
         Path tablePath = new CoreOptions(options.toMap()).path();
         if (primaryKey.length == 0) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
index 19fd139d8..c85b570dd 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
@@ -19,10 +19,7 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.sink.FixedBucketSink;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
-import org.apache.paimon.flink.source.ContinuousFileStoreSource;
-import org.apache.paimon.flink.source.StaticFileStoreSource;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
@@ -64,10 +61,7 @@ import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
 
-/**
- * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource} 
and {@link
- * FixedBucketSink}.
- */
+/** ITCase for user define watermark when end input. */
 @ExtendWith(ParameterizedTestExtension.class)
 public class FlinkEndInputWatermarkITCase extends CatalogITCaseBase {
 

Reply via email to