This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4a014ec32a [core] Rename PartitionListener to CommitListener (#5765)
4a014ec32a is described below
commit 4a014ec32afbe452c5fb39cfe6547e8237d2fb41
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 18 11:11:34 2025 +0800
[core] Rename PartitionListener to CommitListener (#5765)
---
.../procedure/MarkPartitionDoneProcedure.java | 2 +-
.../flink/action/MarkPartitionDoneAction.java | 2 +-
.../procedure/MarkPartitionDoneProcedure.java | 2 +-
.../apache/paimon/flink/sink/StoreCommitter.java | 14 +++++------
...{PartitionListener.java => CommitListener.java} | 5 ++--
...artitionListeners.java => CommitListeners.java} | 29 ++++++++++++----------
...arkDone.java => PartitionMarkDoneListener.java} | 23 ++++++++---------
.../sink/partition/ReportPartStatsListener.java | 7 +++---
.../CustomPartitionMarkDoneActionTest.java | 6 ++---
.../sink/partition/PartitionMarkDoneTest.java | 12 +++++----
10 files changed, 52 insertions(+), 50 deletions(-)
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index 22abfb3f3b..e67811c4e0 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -32,7 +32,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
import java.io.IOException;
import java.util.List;
-import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone;
+import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
import static org.apache.paimon.utils.ParameterUtils.getPartitions;
import static org.apache.paimon.utils.Preconditions.checkArgument;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
index c566af0a19..bfad0bf1b6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
@@ -26,7 +26,7 @@ import org.apache.paimon.utils.PartitionPathUtils;
import java.util.List;
import java.util.Map;
-import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone;
+import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
/** Table partition mark done action for Flink. */
public class MarkPartitionDoneAction extends TableActionBase {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index d73553045b..90ebd48003 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -35,7 +35,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
import java.io.IOException;
import java.util.List;
-import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone;
+import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
import static org.apache.paimon.utils.ParameterUtils.getPartitions;
import static org.apache.paimon.utils.Preconditions.checkArgument;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 3b5d5bf627..34a087f678 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
-import org.apache.paimon.flink.sink.partition.PartitionListeners;
+import org.apache.paimon.flink.sink.partition.CommitListeners;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.BucketMode;
@@ -44,7 +44,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
private final TableCommitImpl commit;
@Nullable private final CommitterMetrics committerMetrics;
- private final PartitionListeners partitionListeners;
+ private final CommitListeners commitListeners;
private final boolean allowLogOffsetDuplicate;
public StoreCommitter(FileStoreTable table, TableCommit commit, Context context) {
@@ -58,7 +58,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
}
try {
- this.partitionListeners = PartitionListeners.create(context, table);
+ this.commitListeners = CommitListeners.create(context, table);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -110,7 +110,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
throws IOException, InterruptedException {
commit.commitMultiple(committables, false);
calcNumBytesAndRecordsOut(committables);
- partitionListeners.notifyCommittable(committables);
+ commitListeners.notifyCommittable(committables);
}
@Override
@@ -119,7 +119,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
boolean checkAppendFiles,
boolean partitionMarkDoneRecoverFromState) {
int committed = commit.filterAndCommitMultiple(globalCommittables, checkAppendFiles);
- partitionListeners.notifyCommittable(globalCommittables, partitionMarkDoneRecoverFromState);
+ commitListeners.notifyCommittable(globalCommittables, partitionMarkDoneRecoverFromState);
return committed;
}
@@ -127,7 +127,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
@Override
public Map<Long, List<Committable>> groupByCheckpoint(Collection<Committable> committables) {
try {
- partitionListeners.snapshotState();
+ commitListeners.snapshotState();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -142,7 +142,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
@Override
public void close() throws Exception {
commit.close();
- partitionListeners.close();
+ commitListeners.close();
}
public boolean allowLogOffsetDuplicate() {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListener.java
similarity index 85%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListener.java
index 18c365081a..fe70820651 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListener.java
@@ -24,10 +24,9 @@ import java.io.Closeable;
import java.util.List;
/** The partition listener. */
-public interface PartitionListener extends Closeable {
+public interface CommitListener extends Closeable {
- void notifyCommittable(
- List<ManifestCommittable> committables, boolean partitionMarkDoneRecoverFromState);
+ void notifyCommittable(List<ManifestCommittable> committables);
void snapshotState() throws Exception;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListeners.java
similarity index 72%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListeners.java
index 8f8d633d43..58ee1f969e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListeners.java
@@ -29,30 +29,33 @@ import java.util.ArrayList;
import java.util.List;
/** Partition listeners. */
-public class PartitionListeners implements Closeable {
+public class CommitListeners implements Closeable {
- private final List<PartitionListener> listeners;
+ private final List<CommitListener> listeners;
- private PartitionListeners(List<PartitionListener> listeners) {
+ private CommitListeners(List<CommitListener> listeners) {
this.listeners = listeners;
}
public void notifyCommittable(List<ManifestCommittable> committables) {
- for (PartitionListener trigger : listeners) {
- trigger.notifyCommittable(committables, true);
+ for (CommitListener listener : listeners) {
+ listener.notifyCommittable(committables);
}
}
public void notifyCommittable(
List<ManifestCommittable> committables, boolean partitionMarkDoneRecoverFromState) {
- for (PartitionListener trigger : listeners) {
- trigger.notifyCommittable(committables, partitionMarkDoneRecoverFromState);
+ for (CommitListener listener : listeners) {
+ if (partitionMarkDoneRecoverFromState
+ || !(listener instanceof PartitionMarkDoneListener)) {
+ listener.notifyCommittable(committables);
+ }
}
}
public void snapshotState() throws Exception {
- for (PartitionListener trigger : listeners) {
- trigger.snapshotState();
+ for (CommitListener listener : listeners) {
+ listener.snapshotState();
}
}
@@ -61,16 +64,16 @@ public class PartitionListeners implements Closeable {
IOUtils.closeAllQuietly(listeners);
}
- public static PartitionListeners create(Committer.Context context, FileStoreTable table)
+ public static CommitListeners create(Committer.Context context, FileStoreTable table)
throws Exception {
- List<PartitionListener> listeners = new ArrayList<>();
+ List<CommitListener> listeners = new ArrayList<>();
// partition statistics reporter
ReportPartStatsListener.create(context.isRestored(), context.stateStore(), table)
.ifPresent(listeners::add);
// partition mark done
- PartitionMarkDone.create(
+ PartitionMarkDoneListener.create(
context.getClass().getClassLoader(),
context.streamingCheckpointEnabled(),
context.isRestored(),
@@ -78,6 +81,6 @@ public class PartitionListeners implements Closeable {
table)
.ifPresent(listeners::add);
- return new PartitionListeners(listeners);
+ return new CommitListeners(listeners);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java
similarity index 93%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java
index 96081dee0c..b5b82cda1b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java
@@ -51,9 +51,9 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_
import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_MODE;
/** Mark partition done. */
-public class PartitionMarkDone implements PartitionListener {
+public class PartitionMarkDoneListener implements CommitListener {
- private static final Logger LOG = LoggerFactory.getLogger(PartitionMarkDone.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PartitionMarkDoneListener.class);
private final InternalRowPartitionComputer partitionComputer;
private final PartitionMarkDoneTrigger trigger;
@@ -61,7 +61,7 @@ public class PartitionMarkDone implements PartitionListener {
private final boolean waitCompaction;
private final PartitionMarkDoneActionMode partitionMarkDoneActionMode;
- public static Optional<PartitionMarkDone> create(
+ public static Optional<PartitionMarkDoneListener> create(
ClassLoader cl,
boolean isStreaming,
boolean isRestored,
@@ -96,7 +96,7 @@ public class PartitionMarkDone implements PartitionListener {
|| coreOptions.mergeEngine() == MergeEngine.FIRST_ROW);
return Optional.of(
- new PartitionMarkDone(
+ new PartitionMarkDoneListener(
partitionComputer,
trigger,
actions,
@@ -119,7 +119,7 @@ public class PartitionMarkDone implements PartitionListener {
return table.partitionKeys().isEmpty();
}
- public PartitionMarkDone(
+ public PartitionMarkDoneListener(
InternalRowPartitionComputer partitionComputer,
PartitionMarkDoneTrigger trigger,
List<PartitionMarkDoneAction> actions,
@@ -133,14 +133,11 @@ public class PartitionMarkDone implements PartitionListener {
}
@Override
- public void notifyCommittable(
- List<ManifestCommittable> committables, boolean partitionMarkDoneRecoverFromState) {
- if (partitionMarkDoneRecoverFromState) {
- if (partitionMarkDoneActionMode == PartitionMarkDoneActionMode.WATERMARK) {
- markDoneByWatermark(committables);
- } else {
- markDoneByProcessTime(committables);
- }
+ public void notifyCommittable(List<ManifestCommittable> committables) {
+ if (partitionMarkDoneActionMode == PartitionMarkDoneActionMode.WATERMARK) {
+ markDoneByWatermark(committables);
+ } else {
+ markDoneByProcessTime(committables);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
index db20656b85..4b815d336f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
@@ -50,7 +50,7 @@ import java.util.Set;
* This listener will collect data from the newly touched partition and then decide when to trigger
* a report based on the partition's idle time.
*/
-public class ReportPartStatsListener implements PartitionListener {
+public class ReportPartStatsListener implements CommitListener {
@SuppressWarnings("unchecked")
private static final ListStateDescriptor<Map<String, Long>> PENDING_REPORT_STATE_DESC =
@@ -85,8 +85,8 @@ public class ReportPartStatsListener implements PartitionListener {
this.idleTime = idleTime;
}
- public void notifyCommittable(
- List<ManifestCommittable> committables, boolean partitionMarkDoneRecoverFromState) {
+ @Override
+ public void notifyCommittable(List<ManifestCommittable> committables) {
Set<String> partition = new HashSet<>();
boolean endInput = false;
for (ManifestCommittable committable : committables) {
@@ -136,6 +136,7 @@ public class ReportPartStatsListener implements PartitionListener {
return result;
}
+ @Override
public void snapshotState() throws Exception {
pendingPartitionsState.update(Collections.singletonList(pendingPartitions));
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
index 55d1cabdcd..a4f9a0e574 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
@@ -62,7 +62,7 @@ public class CustomPartitionMarkDoneActionTest extends TableTestBase {
// set.
Assertions.assertThatThrownBy(
() ->
- PartitionMarkDone.create(
+ PartitionMarkDoneListener.create(
getClass().getClassLoader(),
false,
false,
@@ -85,8 +85,8 @@ public class CustomPartitionMarkDoneActionTest extends TableTestBase {
FileStoreTable table2 = (FileStoreTable) catalog.getTable(identifier);
- PartitionMarkDone markDone =
- PartitionMarkDone.create(
+ PartitionMarkDoneListener markDone =
+ PartitionMarkDoneListener.create(
getClass().getClassLoader(),
false,
false,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index eb7de0fd41..9e541d5ff0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -91,8 +91,8 @@ class PartitionMarkDoneTest extends TableTestBase {
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
Path location = table.location();
Path successFile = new Path(location, "a=0/_SUCCESS");
- PartitionMarkDone markDone =
- PartitionMarkDone.create(
+ PartitionMarkDoneListener markDone =
+ PartitionMarkDoneListener.create(
getClass().getClassLoader(),
false,
false,
@@ -115,12 +115,12 @@ class PartitionMarkDoneTest extends TableTestBase {
}
}
- public static void notifyCommits(PartitionMarkDone markDone, boolean isCompact) {
+ public static void notifyCommits(PartitionMarkDoneListener markDone, boolean isCompact) {
notifyCommits(markDone, isCompact, true);
}
private static void notifyCommits(
- PartitionMarkDone markDone,
+ PartitionMarkDoneListener markDone,
boolean isCompact,
boolean partitionMarkDoneRecoverFromState) {
ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE);
@@ -146,7 +146,9 @@ class PartitionMarkDoneTest extends TableTestBase {
new IndexIncrement(emptyList()));
}
committable.addFileCommittable(compactMessage);
- markDone.notifyCommittable(singletonList(committable), partitionMarkDoneRecoverFromState);
+ if (partitionMarkDoneRecoverFromState) {
+ markDone.notifyCommittable(singletonList(committable));
+ }
}
public static class MockOperatorStateStore implements OperatorStateStore {