This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d976fd062 [flink] Introduce partition mark done with batch (#3507)
d976fd062 is described below
commit d976fd062737989ffffcaeb6a01d8ec1d9cb67bc
Author: wangwj <[email protected]>
AuthorDate: Fri Jun 21 17:13:12 2024 +0800
[flink] Introduce partition mark done with batch (#3507)
---
.../generated/flink_connector_configuration.html | 6 +
.../apache/paimon/flink/FlinkConnectorOptions.java | 7 +
.../org/apache/paimon/flink/sink/Committer.java | 3 +-
.../paimon/flink/sink/CommitterOperator.java | 4 +-
.../apache/paimon/flink/sink/StoreCommitter.java | 6 +-
.../paimon/flink/sink/StoreMultiCommitter.java | 6 +-
.../flink/sink/partition/PartitionMarkDone.java | 123 ++++++--------
.../sink/partition/PartitionMarkDoneTrigger.java | 107 ++++++++++--
.../FlinkBatchJobPartitionMarkdoneITCase.java | 187 +++++++++++++++++++++
.../paimon/flink/sink/CommitterOperatorTest.java | 2 +-
.../partition/PartitionMarkDoneTriggerTest.java | 76 +++++++--
11 files changed, 410 insertions(+), 117 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 3f8a9e523..32dbe6c0f 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -80,6 +80,12 @@ under the License.
<td>Integer</td>
<td>If the pending snapshot count exceeds the threshold, lookup
operator will refresh the table in sync.</td>
</tr>
+ <tr>
+ <td><h5>partition.end-input-to-done</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether mark the done status to indicate that the data is
ready when end input.</td>
+ </tr>
<tr>
<td><h5>partition.idle-time-to-done</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e5138cd48..4c5d1f762 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -376,6 +376,13 @@ public class FlinkConnectorOptions {
"Both can be configured at the
same time: 'done-partition,success-file'.")
.build());
+ public static final ConfigOption<Boolean>
PARTITION_MARK_DONE_WHEN_END_INPUT =
+ ConfigOptions.key("partition.end-input-to-done")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether mark the done status to indicate that the
data is ready when end input.");
+
public static final ConfigOption<String> CLUSTERING_COLUMNS =
key("sink.clustering.by-columns")
.stringType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index 8d08c2471..8d7efc20e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -47,7 +47,8 @@ public interface Committer<CommitT, GlobalCommitT> extends
AutoCloseable {
long checkpointId, long watermark, GlobalCommitT t, List<CommitT>
committables);
/** Commits the given {@link GlobalCommitT}. */
- void commit(List<GlobalCommitT> globalCommittables) throws IOException,
InterruptedException;
+ void commit(List<GlobalCommitT> globalCommittables, boolean endInput)
+ throws IOException, InterruptedException;
/**
* Filter out all {@link GlobalCommitT} which have committed, and commit
the remaining {@link
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 1bb087aa9..608e20e4a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -201,13 +201,13 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
NavigableMap<Long, GlobalCommitT> headMap =
committablesPerCheckpoint.headMap(checkpointId, true);
List<GlobalCommitT> committables = committables(headMap);
- committer.commit(committables);
+ committer.commit(committables, endInput);
headMap.clear();
if (committables.isEmpty()) {
if (committer.forceCreatingSnapshot()) {
GlobalCommitT commit = toCommittables(checkpointId,
Collections.emptyList());
- committer.commit(Collections.singletonList(commit));
+ committer.commit(Collections.singletonList(commit), endInput);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 922949cce..e4c23f8de 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -107,12 +107,12 @@ public class StoreCommitter implements
Committer<Committable, ManifestCommittabl
}
@Override
- public void commit(List<ManifestCommittable> committables)
+ public void commit(List<ManifestCommittable> committables, boolean
endInput)
throws IOException, InterruptedException {
commit.commitMultiple(committables, false);
calcNumBytesAndRecordsOut(committables);
if (partitionMarkDone != null) {
- partitionMarkDone.notifyCommittable(committables);
+ partitionMarkDone.notifyCommittable(committables, endInput);
}
}
@@ -120,7 +120,7 @@ public class StoreCommitter implements
Committer<Committable, ManifestCommittabl
public int filterAndCommit(List<ManifestCommittable> globalCommittables) {
int committed = commit.filterAndCommitMultiple(globalCommittables);
if (partitionMarkDone != null) {
- partitionMarkDone.notifyCommittable(globalCommittables);
+ partitionMarkDone.notifyCommittable(globalCommittables, false);
}
return committed;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index efa2aefe3..79160650c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -114,7 +114,7 @@ public class StoreMultiCommitter
}
@Override
- public void commit(List<WrappedManifestCommittable> committables)
+ public void commit(List<WrappedManifestCommittable> committables, boolean
endInput)
throws IOException, InterruptedException {
if (committables.isEmpty()) {
return;
@@ -130,13 +130,13 @@ public class StoreMultiCommitter
List<ManifestCommittable> committableList =
committableMap.get(entry.getKey());
StoreCommitter committer = entry.getValue();
if (committableList != null) {
- committer.commit(committableList);
+ committer.commit(committableList, endInput);
} else {
// try best to commit empty snapshot, but tableCommitters may
not contain all tables
if (committer.forceCreatingSnapshot()) {
ManifestCommittable combine =
committer.combine(checkpointId, watermark,
Collections.emptyList());
- committer.commit(Collections.singletonList(combine));
+ committer.commit(Collections.singletonList(combine),
endInput);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index c712682ca..b2794885c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -23,16 +23,11 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.options.Options;
-import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.InternalRowPartitionComputer;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.table.utils.PartitionPathUtils;
import javax.annotation.Nullable;
@@ -40,9 +35,7 @@ import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -51,18 +44,13 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
import static
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_ACTION;
-import static
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_TIME_INTERVAL;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Mark partition done. */
public class PartitionMarkDone implements Closeable {
- private static final ListStateDescriptor<List<String>>
PENDING_PARTITIONS_STATE_DESC =
- new ListStateDescriptor<>(
- "mark-done-pending-partitions",
- new ListSerializer<>(StringSerializer.INSTANCE));
-
private final InternalRowPartitionComputer partitionComputer;
private final PartitionMarkDoneTrigger trigger;
private final List<PartitionMarkDoneAction> actions;
@@ -74,54 +62,24 @@ public class PartitionMarkDone implements Closeable {
OperatorStateStore stateStore,
FileStoreTable table)
throws Exception {
- if (!isStreaming) {
- return null;
- }
-
- List<String> partitionKeys = table.partitionKeys();
- if (partitionKeys.isEmpty()) {
- return null;
- }
-
CoreOptions coreOptions = table.coreOptions();
Options options = coreOptions.toConfiguration();
- Duration idleToDone = options.get(PARTITION_IDLE_TIME_TO_DONE);
- if (idleToDone == null) {
+ if (closePartitionMarkDone(isStreaming, table, options)) {
return null;
}
- MetastoreClient.Factory metastoreClientFactory =
- table.catalogEnvironment().metastoreClientFactory();
-
- String partitionMarkDownAction =
options.get(PARTITION_MARK_DONE_ACTION);
- if (partitionMarkDownAction.contains("done-partition")) {
- checkNotNull(
- metastoreClientFactory,
- "Cannot mark done partition for table without metastore.");
- checkArgument(
- coreOptions.partitionedTableInMetastore(),
- "Table should enable %s",
- METASTORE_PARTITIONED_TABLE.key());
- }
-
InternalRowPartitionComputer partitionComputer =
new InternalRowPartitionComputer(
coreOptions.partitionDefaultName(),
table.schema().logicalPartitionType(),
- partitionKeys.toArray(new String[0]));
+ table.partitionKeys().toArray(new String[0]));
PartitionMarkDoneTrigger trigger =
- new PartitionMarkDoneTrigger(
- new PartitionMarkDoneTriggerState(isRestored,
stateStore),
- new PartitionTimeExtractor(
- coreOptions.partitionTimestampPattern(),
- coreOptions.partitionTimestampFormatter()),
- options.get(PARTITION_TIME_INTERVAL),
- idleToDone);
+ PartitionMarkDoneTrigger.create(coreOptions, isRestored,
stateStore);
List<PartitionMarkDoneAction> actions =
- Arrays.asList(partitionMarkDownAction.split(",")).stream()
+
Arrays.asList(options.get(PARTITION_MARK_DONE_ACTION).split(",")).stream()
.map(
action -> {
switch (action) {
@@ -130,7 +88,8 @@ public class PartitionMarkDone implements Closeable {
table.fileIO(),
table.location());
case "done-partition":
return new AddDonePartitionAction(
-
metastoreClientFactory.create());
+
checkMetastoreAndCreateMetastoreClient(
+ table,
coreOptions, options));
default:
throw new
UnsupportedOperationException(action);
}
@@ -140,6 +99,44 @@ public class PartitionMarkDone implements Closeable {
return new PartitionMarkDone(partitionComputer, trigger, actions);
}
+ private static boolean closePartitionMarkDone(
+ boolean isStreaming, FileStoreTable table, Options options) {
+ boolean partitionMarkDoneWhenEndInput =
options.get(PARTITION_MARK_DONE_WHEN_END_INPUT);
+ if (!isStreaming && !partitionMarkDoneWhenEndInput) {
+ return true;
+ }
+
+ Duration idleToDone = options.get(PARTITION_IDLE_TIME_TO_DONE);
+ if (isStreaming && idleToDone == null) {
+ return true;
+ }
+
+ List<String> partitionKeys = table.partitionKeys();
+ if (partitionKeys.isEmpty()) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private static MetastoreClient checkMetastoreAndCreateMetastoreClient(
+ FileStoreTable table, CoreOptions coreOptions, Options options) {
+ MetastoreClient.Factory metastoreClientFactory =
+ table.catalogEnvironment().metastoreClientFactory();
+
+ if
(options.get(PARTITION_MARK_DONE_ACTION).contains("done-partition")) {
+ checkNotNull(
+ metastoreClientFactory,
+ "Cannot mark done partition for table without metastore.");
+ checkArgument(
+ coreOptions.partitionedTableInMetastore(),
+ "Table should enable %s",
+ METASTORE_PARTITIONED_TABLE.key());
+ }
+
+ return metastoreClientFactory.create();
+ }
+
public PartitionMarkDone(
InternalRowPartitionComputer partitionComputer,
PartitionMarkDoneTrigger trigger,
@@ -149,7 +146,7 @@ public class PartitionMarkDone implements Closeable {
this.actions = actions;
}
- public void notifyCommittable(List<ManifestCommittable> committables) {
+ public void notifyCommittable(List<ManifestCommittable> committables,
boolean endInput) {
Set<BinaryRow> partitions = new HashSet<>();
for (ManifestCommittable committable : committables) {
committable.fileCommittables().stream()
@@ -162,7 +159,7 @@ public class PartitionMarkDone implements Closeable {
.map(PartitionPathUtils::generatePartitionPath)
.forEach(trigger::notifyPartition);
- for (String partition : trigger.donePartitions()) {
+ for (String partition : trigger.donePartitions(endInput)) {
try {
for (PartitionMarkDoneAction action : actions) {
action.markDone(partition);
@@ -183,30 +180,4 @@ public class PartitionMarkDone implements Closeable {
action.close();
}
}
-
- private static class PartitionMarkDoneTriggerState implements
PartitionMarkDoneTrigger.State {
-
- private final boolean isRestored;
- private final ListState<List<String>> pendingPartitionsState;
-
- private PartitionMarkDoneTriggerState(boolean isRestored,
OperatorStateStore stateStore)
- throws Exception {
- this.isRestored = isRestored;
- this.pendingPartitionsState =
stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
- }
-
- @Override
- public List<String> restore() throws Exception {
- List<String> pendingPartitions = new ArrayList<>();
- if (isRestored) {
-
pendingPartitions.addAll(pendingPartitionsState.get().iterator().next());
- }
- return pendingPartitions;
- }
-
- @Override
- public void update(List<String> partitions) throws Exception {
-
pendingPartitionsState.update(Collections.singletonList(partitions));
- }
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
index 9309938e2..d3c3cc304 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
@@ -18,50 +18,81 @@
package org.apache.paimon.flink.sink.partition;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.utils.StringUtils;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_TIME_INTERVAL;
import static
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
-/** Trigger to mark partitions done. */
+/** Trigger to mark partitions done, by partition idle time or when the input ends. */
public class PartitionMarkDoneTrigger {
+ private static final ListStateDescriptor<List<String>>
PENDING_PARTITIONS_STATE_DESC =
+ new ListStateDescriptor<>(
+ "mark-done-pending-partitions",
+ new ListSerializer<>(StringSerializer.INSTANCE));
+
private final State state;
private final PartitionTimeExtractor timeExtractor;
- private final long timeInterval;
- private final long idleTime;
+ private long timeInterval;
+ private long idleTime;
+ private final boolean partitionMarkDoneWhenEndInput;
private final Map<String, Long> pendingPartitions;
public PartitionMarkDoneTrigger(
State state,
PartitionTimeExtractor timeExtractor,
Duration timeInterval,
- Duration idleTime)
+ Duration idleTime,
+ boolean partitionMarkDoneWhenEndInput)
throws Exception {
- this(state, timeExtractor, timeInterval, idleTime,
System.currentTimeMillis());
+ this(
+ state,
+ timeExtractor,
+ timeInterval,
+ idleTime,
+ System.currentTimeMillis(),
+ partitionMarkDoneWhenEndInput);
}
- PartitionMarkDoneTrigger(
+ public PartitionMarkDoneTrigger(
State state,
PartitionTimeExtractor timeExtractor,
Duration timeInterval,
Duration idleTime,
- long currentTimeMillis)
+ long currentTimeMillis,
+ boolean partitionMarkDoneWhenEndInput)
throws Exception {
+ this.pendingPartitions = new HashMap<>();
this.state = state;
this.timeExtractor = timeExtractor;
- this.timeInterval = timeInterval.toMillis();
- this.idleTime = idleTime.toMillis();
- this.pendingPartitions = new HashMap<>();
+ if (timeInterval != null) {
+ this.timeInterval = timeInterval.toMillis();
+ }
+ if (idleTime != null) {
+ this.idleTime = idleTime.toMillis();
+ }
+ this.partitionMarkDoneWhenEndInput = partitionMarkDoneWhenEndInput;
state.restore().forEach(p -> pendingPartitions.put(p,
currentTimeMillis));
}
@@ -69,17 +100,23 @@ public class PartitionMarkDoneTrigger {
notifyPartition(partition, System.currentTimeMillis());
}
- void notifyPartition(String partition, long currentTimeMillis) {
+ public void notifyPartition(String partition, long currentTimeMillis) {
if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
this.pendingPartitions.put(partition, currentTimeMillis);
}
}
- public List<String> donePartitions() {
- return donePartitions(System.currentTimeMillis());
+ public List<String> donePartitions(boolean endInput) {
+ return donePartitions(endInput, System.currentTimeMillis());
}
- public List<String> donePartitions(long currentTimeMillis) {
+ public List<String> donePartitions(boolean endInput, long
currentTimeMillis) {
+ if (endInput) {
+ return partitionMarkDoneWhenEndInput
+ ? new ArrayList<>(pendingPartitions.keySet())
+ : Collections.emptyList();
+ }
+
List<String> needDone = new ArrayList<>();
Iterator<Map.Entry<String, Long>> iter =
pendingPartitions.entrySet().iterator();
while (iter.hasNext()) {
@@ -110,9 +147,49 @@ public class PartitionMarkDoneTrigger {
/** State to store partitions. */
public interface State {
-
List<String> restore() throws Exception;
void update(List<String> partitions) throws Exception;
}
+
+ /** State that keeps the pending partitions in Flink operator list state. */
+ private static class PartitionMarkDoneTriggerState implements State {
+
+ private final boolean isRestored;
+ private final ListState<List<String>> pendingPartitionsState;
+
+ public PartitionMarkDoneTriggerState(boolean isRestored,
OperatorStateStore stateStore)
+ throws Exception {
+ this.isRestored = isRestored;
+ this.pendingPartitionsState =
stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
+ }
+
+ @Override
+ public List<String> restore() throws Exception {
+ List<String> pendingPartitions = new ArrayList<>();
+ if (isRestored) {
+
pendingPartitions.addAll(pendingPartitionsState.get().iterator().next());
+ }
+ return pendingPartitions;
+ }
+
+ @Override
+ public void update(List<String> partitions) throws Exception {
+
pendingPartitionsState.update(Collections.singletonList(partitions));
+ }
+ }
+
+ public static PartitionMarkDoneTrigger create(
+ CoreOptions coreOptions, boolean isRestored, OperatorStateStore
stateStore)
+ throws Exception {
+ Options options = coreOptions.toConfiguration();
+ return new PartitionMarkDoneTrigger(
+ new
PartitionMarkDoneTrigger.PartitionMarkDoneTriggerState(isRestored, stateStore),
+ new PartitionTimeExtractor(
+ coreOptions.partitionTimestampPattern(),
+ coreOptions.partitionTimestampFormatter()),
+ options.get(PARTITION_TIME_INTERVAL),
+ options.get(PARTITION_IDLE_TIME_TO_DONE),
+ options.get(PARTITION_MARK_DONE_WHEN_END_INPUT));
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
new file mode 100644
index 000000000..e78e5143a
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.sink.partition.SuccessFile;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
+import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for marking partitions done at end of input in a Flink batch job. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class FlinkBatchJobPartitionMarkdoneITCase extends CatalogITCaseBase {
+
+ private static final RowType TABLE_TYPE =
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("v", new IntType()),
+ new RowType.RowField("p", new VarCharType(10)),
+ // rename key
+ new RowType.RowField("_k", new IntType())));
+
+ private static final List<RowData> SOURCE_DATA =
+ Arrays.asList(
+ wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
+ wrap(GenericRowData.of(0, StringData.fromString("p1"), 2)),
+ wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
+ wrap(GenericRowData.of(5, StringData.fromString("p1"), 1)),
+ wrap(GenericRowData.of(6, StringData.fromString("p2"), 1)),
+ wrap(GenericRowData.of(3, StringData.fromString("p2"), 5)),
+ wrap(GenericRowData.of(5, StringData.fromString("p2"),
1)));
+
+ private static SerializableRowData wrap(RowData row) {
+ return new SerializableRowData(row,
InternalSerializers.create(TABLE_TYPE));
+ }
+
+ private final StreamExecutionEnvironment env;
+
+ public FlinkBatchJobPartitionMarkdoneITCase() {
+ this.env =
streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
+ }
+
+ @Parameters(name = "isBatch-{0}")
+ public static List<Boolean> getVarSeg() {
+ return Arrays.asList(true, false);
+ }
+
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS T ("
+ + "v INT, "
+ + "p STRING, "
+ + "_k INT, "
+ + "PRIMARY KEY (_k) NOT ENFORCED"
+ + ") PARTITIONED BY (p) WITH ()");
+ }
+
+ public void validateResult(FileStoreTable table) throws Exception {
+ LocalFileIO fileIO = new LocalFileIO();
+ Path successPath1 = new Path(table.location(), "p=p1/_SUCCESS");
+ SuccessFile successFile1 = SuccessFile.safelyFromPath(fileIO,
successPath1);
+ assertThat(successFile1).isNotNull();
+
+ Path successPath2 = new Path(table.location(), "p=p2/_SUCCESS");
+ SuccessFile successFile2 = SuccessFile.safelyFromPath(fileIO,
successPath2);
+ assertThat(successFile2).isNotNull();
+ }
+
+ @TestTemplate
+ public void testFlinkBatchJobPartitionMarkdoneBySQL() throws Exception {
+ batchSql(
+ "INSERT INTO T /*+ OPTIONS('partition.end-input-to-done'=
'true') */ VALUES (0, 'p1', 1), (0, 'p2', 2)");
+
+ validateResult(paimonTable("T"));
+ }
+
+ @TestTemplate
+ public void testFlinkBatchJobPartitionMarkdone() throws Exception {
+ FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[]
{1, 2});
+
+ // write
+ DataStreamSource<RowData> source =
+ env.fromCollection(SOURCE_DATA,
InternalTypeInfo.of(TABLE_TYPE));
+ DataStream<Row> input =
+ source.map(
+ (MapFunction<RowData, Row>)
+ r ->
+ Row.of(
+ r.getInt(0),
+
r.getString(1).toString(),
+ r.getInt(2)))
+ .setParallelism(source.getParallelism());
+ DataType inputType =
+ DataTypes.ROW(
+ DataTypes.FIELD("v", DataTypes.INT()),
+ DataTypes.FIELD("p", DataTypes.STRING()),
+ DataTypes.FIELD("_k", DataTypes.INT()));
+ new FlinkSinkBuilder(table).forRow(input, inputType).build();
+ env.execute();
+
+ validateResult(table);
+ }
+
+ private FileStoreTable buildFileStoreTable(int[] partitions, int[]
primaryKey)
+ throws Exception {
+ Options options = new Options();
+ options.set(BUCKET, 3);
+ options.set(PATH, getTempDirPath());
+ options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO);
+
options.set(FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT.key(),
"true");
+
+ Path tablePath = new CoreOptions(options.toMap()).path();
+ if (primaryKey.length == 0) {
+ options.set(BUCKET_KEY, "_k");
+ }
+ Schema schema =
+ new Schema(
+ toDataType(TABLE_TYPE).getFields(),
+ Arrays.stream(partitions)
+ .mapToObj(i ->
TABLE_TYPE.getFieldNames().get(i))
+ .collect(Collectors.toList()),
+ Arrays.stream(primaryKey)
+ .mapToObj(i ->
TABLE_TYPE.getFieldNames().get(i))
+ .collect(Collectors.toList()),
+ options.toMap(),
+ "");
+ return retryArtificialException(
+ () -> {
+ new SchemaManager(LocalFileIO.create(),
tablePath).createTable(schema);
+ return FileStoreTableFactory.create(LocalFileIO.create(),
options);
+ });
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 3f1281abc..e7e955554 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -563,7 +563,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
StoreCommitter committer =
new StoreCommitter(
table, commit, Committer.createContext("",
metricGroup, true, false, null));
- committer.commit(Collections.singletonList(manifestCommittable));
+ committer.commit(Collections.singletonList(manifestCommittable),
false);
CommitterMetrics metrics = committer.getCommitterMetrics();
assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(293);
assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
index fd66214e3..f2f6f47c5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.partition;
import org.apache.paimon.partition.PartitionTimeExtractor;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
@@ -34,10 +35,17 @@ import static org.assertj.core.api.Assertions.assertThat;
class PartitionMarkDoneTriggerTest {
- @Test
- public void test() throws Exception {
- List<String> pendingPartitions = new ArrayList<>();
- PartitionMarkDoneTrigger.State state =
+ private static final Duration timeInterval = Duration.ofDays(1);
+ private static final Duration idleTime = Duration.ofMinutes(15);
+
+ private List<String> pendingPartitions;
+ private PartitionMarkDoneTrigger.State state;
+ private PartitionTimeExtractor extractor;
+
+ @BeforeEach
+ public void before() throws Exception {
+ this.pendingPartitions = new ArrayList<>();
+ this.state =
new PartitionMarkDoneTrigger.State() {
@Override
public List<String> restore() {
@@ -50,17 +58,23 @@ class PartitionMarkDoneTriggerTest {
pendingPartitions.addAll(partitions);
}
};
+ this.extractor = new PartitionTimeExtractor("$dt", "yyyy-MM-dd");
+ }
- PartitionTimeExtractor extractor = new PartitionTimeExtractor("$dt",
"yyyy-MM-dd");
- Duration timeInterval = Duration.ofDays(1);
- Duration idleTime = Duration.ofMinutes(15);
+ @Test
+ public void testWithoutEndInput() throws Exception {
PartitionMarkDoneTrigger trigger =
new PartitionMarkDoneTrigger(
- state, extractor, timeInterval, idleTime,
toEpochMillis("2024-02-01"));
+ state,
+ extractor,
+ timeInterval,
+ idleTime,
+ toEpochMillis("2024-02-01"),
+ false);
// test not reach partition end + idle time
trigger.notifyPartition("dt=2024-02-02", toEpochMillis("2024-02-01"));
- List<String> partitions =
trigger.donePartitions(toEpochMillis("2024-02-03"));
+ List<String> partitions = trigger.donePartitions(false,
toEpochMillis("2024-02-03"));
assertThat(partitions).isEmpty();
// test state
@@ -69,9 +83,12 @@ class PartitionMarkDoneTriggerTest {
assertThat(pendingPartitions).containsOnly("dt=2024-02-02");
// test trigger
- partitions = trigger.donePartitions(toEpochMillis("2024-02-03") +
idleTime.toMillis());
+ partitions =
+ trigger.donePartitions(false, toEpochMillis("2024-02-03") +
idleTime.toMillis());
assertThat(partitions).isEmpty();
- partitions = trigger.donePartitions(toEpochMillis("2024-02-03") +
idleTime.toMillis() + 1);
+ partitions =
+ trigger.donePartitions(
+ false, toEpochMillis("2024-02-03") +
idleTime.toMillis() + 1);
assertThat(partitions).containsOnly("dt=2024-02-02");
// test state
@@ -81,23 +98,50 @@ class PartitionMarkDoneTriggerTest {
// test refresh
trigger.notifyPartition("dt=2024-02-03", toEpochMillis("2024-02-03"));
trigger.notifyPartition("dt=2024-02-03", toEpochMillis("2024-02-04") +
idleTime.toMillis());
- partitions = trigger.donePartitions(toEpochMillis("2024-02-04") +
idleTime.toMillis() + 1);
+ partitions =
+ trigger.donePartitions(
+ false, toEpochMillis("2024-02-04") +
idleTime.toMillis() + 1);
assertThat(partitions).isEmpty();
partitions =
- trigger.donePartitions(toEpochMillis("2024-02-04") + 2 *
idleTime.toMillis() + 1);
+ trigger.donePartitions(
+ false, toEpochMillis("2024-02-04") + 2 *
idleTime.toMillis() + 1);
assertThat(partitions).containsOnly("dt=2024-02-03");
// test restore
pendingPartitions.add("dt=2024-02-04");
trigger =
new PartitionMarkDoneTrigger(
- state, extractor, timeInterval, idleTime,
toEpochMillis("2024-02-06"));
- partitions = trigger.donePartitions(toEpochMillis("2024-02-06"));
+ state,
+ extractor,
+ timeInterval,
+ idleTime,
+ toEpochMillis("2024-02-06"),
+ false);
+ partitions = trigger.donePartitions(false,
toEpochMillis("2024-02-06"));
assertThat(partitions).isEmpty();
- partitions = trigger.donePartitions(toEpochMillis("2024-02-06") +
idleTime.toMillis() + 1);
+ partitions =
+ trigger.donePartitions(
+ false, toEpochMillis("2024-02-06") +
idleTime.toMillis() + 1);
assertThat(partitions).containsOnly("dt=2024-02-04");
}
+ @Test
+ public void testWithEndInput() throws Exception {
+ PartitionMarkDoneTrigger trigger =
+ new PartitionMarkDoneTrigger(
+ state,
+ extractor,
+ timeInterval,
+ idleTime,
+ toEpochMillis("2024-02-01"),
+ true);
+
+ // with end input, a pending partition is done even before partition end + idle time
+ trigger.notifyPartition("dt=2024-02-02", toEpochMillis("2024-02-01"));
+ List<String> partitions = trigger.donePartitions(true,
toEpochMillis("2024-02-03"));
+ assertThat(partitions).containsOnly("dt=2024-02-02");
+ }
+
private long toEpochMillis(String dt) {
return LocalDateTime.of(LocalDate.parse(dt), LocalTime.MIN)
.atZone(ZoneId.systemDefault())