This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9ec3711c9 [core] support check partition expire when batch mode or bounded stream (#3933)
9ec3711c9 is described below
commit 9ec3711c91dec792fc9afb4c4eefaf964bac9572
Author: wangwj <[email protected]>
AuthorDate: Mon Aug 12 14:30:21 2024 +0800
[core] support check partition expire when batch mode or bounded stream (#3933)
---
.../shortcodes/generated/core_configuration.html | 18 ++++--
.../main/java/org/apache/paimon/CoreOptions.java | 11 ++++
.../java/org/apache/paimon/AbstractFileStore.java | 3 +-
.../apache/paimon/operation/PartitionExpire.java | 21 +++++--
.../paimon/table/AbstractFileStoreTable.java | 4 +-
...ava => FlinkEndInputPartitionExpireITCase.java} | 67 +++++++---------------
.../paimon/flink/FlinkEndInputWatermarkITCase.java | 8 +--
7 files changed, 67 insertions(+), 65 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index b0497b345..ee36a01de 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -242,6 +242,12 @@ under the License.
<td>Boolean</td>
<td>Whether only overwrite dynamic partition when overwriting a
partitioned table with dynamic partition columns. Works only when the table has
partition keys.</td>
</tr>
+ <tr>
+ <td><h5>end-input.check-partition-expire</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Optional endInput check partition expire used in case of batch
mode or bounded stream.</td>
+ </tr>
<tr>
<td><h5>fields.default-aggregate-function</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -302,6 +308,12 @@ under the License.
<td>Map</td>
<td>Define different file format for different level, you can add
the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file
format for level is not provided, the default format which set by `file.format`
will be used.</td>
</tr>
+ <tr>
+ <td><h5>force-lookup</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to force the use of lookup for compaction.</td>
+ </tr>
<tr>
<td><h5>full-compaction.delta-commits</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -345,12 +357,6 @@ Mainly to resolve data skew on primary keys. We recommend
starting with 64 mb wh
<td>Integer</td>
<td>The maximal fan-in for external merge sort. It limits the
number of file handles. If it is too small, may cause intermediate merging. But
if it is too large, it will cause too many files opened at the same time,
consume memory and lead to random reading.</td>
</tr>
- <tr>
- <td><h5>force-lookup</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether to force the use of lookup for compaction.</td>
- </tr>
<tr>
<td><h5>lookup-wait</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index c48527d2a..961062209 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -964,6 +964,13 @@ public class CoreOptions implements Serializable {
"Read incremental changes between start timestamp
(exclusive) and end timestamp, "
+ "for example, 't1,t2' means changes
between timestamp t1 and timestamp t2.");
+ public static final ConfigOption<Boolean> END_INPUT_CHECK_PARTITION_EXPIRE
=
+ key("end-input.check-partition-expire")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional endInput check partition expire used in
case of batch mode or bounded stream.");
+
public static final String STATS_MODE_SUFFIX = "stats-mode";
public static final ConfigOption<String> METADATA_STATS_MODE =
@@ -1542,6 +1549,10 @@ public class CoreOptions implements Serializable {
.orElseGet(() -> Runtime.getRuntime().availableProcessors());
}
+ public boolean endInputCheckPartitionExpire() {
+ return options.get(END_INPUT_CHECK_PARTITION_EXPIRE);
+ }
+
public ExpireConfig expireConfig() {
return ExpireConfig.builder()
.snapshotRetainMax(snapshotNumRetainMax())
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 5120db295..21fb87562 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -288,7 +288,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
PartitionExpireStrategy.createPartitionExpireStrategy(options,
partitionType()),
newScan(),
newCommit(commitUser),
- metastoreClient);
+ metastoreClient,
+ options.endInputCheckPartitionExpire());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index b769fc899..2f5ca780c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -45,10 +45,9 @@ public class PartitionExpire {
private final FileStoreScan scan;
private final FileStoreCommit commit;
private final MetastoreClient metastoreClient;
-
private LocalDateTime lastCheck;
-
private final PartitionExpireStrategy strategy;
+ private final boolean endInputCheckPartitionExpire;
public PartitionExpire(
Duration expirationTime,
@@ -56,7 +55,8 @@ public class PartitionExpire {
PartitionExpireStrategy strategy,
FileStoreScan scan,
FileStoreCommit commit,
- @Nullable MetastoreClient metastoreClient) {
+ @Nullable MetastoreClient metastoreClient,
+ boolean endInputCheckPartitionExpire) {
this.expirationTime = expirationTime;
this.checkInterval = checkInterval;
this.strategy = strategy;
@@ -64,6 +64,17 @@ public class PartitionExpire {
this.commit = commit;
this.metastoreClient = metastoreClient;
this.lastCheck = LocalDateTime.now();
+ this.endInputCheckPartitionExpire = endInputCheckPartitionExpire;
+ }
+
+ public PartitionExpire(
+ Duration expirationTime,
+ Duration checkInterval,
+ PartitionExpireStrategy strategy,
+ FileStoreScan scan,
+ FileStoreCommit commit,
+ @Nullable MetastoreClient metastoreClient) {
+ this(expirationTime, checkInterval, strategy, scan, commit,
metastoreClient, false);
}
public PartitionExpire withLock(Lock lock) {
@@ -82,7 +93,9 @@ public class PartitionExpire {
@VisibleForTesting
List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier)
{
- if (checkInterval.isZero() ||
now.isAfter(lastCheck.plus(checkInterval))) {
+ if (checkInterval.isZero()
+ || now.isAfter(lastCheck.plus(checkInterval))
+ || (endInputCheckPartitionExpire && Long.MAX_VALUE ==
commitIdentifier)) {
List<Map<String, String>> expired =
doExpire(now.minus(expirationTime), commitIdentifier);
lastCheck = now;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 04f5f6dd6..a6228ab27 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -381,9 +381,9 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path, snapshotManager().branch()),
- coreOptions().snapshotExpireExecutionMode(),
+ options.snapshotExpireExecutionMode(),
name(),
- coreOptions().forceCreatingSnapshot());
+ options.forceCreatingSnapshot());
}
private List<CommitCallback> createCommitCallbacks(String commitUser) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputPartitionExpireITCase.java
similarity index 77%
copy from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
copy to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputPartitionExpireITCase.java
index 19fd139d8..63eebcd7d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputPartitionExpireITCase.java
@@ -19,10 +19,8 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.sink.FixedBucketSink;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
-import org.apache.paimon.flink.source.ContinuousFileStoreSource;
-import org.apache.paimon.flink.source.StaticFileStoreSource;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
@@ -52,24 +50,25 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
+import java.time.Duration;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static
org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
+import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
+import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
-/**
- * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource}
and {@link
- * FixedBucketSink}.
- */
+/** ITCase for partition expire when end input. */
@ExtendWith(ParameterizedTestExtension.class)
-public class FlinkEndInputWatermarkITCase extends CatalogITCaseBase {
+public class FlinkEndInputPartitionExpireITCase extends CatalogITCaseBase {
private static final RowType TABLE_TYPE =
new RowType(
@@ -81,23 +80,19 @@ public class FlinkEndInputWatermarkITCase extends
CatalogITCaseBase {
private static final List<RowData> SOURCE_DATA =
Arrays.asList(
- wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
- wrap(GenericRowData.of(0, StringData.fromString("p1"), 2)),
- wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
- wrap(GenericRowData.of(5, StringData.fromString("p1"), 1)),
- wrap(GenericRowData.of(6, StringData.fromString("p2"), 1)),
- wrap(GenericRowData.of(3, StringData.fromString("p2"), 5)),
- wrap(GenericRowData.of(5, StringData.fromString("p2"),
1)));
+ wrap(GenericRowData.of(0,
StringData.fromString("20240101"), 1)),
+ wrap(GenericRowData.of(0,
StringData.fromString("20240101"), 2)),
+ wrap(GenericRowData.of(0,
StringData.fromString("20240103"), 1)),
+ wrap(GenericRowData.of(5,
StringData.fromString("20240103"), 1)),
+ wrap(GenericRowData.of(6,
StringData.fromString("20240105"), 1)));
private static SerializableRowData wrap(RowData row) {
return new SerializableRowData(row,
InternalSerializers.create(TABLE_TYPE));
}
- private static final long END_INPUT_WATERMARK = 11111;
-
private final StreamExecutionEnvironment env;
- public FlinkEndInputWatermarkITCase() {
+ public FlinkEndInputPartitionExpireITCase() {
this.env =
streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
}
@@ -106,26 +101,8 @@ public class FlinkEndInputWatermarkITCase extends
CatalogITCaseBase {
return Arrays.asList(true, false);
}
- @Override
- protected List<String> ddl() {
- return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT,
b INT, c INT)");
- }
-
- @TestTemplate
- public void testEndInputWatermarkBySQL() throws Exception {
- batchSql(
- "INSERT INTO T /*+ OPTIONS('end-input.watermark'= '%s') */
VALUES (1, 11, 111), (2, 22, 222)",
- String.valueOf(END_INPUT_WATERMARK));
-
- FileStoreTable table = paimonTable("T");
- Assertions.assertEquals(1, table.snapshotManager().snapshotCount());
- Long waterMark = table.snapshotManager().latestSnapshot().watermark();
- Assertions.assertNotNull(waterMark);
- Assertions.assertEquals(END_INPUT_WATERMARK, waterMark);
- }
-
@TestTemplate
- public void testEndInputWatermark() throws Exception {
+ public void testEndInputPartitionExpire() throws Exception {
FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[]
{1, 2});
// write
@@ -148,10 +125,9 @@ public class FlinkEndInputWatermarkITCase extends
CatalogITCaseBase {
new FlinkSinkBuilder(table).forRow(input, inputType).build();
env.execute();
- Assertions.assertEquals(1, table.snapshotManager().snapshotCount());
- Long waterMark = table.snapshotManager().latestSnapshot().watermark();
- Assertions.assertNotNull(waterMark);
- Assertions.assertEquals(END_INPUT_WATERMARK, waterMark);
+ Assertions.assertEquals(2, table.snapshotManager().snapshotCount());
+ Assertions.assertEquals(
+ Snapshot.CommitKind.OVERWRITE,
table.snapshotManager().snapshot(2).commitKind());
}
private FileStoreTable buildFileStoreTable(int[] partitions, int[]
primaryKey)
@@ -160,9 +136,10 @@ public class FlinkEndInputWatermarkITCase extends
CatalogITCaseBase {
options.set(BUCKET, 3);
options.set(PATH, getTempDirPath());
options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO);
- options.set(
- FlinkConnectorOptions.END_INPUT_WATERMARK.key(),
- String.valueOf(END_INPUT_WATERMARK));
+ options.set(PARTITION_EXPIRATION_TIME, Duration.ofDays(2));
+ options.set(PARTITION_EXPIRATION_CHECK_INTERVAL, Duration.ofHours(1));
+ options.set(PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd");
+ options.set(END_INPUT_CHECK_PARTITION_EXPIRE, true);
Path tablePath = new CoreOptions(options.toMap()).path();
if (primaryKey.length == 0) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
index 19fd139d8..c85b570dd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkEndInputWatermarkITCase.java
@@ -19,10 +19,7 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.sink.FixedBucketSink;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
-import org.apache.paimon.flink.source.ContinuousFileStoreSource;
-import org.apache.paimon.flink.source.StaticFileStoreSource;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
@@ -64,10 +61,7 @@ import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
-/**
- * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource}
and {@link
- * FixedBucketSink}.
- */
+/** ITCase for user define watermark when end input. */
@ExtendWith(ParameterizedTestExtension.class)
public class FlinkEndInputWatermarkITCase extends CatalogITCaseBase {