This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d976fd062 [flink] Introduce partition mark done with batch (#3507)
d976fd062 is described below

commit d976fd062737989ffffcaeb6a01d8ec1d9cb67bc
Author: wangwj <[email protected]>
AuthorDate: Fri Jun 21 17:13:12 2024 +0800

    [flink] Introduce partition mark done with batch (#3507)
---
 .../generated/flink_connector_configuration.html   |   6 +
 .../apache/paimon/flink/FlinkConnectorOptions.java |   7 +
 .../org/apache/paimon/flink/sink/Committer.java    |   3 +-
 .../paimon/flink/sink/CommitterOperator.java       |   4 +-
 .../apache/paimon/flink/sink/StoreCommitter.java   |   6 +-
 .../paimon/flink/sink/StoreMultiCommitter.java     |   6 +-
 .../flink/sink/partition/PartitionMarkDone.java    | 123 ++++++--------
 .../sink/partition/PartitionMarkDoneTrigger.java   | 107 ++++++++++--
 .../FlinkBatchJobPartitionMarkdoneITCase.java      | 187 +++++++++++++++++++++
 .../paimon/flink/sink/CommitterOperatorTest.java   |   2 +-
 .../partition/PartitionMarkDoneTriggerTest.java    |  76 +++++++--
 11 files changed, 410 insertions(+), 117 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 3f8a9e523..32dbe6c0f 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -80,6 +80,12 @@ under the License.
             <td>Integer</td>
             <td>If the pending snapshot count exceeds the threshold, lookup 
operator will refresh the table in sync.</td>
         </tr>
+        <tr>
+            <td><h5>partition.end-input-to-done</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether mark the done status to indicate that the data is 
ready when end input.</td>
+        </tr>
         <tr>
             <td><h5>partition.idle-time-to-done</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e5138cd48..4c5d1f762 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -376,6 +376,13 @@ public class FlinkConnectorOptions {
                                             "Both can be configured at the 
same time: 'done-partition,success-file'.")
                                     .build());
 
+    public static final ConfigOption<Boolean> 
PARTITION_MARK_DONE_WHEN_END_INPUT =
+            ConfigOptions.key("partition.end-input-to-done")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether mark the done status to indicate that the 
data is ready when end input.");
+
     public static final ConfigOption<String> CLUSTERING_COLUMNS =
             key("sink.clustering.by-columns")
                     .stringType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index 8d08c2471..8d7efc20e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -47,7 +47,8 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
             long checkpointId, long watermark, GlobalCommitT t, List<CommitT> 
committables);
 
     /** Commits the given {@link GlobalCommitT}. */
-    void commit(List<GlobalCommitT> globalCommittables) throws IOException, 
InterruptedException;
+    void commit(List<GlobalCommitT> globalCommittables, boolean endInput)
+            throws IOException, InterruptedException;
 
     /**
      * Filter out all {@link GlobalCommitT} which have committed, and commit 
the remaining {@link
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 1bb087aa9..608e20e4a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -201,13 +201,13 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
         NavigableMap<Long, GlobalCommitT> headMap =
                 committablesPerCheckpoint.headMap(checkpointId, true);
         List<GlobalCommitT> committables = committables(headMap);
-        committer.commit(committables);
+        committer.commit(committables, endInput);
         headMap.clear();
 
         if (committables.isEmpty()) {
             if (committer.forceCreatingSnapshot()) {
                 GlobalCommitT commit = toCommittables(checkpointId, 
Collections.emptyList());
-                committer.commit(Collections.singletonList(commit));
+                committer.commit(Collections.singletonList(commit), endInput);
             }
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 922949cce..e4c23f8de 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -107,12 +107,12 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
     }
 
     @Override
-    public void commit(List<ManifestCommittable> committables)
+    public void commit(List<ManifestCommittable> committables, boolean 
endInput)
             throws IOException, InterruptedException {
         commit.commitMultiple(committables, false);
         calcNumBytesAndRecordsOut(committables);
         if (partitionMarkDone != null) {
-            partitionMarkDone.notifyCommittable(committables);
+            partitionMarkDone.notifyCommittable(committables, endInput);
         }
     }
 
@@ -120,7 +120,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
     public int filterAndCommit(List<ManifestCommittable> globalCommittables) {
         int committed = commit.filterAndCommitMultiple(globalCommittables);
         if (partitionMarkDone != null) {
-            partitionMarkDone.notifyCommittable(globalCommittables);
+            partitionMarkDone.notifyCommittable(globalCommittables, false);
         }
         return committed;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index efa2aefe3..79160650c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -114,7 +114,7 @@ public class StoreMultiCommitter
     }
 
     @Override
-    public void commit(List<WrappedManifestCommittable> committables)
+    public void commit(List<WrappedManifestCommittable> committables, boolean 
endInput)
             throws IOException, InterruptedException {
         if (committables.isEmpty()) {
             return;
@@ -130,13 +130,13 @@ public class StoreMultiCommitter
             List<ManifestCommittable> committableList = 
committableMap.get(entry.getKey());
             StoreCommitter committer = entry.getValue();
             if (committableList != null) {
-                committer.commit(committableList);
+                committer.commit(committableList, endInput);
             } else {
                 // try best to commit empty snapshot, but tableCommitters may 
not contain all tables
                 if (committer.forceCreatingSnapshot()) {
                     ManifestCommittable combine =
                             committer.combine(checkpointId, watermark, 
Collections.emptyList());
-                    committer.commit(Collections.singletonList(combine));
+                    committer.commit(Collections.singletonList(combine), 
endInput);
                 }
             }
         }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index c712682ca..b2794885c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -23,16 +23,11 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.partition.PartitionTimeExtractor;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.table.utils.PartitionPathUtils;
 
 import javax.annotation.Nullable;
@@ -40,9 +35,7 @@ import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -51,18 +44,13 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_ACTION;
-import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_TIME_INTERVAL;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Mark partition done. */
 public class PartitionMarkDone implements Closeable {
 
-    private static final ListStateDescriptor<List<String>> 
PENDING_PARTITIONS_STATE_DESC =
-            new ListStateDescriptor<>(
-                    "mark-done-pending-partitions",
-                    new ListSerializer<>(StringSerializer.INSTANCE));
-
     private final InternalRowPartitionComputer partitionComputer;
     private final PartitionMarkDoneTrigger trigger;
     private final List<PartitionMarkDoneAction> actions;
@@ -74,54 +62,24 @@ public class PartitionMarkDone implements Closeable {
             OperatorStateStore stateStore,
             FileStoreTable table)
             throws Exception {
-        if (!isStreaming) {
-            return null;
-        }
-
-        List<String> partitionKeys = table.partitionKeys();
-        if (partitionKeys.isEmpty()) {
-            return null;
-        }
-
         CoreOptions coreOptions = table.coreOptions();
         Options options = coreOptions.toConfiguration();
 
-        Duration idleToDone = options.get(PARTITION_IDLE_TIME_TO_DONE);
-        if (idleToDone == null) {
+        if (closePartitionMarkDone(isStreaming, table, options)) {
             return null;
         }
 
-        MetastoreClient.Factory metastoreClientFactory =
-                table.catalogEnvironment().metastoreClientFactory();
-
-        String partitionMarkDownAction = 
options.get(PARTITION_MARK_DONE_ACTION);
-        if (partitionMarkDownAction.contains("done-partition")) {
-            checkNotNull(
-                    metastoreClientFactory,
-                    "Cannot mark done partition for table without metastore.");
-            checkArgument(
-                    coreOptions.partitionedTableInMetastore(),
-                    "Table should enable %s",
-                    METASTORE_PARTITIONED_TABLE.key());
-        }
-
         InternalRowPartitionComputer partitionComputer =
                 new InternalRowPartitionComputer(
                         coreOptions.partitionDefaultName(),
                         table.schema().logicalPartitionType(),
-                        partitionKeys.toArray(new String[0]));
+                        table.partitionKeys().toArray(new String[0]));
 
         PartitionMarkDoneTrigger trigger =
-                new PartitionMarkDoneTrigger(
-                        new PartitionMarkDoneTriggerState(isRestored, 
stateStore),
-                        new PartitionTimeExtractor(
-                                coreOptions.partitionTimestampPattern(),
-                                coreOptions.partitionTimestampFormatter()),
-                        options.get(PARTITION_TIME_INTERVAL),
-                        idleToDone);
+                PartitionMarkDoneTrigger.create(coreOptions, isRestored, 
stateStore);
 
         List<PartitionMarkDoneAction> actions =
-                Arrays.asList(partitionMarkDownAction.split(",")).stream()
+                
Arrays.asList(options.get(PARTITION_MARK_DONE_ACTION).split(",")).stream()
                         .map(
                                 action -> {
                                     switch (action) {
@@ -130,7 +88,8 @@ public class PartitionMarkDone implements Closeable {
                                                     table.fileIO(), 
table.location());
                                         case "done-partition":
                                             return new AddDonePartitionAction(
-                                                    
metastoreClientFactory.create());
+                                                    
checkMetastoreAndCreateMetastoreClient(
+                                                            table, 
coreOptions, options));
                                         default:
                                             throw new 
UnsupportedOperationException(action);
                                     }
@@ -140,6 +99,44 @@ public class PartitionMarkDone implements Closeable {
         return new PartitionMarkDone(partitionComputer, trigger, actions);
     }
 
+    private static boolean closePartitionMarkDone(
+            boolean isStreaming, FileStoreTable table, Options options) {
+        boolean partitionMarkDoneWhenEndInput = 
options.get(PARTITION_MARK_DONE_WHEN_END_INPUT);
+        if (!isStreaming && !partitionMarkDoneWhenEndInput) {
+            return true;
+        }
+
+        Duration idleToDone = options.get(PARTITION_IDLE_TIME_TO_DONE);
+        if (isStreaming && idleToDone == null) {
+            return true;
+        }
+
+        List<String> partitionKeys = table.partitionKeys();
+        if (partitionKeys.isEmpty()) {
+            return true;
+        }
+
+        return false;
+    }
+
+    private static MetastoreClient checkMetastoreAndCreateMetastoreClient(
+            FileStoreTable table, CoreOptions coreOptions, Options options) {
+        MetastoreClient.Factory metastoreClientFactory =
+                table.catalogEnvironment().metastoreClientFactory();
+
+        if 
(options.get(PARTITION_MARK_DONE_ACTION).contains("done-partition")) {
+            checkNotNull(
+                    metastoreClientFactory,
+                    "Cannot mark done partition for table without metastore.");
+            checkArgument(
+                    coreOptions.partitionedTableInMetastore(),
+                    "Table should enable %s",
+                    METASTORE_PARTITIONED_TABLE.key());
+        }
+
+        return metastoreClientFactory.create();
+    }
+
     public PartitionMarkDone(
             InternalRowPartitionComputer partitionComputer,
             PartitionMarkDoneTrigger trigger,
@@ -149,7 +146,7 @@ public class PartitionMarkDone implements Closeable {
         this.actions = actions;
     }
 
-    public void notifyCommittable(List<ManifestCommittable> committables) {
+    public void notifyCommittable(List<ManifestCommittable> committables, 
boolean endInput) {
         Set<BinaryRow> partitions = new HashSet<>();
         for (ManifestCommittable committable : committables) {
             committable.fileCommittables().stream()
@@ -162,7 +159,7 @@ public class PartitionMarkDone implements Closeable {
                 .map(PartitionPathUtils::generatePartitionPath)
                 .forEach(trigger::notifyPartition);
 
-        for (String partition : trigger.donePartitions()) {
+        for (String partition : trigger.donePartitions(endInput)) {
             try {
                 for (PartitionMarkDoneAction action : actions) {
                     action.markDone(partition);
@@ -183,30 +180,4 @@ public class PartitionMarkDone implements Closeable {
             action.close();
         }
     }
-
-    private static class PartitionMarkDoneTriggerState implements 
PartitionMarkDoneTrigger.State {
-
-        private final boolean isRestored;
-        private final ListState<List<String>> pendingPartitionsState;
-
-        private PartitionMarkDoneTriggerState(boolean isRestored, 
OperatorStateStore stateStore)
-                throws Exception {
-            this.isRestored = isRestored;
-            this.pendingPartitionsState = 
stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
-        }
-
-        @Override
-        public List<String> restore() throws Exception {
-            List<String> pendingPartitions = new ArrayList<>();
-            if (isRestored) {
-                
pendingPartitions.addAll(pendingPartitionsState.get().iterator().next());
-            }
-            return pendingPartitions;
-        }
-
-        @Override
-        public void update(List<String> partitions) throws Exception {
-            
pendingPartitionsState.update(Collections.singletonList(partitions));
-        }
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
index 9309938e2..d3c3cc304 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
@@ -18,50 +18,81 @@
 
 package org.apache.paimon.flink.sink.partition;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionTimeExtractor;
 import org.apache.paimon.utils.StringUtils;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
 import java.time.Duration;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_TIME_INTERVAL;
 import static 
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
 
-/** Trigger to mark partitions done. */
+/** Trigger to mark partitions done with streaming job. */
 public class PartitionMarkDoneTrigger {
 
+    private static final ListStateDescriptor<List<String>> 
PENDING_PARTITIONS_STATE_DESC =
+            new ListStateDescriptor<>(
+                    "mark-done-pending-partitions",
+                    new ListSerializer<>(StringSerializer.INSTANCE));
+
     private final State state;
     private final PartitionTimeExtractor timeExtractor;
-    private final long timeInterval;
-    private final long idleTime;
+    private long timeInterval;
+    private long idleTime;
+    private final boolean partitionMarkDoneWhenEndInput;
     private final Map<String, Long> pendingPartitions;
 
     public PartitionMarkDoneTrigger(
             State state,
             PartitionTimeExtractor timeExtractor,
             Duration timeInterval,
-            Duration idleTime)
+            Duration idleTime,
+            boolean partitionMarkDoneWhenEndInput)
             throws Exception {
-        this(state, timeExtractor, timeInterval, idleTime, 
System.currentTimeMillis());
+        this(
+                state,
+                timeExtractor,
+                timeInterval,
+                idleTime,
+                System.currentTimeMillis(),
+                partitionMarkDoneWhenEndInput);
     }
 
-    PartitionMarkDoneTrigger(
+    public PartitionMarkDoneTrigger(
             State state,
             PartitionTimeExtractor timeExtractor,
             Duration timeInterval,
             Duration idleTime,
-            long currentTimeMillis)
+            long currentTimeMillis,
+            boolean partitionMarkDoneWhenEndInput)
             throws Exception {
+        this.pendingPartitions = new HashMap<>();
         this.state = state;
         this.timeExtractor = timeExtractor;
-        this.timeInterval = timeInterval.toMillis();
-        this.idleTime = idleTime.toMillis();
-        this.pendingPartitions = new HashMap<>();
+        if (timeInterval != null) {
+            this.timeInterval = timeInterval.toMillis();
+        }
+        if (idleTime != null) {
+            this.idleTime = idleTime.toMillis();
+        }
+        this.partitionMarkDoneWhenEndInput = partitionMarkDoneWhenEndInput;
         state.restore().forEach(p -> pendingPartitions.put(p, 
currentTimeMillis));
     }
 
@@ -69,17 +100,23 @@ public class PartitionMarkDoneTrigger {
         notifyPartition(partition, System.currentTimeMillis());
     }
 
-    void notifyPartition(String partition, long currentTimeMillis) {
+    public void notifyPartition(String partition, long currentTimeMillis) {
         if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
             this.pendingPartitions.put(partition, currentTimeMillis);
         }
     }
 
-    public List<String> donePartitions() {
-        return donePartitions(System.currentTimeMillis());
+    public List<String> donePartitions(boolean endInput) {
+        return donePartitions(endInput, System.currentTimeMillis());
     }
 
-    public List<String> donePartitions(long currentTimeMillis) {
+    public List<String> donePartitions(boolean endInput, long 
currentTimeMillis) {
+        if (endInput) {
+            return partitionMarkDoneWhenEndInput
+                    ? new ArrayList<>(pendingPartitions.keySet())
+                    : Collections.emptyList();
+        }
+
         List<String> needDone = new ArrayList<>();
         Iterator<Map.Entry<String, Long>> iter = 
pendingPartitions.entrySet().iterator();
         while (iter.hasNext()) {
@@ -110,9 +147,49 @@ public class PartitionMarkDoneTrigger {
 
     /** State to store partitions. */
     public interface State {
-
         List<String> restore() throws Exception;
 
         void update(List<String> partitions) throws Exception;
     }
+
+    /** State to store partitions with streaming job. */
+    private static class PartitionMarkDoneTriggerState implements State {
+
+        private final boolean isRestored;
+        private final ListState<List<String>> pendingPartitionsState;
+
+        public PartitionMarkDoneTriggerState(boolean isRestored, 
OperatorStateStore stateStore)
+                throws Exception {
+            this.isRestored = isRestored;
+            this.pendingPartitionsState = 
stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
+        }
+
+        @Override
+        public List<String> restore() throws Exception {
+            List<String> pendingPartitions = new ArrayList<>();
+            if (isRestored) {
+                
pendingPartitions.addAll(pendingPartitionsState.get().iterator().next());
+            }
+            return pendingPartitions;
+        }
+
+        @Override
+        public void update(List<String> partitions) throws Exception {
+            
pendingPartitionsState.update(Collections.singletonList(partitions));
+        }
+    }
+
+    public static PartitionMarkDoneTrigger create(
+            CoreOptions coreOptions, boolean isRestored, OperatorStateStore 
stateStore)
+            throws Exception {
+        Options options = coreOptions.toConfiguration();
+        return new PartitionMarkDoneTrigger(
+                new 
PartitionMarkDoneTrigger.PartitionMarkDoneTriggerState(isRestored, stateStore),
+                new PartitionTimeExtractor(
+                        coreOptions.partitionTimestampPattern(),
+                        coreOptions.partitionTimestampFormatter()),
+                options.get(PARTITION_TIME_INTERVAL),
+                options.get(PARTITION_IDLE_TIME_TO_DONE),
+                options.get(PARTITION_MARK_DONE_WHEN_END_INPUT));
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
new file mode 100644
index 000000000..e78e5143a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.sink.partition.SuccessFile;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
+import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for partition mark done with flink batch job. */
+/**
+ * IT case for marking partitions done when a bounded Flink write reaches end of input: after the
+ * job finishes, each written partition (p=p1, p=p2) is expected to contain a _SUCCESS file.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class FlinkBatchJobPartitionMarkdoneITCase extends CatalogITCaseBase {
+
+    // Row type of the DataStream test table: (v INT, p VARCHAR(10), _k INT).
+    private static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new RowType.RowField("v", new IntType()),
+                            new RowType.RowField("p", new VarCharType(10)),
+                            // key column: part of the (p, _k) primary key in buildFileStoreTable
+                            new RowType.RowField("_k", new IntType())));
+
+    // Input rows for the DataStream test; they cover both partitions p1 and p2.
+    private static final List<RowData> SOURCE_DATA =
+            Arrays.asList(
+                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
+                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 2)),
+                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
+                    wrap(GenericRowData.of(5, StringData.fromString("p1"), 1)),
+                    wrap(GenericRowData.of(6, StringData.fromString("p2"), 1)),
+                    wrap(GenericRowData.of(3, StringData.fromString("p2"), 5)),
+                    wrap(GenericRowData.of(5, StringData.fromString("p2"), 
1)));
+
+    // Wraps a row into SerializableRowData using TABLE_TYPE's internal serializer, presumably so
+    // SOURCE_DATA can be handed to env.fromCollection.
+    private static SerializableRowData wrap(RowData row) {
+        return new SerializableRowData(row, 
InternalSerializers.create(TABLE_TYPE));
+    }
+
+    // Execution environment for the DataStream test (batch mode, parallelism 2).
+    private final StreamExecutionEnvironment env;
+
+    public FlinkBatchJobPartitionMarkdoneITCase() {
+        this.env = 
streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();
+    }
+
+    // NOTE(review): the "isBatch" value does not appear to be consumed in this class (no
+    // @Parameter field, the constructor takes no arguments, env is always batch mode and the SQL
+    // test always uses batchSql), so both invocations seem to run the same scenario.
+    // TODO: wire it into the execution mode or drop the parameterization.
+    @Parameters(name = "isBatch-{0}")
+    public static List<Boolean> getVarSeg() {
+        return Arrays.asList(true, false);
+    }
+
+    // DDL for table "T" used by the SQL test: primary key _k, partitioned by p. Presumably
+    // executed by CatalogITCaseBase during setup — confirm.
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS T ("
+                        + "v INT, "
+                        + "p STRING, "
+                        + "_k INT, "
+                        + "PRIMARY KEY (_k) NOT ENFORCED"
+                        + ") PARTITIONED BY (p) WITH ()");
+    }
+
+    // Asserts that both p=p1 and p=p2 under the table location contain a _SUCCESS file for which
+    // SuccessFile.safelyFromPath returns a non-null result.
+    public void validateResult(FileStoreTable table) throws Exception {
+        LocalFileIO fileIO = new LocalFileIO();
+        Path successPath1 = new Path(table.location(), "p=p1/_SUCCESS");
+        SuccessFile successFile1 = SuccessFile.safelyFromPath(fileIO, 
successPath1);
+        assertThat(successFile1).isNotNull();
+
+        Path successPath2 = new Path(table.location(), "p=p2/_SUCCESS");
+        SuccessFile successFile2 = SuccessFile.safelyFromPath(fileIO, 
successPath2);
+        assertThat(successFile2).isNotNull();
+    }
+
+    // SQL path: enables partition.end-input-to-done through a dynamic table options hint on a
+    // batch INSERT that writes one row into each of partitions p1 and p2.
+    @TestTemplate
+    public void testFlinkBatchJobPartitionMarkdoneBySQL() throws Exception {
+        batchSql(
+                "INSERT INTO T /*+ OPTIONS('partition.end-input-to-done'= 
'true') */ VALUES (0, 'p1', 1), (0, 'p2', 2)");
+
+        validateResult(paimonTable("T"));
+    }
+
+    // DataStream path: writes SOURCE_DATA via FlinkSinkBuilder into a table created with
+    // PARTITION_MARK_DONE_WHEN_END_INPUT enabled, then checks the _SUCCESS files.
+    @TestTemplate
+    public void testFlinkBatchJobPartitionMarkdone() throws Exception {
+        FileStoreTable table = buildFileStoreTable(new int[] {1}, new int[] 
{1, 2});
+
+        // write
+        DataStreamSource<RowData> source =
+                env.fromCollection(SOURCE_DATA, 
InternalTypeInfo.of(TABLE_TYPE));
+        DataStream<Row> input =
+                source.map(
+                                (MapFunction<RowData, Row>)
+                                        r ->
+                                                Row.of(
+                                                        r.getInt(0),
+                                                        
r.getString(1).toString(),
+                                                        r.getInt(2)))
+                        .setParallelism(source.getParallelism());
+        DataType inputType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("v", DataTypes.INT()),
+                        DataTypes.FIELD("p", DataTypes.STRING()),
+                        DataTypes.FIELD("_k", DataTypes.INT()));
+        new FlinkSinkBuilder(table).forRow(input, inputType).build();
+        env.execute();
+
+        validateResult(table);
+    }
+
+    // Creates a table at a temp path with 3 buckets, Avro files and
+    // PARTITION_MARK_DONE_WHEN_END_INPUT = "true". `partitions` and `primaryKey` are field
+    // indexes into TABLE_TYPE; when there is no primary key, "_k" is used as the bucket key.
+    private FileStoreTable buildFileStoreTable(int[] partitions, int[] 
primaryKey)
+            throws Exception {
+        Options options = new Options();
+        options.set(BUCKET, 3);
+        options.set(PATH, getTempDirPath());
+        options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO);
+        
options.set(FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT.key(), 
"true");
+
+        Path tablePath = new CoreOptions(options.toMap()).path();
+        if (primaryKey.length == 0) {
+            options.set(BUCKET_KEY, "_k");
+        }
+        Schema schema =
+                new Schema(
+                        toDataType(TABLE_TYPE).getFields(),
+                        Arrays.stream(partitions)
+                                .mapToObj(i -> 
TABLE_TYPE.getFieldNames().get(i))
+                                .collect(Collectors.toList()),
+                        Arrays.stream(primaryKey)
+                                .mapToObj(i -> 
TABLE_TYPE.getFieldNames().get(i))
+                                .collect(Collectors.toList()),
+                        options.toMap(),
+                        "");
+        // Schema creation and table loading are wrapped in retryArtificialException (a test
+        // helper; presumably it retries artificially injected failures — confirm).
+        return retryArtificialException(
+                () -> {
+                    new SchemaManager(LocalFileIO.create(), 
tablePath).createTable(schema);
+                    return FileStoreTableFactory.create(LocalFileIO.create(), 
options);
+                });
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 3f1281abc..e7e955554 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -563,7 +563,7 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         StoreCommitter committer =
                 new StoreCommitter(
                         table, commit, Committer.createContext("", 
metricGroup, true, false, null));
-        committer.commit(Collections.singletonList(manifestCommittable));
+        committer.commit(Collections.singletonList(manifestCommittable), 
false);
         CommitterMetrics metrics = committer.getCommitterMetrics();
         assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(293);
         assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
index fd66214e3..f2f6f47c5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.partition;
 
 import org.apache.paimon.partition.PartitionTimeExtractor;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
@@ -34,10 +35,17 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class PartitionMarkDoneTriggerTest {
 
-    @Test
-    public void test() throws Exception {
-        List<String> pendingPartitions = new ArrayList<>();
-        PartitionMarkDoneTrigger.State state =
+    private static final Duration timeInterval = Duration.ofDays(1);
+    private static final Duration idleTime = Duration.ofMinutes(15);
+
+    private List<String> pendingPartitions;
+    private PartitionMarkDoneTrigger.State state;
+    private PartitionTimeExtractor extractor;
+
+    @BeforeEach
+    public void before() throws Exception {
+        this.pendingPartitions = new ArrayList<>();
+        this.state =
                 new PartitionMarkDoneTrigger.State() {
                     @Override
                     public List<String> restore() {
@@ -50,17 +58,23 @@ class PartitionMarkDoneTriggerTest {
                         pendingPartitions.addAll(partitions);
                     }
                 };
+        this.extractor = new PartitionTimeExtractor("$dt", "yyyy-MM-dd");
+    }
 
-        PartitionTimeExtractor extractor = new PartitionTimeExtractor("$dt", 
"yyyy-MM-dd");
-        Duration timeInterval = Duration.ofDays(1);
-        Duration idleTime = Duration.ofMinutes(15);
+    @Test
+    public void testWithoutEndInput() throws Exception {
         PartitionMarkDoneTrigger trigger =
                 new PartitionMarkDoneTrigger(
-                        state, extractor, timeInterval, idleTime, 
toEpochMillis("2024-02-01"));
+                        state,
+                        extractor,
+                        timeInterval,
+                        idleTime,
+                        toEpochMillis("2024-02-01"),
+                        false);
 
         // test not reach partition end + idle time
         trigger.notifyPartition("dt=2024-02-02", toEpochMillis("2024-02-01"));
-        List<String> partitions = 
trigger.donePartitions(toEpochMillis("2024-02-03"));
+        List<String> partitions = trigger.donePartitions(false, 
toEpochMillis("2024-02-03"));
         assertThat(partitions).isEmpty();
 
         // test state
@@ -69,9 +83,12 @@ class PartitionMarkDoneTriggerTest {
         assertThat(pendingPartitions).containsOnly("dt=2024-02-02");
 
         // test trigger
-        partitions = trigger.donePartitions(toEpochMillis("2024-02-03") + 
idleTime.toMillis());
+        partitions =
+                trigger.donePartitions(false, toEpochMillis("2024-02-03") + 
idleTime.toMillis());
         assertThat(partitions).isEmpty();
-        partitions = trigger.donePartitions(toEpochMillis("2024-02-03") + 
idleTime.toMillis() + 1);
+        partitions =
+                trigger.donePartitions(
+                        false, toEpochMillis("2024-02-03") + 
idleTime.toMillis() + 1);
         assertThat(partitions).containsOnly("dt=2024-02-02");
 
         // test state
@@ -81,23 +98,50 @@ class PartitionMarkDoneTriggerTest {
         // test refresh
         trigger.notifyPartition("dt=2024-02-03", toEpochMillis("2024-02-03"));
         trigger.notifyPartition("dt=2024-02-03", toEpochMillis("2024-02-04") + 
idleTime.toMillis());
-        partitions = trigger.donePartitions(toEpochMillis("2024-02-04") + 
idleTime.toMillis() + 1);
+        partitions =
+                trigger.donePartitions(
+                        false, toEpochMillis("2024-02-04") + 
idleTime.toMillis() + 1);
         assertThat(partitions).isEmpty();
         partitions =
-                trigger.donePartitions(toEpochMillis("2024-02-04") + 2 * 
idleTime.toMillis() + 1);
+                trigger.donePartitions(
+                        false, toEpochMillis("2024-02-04") + 2 * 
idleTime.toMillis() + 1);
         assertThat(partitions).containsOnly("dt=2024-02-03");
 
         // test restore
         pendingPartitions.add("dt=2024-02-04");
         trigger =
                 new PartitionMarkDoneTrigger(
-                        state, extractor, timeInterval, idleTime, 
toEpochMillis("2024-02-06"));
-        partitions = trigger.donePartitions(toEpochMillis("2024-02-06"));
+                        state,
+                        extractor,
+                        timeInterval,
+                        idleTime,
+                        toEpochMillis("2024-02-06"),
+                        false);
+        partitions = trigger.donePartitions(false, 
toEpochMillis("2024-02-06"));
         assertThat(partitions).isEmpty();
-        partitions = trigger.donePartitions(toEpochMillis("2024-02-06") + 
idleTime.toMillis() + 1);
+        partitions =
+                trigger.donePartitions(
+                        false, toEpochMillis("2024-02-06") + 
idleTime.toMillis() + 1);
         assertThat(partitions).containsOnly("dt=2024-02-04");
     }
 
+    @Test
+    public void testWithEndInput() throws Exception {
+        PartitionMarkDoneTrigger trigger =
+                new PartitionMarkDoneTrigger(
+                        state,
+                        extractor,
+                        timeInterval,
+                        idleTime,
+                        toEpochMillis("2024-02-01"),
+                        true);
+
+        // endInput=true: done immediately, although partition end + idle time is not reached
+        trigger.notifyPartition("dt=2024-02-02", toEpochMillis("2024-02-01"));
+        List<String> partitions = trigger.donePartitions(true, 
toEpochMillis("2024-02-03"));
+        assertThat(partitions).containsOnly("dt=2024-02-02");
+    }
+
     private long toEpochMillis(String dt) {
         return LocalDateTime.of(LocalDate.parse(dt), LocalTime.MIN)
                 .atZone(ZoneId.systemDefault())


Reply via email to