This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8dc212c3203 [FLINK-36368] Do not prematurely merge CommittableManager
(#25405)
8dc212c3203 is described below
commit 8dc212c32036c3a4afd3af2e50e95cb87c8cee23
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Oct 2 21:31:41 2024 +0200
[FLINK-36368] Do not prematurely merge CommittableManager (#25405)
When a sink contains a shuffle between writer and committer, a committer
may receive committables coming from multiple upstream subtasks. So far, we
merged them immediately on receipt. However, that makes it impossible to
determine later whether we have received all messages from an upstream task.
It also made rescaling of the committer awkward: during normal processing
all committables of a committer have the same subtaskId as the committer. On
downscale, these subtaskIds suddenly don't match and need to be replaced, which
we solved by merging the SubtaskCommittableManagers.
This commit decouples the collection of committables from changing the
subtaskId for emission. Committables retain the upstream subtask id in the
CommittableCollector, which survives serialization and deserialization. Only
upon emission do we substitute the subtask id with the one of the emitting
committer.
This is particularly useful for a global committer, where all subtasks
are collected. As a side fix, the new serialization also contains the
numberOfSubtasks such that different checkpoints may have different degrees of
parallelism.
The old approach probably has edge cases where rescaling with an unaligned
checkpoint (UC) would result in stalled pipelines because certain metadata
doesn't match. This would not affect pipelines that chain Writer/Committer (no
channel state), pipelines where Writer and Committer have the same DOP (which
results in a Forward channel, which doesn't use UC for exactly these reasons),
or pipelines with non-keyed shuffles (because they don't provide any
guarantees). Since a keyed shuffle must use the subtask id of the
committables, the new approach shou [...]
---
.../connector/sink2/CommittableWithLineage.java | 5 ++
.../connector/sink2/GlobalCommitterOperator.java | 28 ++----
.../connector/sink2/GlobalCommitterSerializer.java | 9 +-
.../runtime/operators/sink/CommitterOperator.java | 13 +--
.../committables/CheckpointCommittableManager.java | 40 ++++++++-
.../CheckpointCommittableManagerImpl.java | 72 +++++++++++----
.../sink/committables/CommitRequestImpl.java | 33 +++++++
.../sink/committables/CommittableCollector.java | 100 +++++++++------------
.../CommittableCollectorSerializer.java | 38 ++++----
.../sink/committables/CommittableManager.java | 50 -----------
.../committables/SubtaskCommittableManager.java | 30 ++++++-
.../sink2/GlobalCommitterSerializerTest.java | 23 ++---
.../CheckpointCommittableManagerImplTest.java | 29 +++---
.../CommittableCollectorSerializerTest.java | 50 +++++------
.../committables/CommittableCollectorTest.java | 8 +-
.../operators/sink/CommitterOperatorTestBase.java | 18 ++--
16 files changed, 288 insertions(+), 258 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
index bdfe9de262f..819b4fdfc4f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
@@ -58,6 +58,11 @@ public class CommittableWithLineage<CommT> implements
CommittableMessage<CommT>
return new CommittableWithLineage<>(mapper.apply(committable),
checkpointId, subtaskId);
}
+ /** Creates a shallow copy with the given subtaskId. */
+ public CommittableWithLineage<CommT> withSubtaskId(int subtaskId) {
+ return new CommittableWithLineage<>(committable, checkpointId,
subtaskId);
+ }
+
@Override
public String toString() {
return "CommittableWithLineage{"
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
index 79f97aab9e2..c06c95751a5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
@@ -36,7 +36,6 @@ import
org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import
org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
-import
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableManager;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.function.SerializableSupplier;
@@ -45,7 +44,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -88,7 +86,7 @@ class GlobalCommitterOperator<CommT, GlobalCommT> extends
AbstractStreamOperator
super.setup(containingTask, config, output);
committer = committerFactory.get();
metricGroup = InternalSinkCommitterMetricGroup.wrap(metrics);
- committableCollector = CommittableCollector.of(getRuntimeContext(),
metricGroup);
+ committableCollector = CommittableCollector.of(metricGroup);
committableSerializer = committableSerializerFactory.get();
}
@@ -113,11 +111,7 @@ class GlobalCommitterOperator<CommT, GlobalCommT> extends
AbstractStreamOperator
metricGroup);
final SimpleVersionedSerializer<GlobalCommittableWrapper<CommT,
GlobalCommT>> serializer =
new GlobalCommitterSerializer<>(
- committableCollectorSerializer,
- globalCommittableSerializer,
-
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
-
getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(),
- metricGroup);
+ committableCollectorSerializer,
globalCommittableSerializer, metricGroup);
globalCommitterState =
new SimpleVersionedListState<>(
context.getOperatorStateStore()
@@ -144,25 +138,17 @@ class GlobalCommitterOperator<CommT, GlobalCommT> extends
AbstractStreamOperator
commit(lastCompletedCheckpointId);
}
- private Collection<? extends CheckpointCommittableManager<CommT>>
getCommittables(
- long checkpointId) {
- final Collection<? extends CheckpointCommittableManager<CommT>>
committables =
-
committableCollector.getCheckpointCommittablesUpTo(checkpointId);
- if (committables == null) {
- return Collections.emptyList();
- }
- return committables;
- }
-
private void commit(long checkpointId) throws IOException,
InterruptedException {
- for (CheckpointCommittableManager<CommT> committable :
getCommittables(checkpointId)) {
- committable.commit(committer);
+ for (CheckpointCommittableManager<CommT> checkpoint :
+
committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
+ checkpoint.commit(committer);
}
+ committableCollector.compact();
}
@Override
public void endInput() throws Exception {
- final CommittableManager<CommT> endOfInputCommittable =
+ final CheckpointCommittableManager<CommT> endOfInputCommittable =
committableCollector.getEndOfInputCommittable();
if (endOfInputCommittable != null) {
do {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java
index 36b38583517..a8c5ae54978 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java
@@ -48,20 +48,14 @@ class GlobalCommitterSerializer<CommT, GlobalCommT>
private final CommittableCollectorSerializer<CommT>
committableCollectorSerializer;
@Nullable private final SimpleVersionedSerializer<GlobalCommT>
globalCommittableSerializer;
- private final int subtaskId;
- private final int numberOfSubtasks;
private final SinkCommitterMetricGroup metricGroup;
GlobalCommitterSerializer(
CommittableCollectorSerializer<CommT>
committableCollectorSerializer,
@Nullable SimpleVersionedSerializer<GlobalCommT>
globalCommittableSerializer,
- int subtaskId,
- int numberOfSubtasks,
SinkCommitterMetricGroup metricGroup) {
this.committableCollectorSerializer =
checkNotNull(committableCollectorSerializer);
this.globalCommittableSerializer = globalCommittableSerializer;
- this.subtaskId = subtaskId;
- this.numberOfSubtasks = numberOfSubtasks;
this.metricGroup = metricGroup;
}
@@ -111,8 +105,7 @@ class GlobalCommitterSerializer<CommT, GlobalCommT>
SinkV1CommittableDeserializer.readVersionAndDeserializeList(
globalCommittableSerializer, in);
return new GlobalCommittableWrapper<>(
- new CommittableCollector<>(subtaskId, numberOfSubtasks,
metricGroup),
- globalCommittables);
+ new CommittableCollector<>(metricGroup), globalCommittables);
}
private GlobalCommittableWrapper<CommT, GlobalCommT>
deserializeV2(DataInputView in)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index f0e717797de..dba8ccaf96f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -40,7 +40,6 @@ import
org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import
org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
-import
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableManager;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -114,7 +113,7 @@ class CommitterOperator<CommT> extends
AbstractStreamOperator<CommittableMessage
Output<StreamRecord<CommittableMessage<CommT>>> output) {
super.setup(containingTask, config, output);
metricGroup = InternalSinkCommitterMetricGroup.wrap(getMetricGroup());
- committableCollector = CommittableCollector.of(getRuntimeContext(),
metricGroup);
+ committableCollector = CommittableCollector.of(metricGroup);
}
@Override
@@ -179,15 +178,19 @@ class CommitterOperator<CommT> extends
AbstractStreamOperator<CommittableMessage
// if not endInput, we can schedule retrying later
retryWithDelay();
}
+ committableCollector.compact();
}
- private void commitAndEmit(CommittableManager<CommT> committableManager)
+ private void commitAndEmit(CheckpointCommittableManager<CommT>
committableManager)
throws IOException, InterruptedException {
Collection<CommittableWithLineage<CommT>> committed =
committableManager.commit(committer);
if (emitDownstream && !committed.isEmpty()) {
- output.collect(new
StreamRecord<>(committableManager.getSummary()));
+ int subtaskId =
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+ int numberOfSubtasks =
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+ output.collect(
+ new
StreamRecord<>(committableManager.getSummary(subtaskId, numberOfSubtasks)));
for (CommittableWithLineage<CommT> committable : committed) {
- output.collect(new StreamRecord<>(committable));
+ output.collect(new
StreamRecord<>(committable.withSubtaskId(subtaskId)));
}
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java
index f6525dd27f7..a4ce99f2bfe 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java
@@ -19,18 +19,52 @@
package org.apache.flink.streaming.runtime.operators.sink.committables;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.Collection;
/**
- * This interface adds checkpoint meta information to the committable.
+ * A {@code CheckpointCommittableManager} collects committables for one
checkpoint across
+ * potentially multiple upstream subtasks.
+ *
+ * <p>While it collects committables from multiple upstream subtasks, it
belongs to exactly one
+ * committer subtask.
+ *
+ * <p>Each upstream subtask of this particular checkpoint is represented by a
{@link
+ * SubtaskCommittableManager}.
*
* @param <CommT> type of the committable
*/
@Internal
-public interface CheckpointCommittableManager<CommT> extends
CommittableManager<CommT> {
+public interface CheckpointCommittableManager<CommT> {
/**
- * Returns the checkpoint id in which the committable was created.
+ * Returns the checkpoint id in which the committables were created.
*
* @return checkpoint id
*/
long getCheckpointId();
+
+ /** Returns the number of upstream subtasks belonging to the checkpoint. */
+ int getNumberOfSubtasks();
+
+ /**
+ * Returns a summary of the current commit progress for the emitting
subtask identified by the
+ * parameters.
+ */
+ CommittableSummary<CommT> getSummary(int emittingSubtaskId, int
emittingNumberOfSubtasks);
+
+ /**
+ * Commits all due committables if all respective committables of the
specific subtask and
+ * checkpoint have been received.
+ *
+ * @param committer used to commit to the external system
+ * @return successfully committed committables with meta information
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ Collection<CommittableWithLineage<CommT>> commit(Committer<CommT>
committer)
+ throws IOException, InterruptedException;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index 9cbe0b16eaa..256dbb6dcfe 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.operators.sink.committables;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -32,6 +33,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -42,39 +44,43 @@ class CheckpointCommittableManagerImpl<CommT> implements
CheckpointCommittableMa
private final Map<Integer, SubtaskCommittableManager<CommT>>
subtasksCommittableManagers;
private final long checkpointId;
- private final int subtaskId;
private final int numberOfSubtasks;
private final SinkCommitterMetricGroup metricGroup;
private static final Logger LOG =
LoggerFactory.getLogger(CheckpointCommittableManagerImpl.class);
- CheckpointCommittableManagerImpl(
- int subtaskId,
- int numberOfSubtasks,
- long checkpointId,
- SinkCommitterMetricGroup metricGroup) {
- this(new HashMap<>(), subtaskId, numberOfSubtasks, checkpointId,
metricGroup);
- }
-
+ @VisibleForTesting
CheckpointCommittableManagerImpl(
Map<Integer, SubtaskCommittableManager<CommT>>
subtasksCommittableManagers,
- int subtaskId,
int numberOfSubtasks,
long checkpointId,
SinkCommitterMetricGroup metricGroup) {
this.subtasksCommittableManagers =
checkNotNull(subtasksCommittableManagers);
- this.subtaskId = subtaskId;
this.numberOfSubtasks = numberOfSubtasks;
this.checkpointId = checkpointId;
this.metricGroup = metricGroup;
}
+ public static <CommT> CheckpointCommittableManagerImpl<CommT> forSummary(
+ CommittableSummary<CommT> summary, SinkCommitterMetricGroup
metricGroup) {
+ return new CheckpointCommittableManagerImpl<>(
+ new HashMap<>(),
+ summary.getNumberOfSubtasks(),
+ summary.getCheckpointIdOrEOI(),
+ metricGroup);
+ }
+
@Override
public long getCheckpointId() {
return checkpointId;
}
+ @Override
+ public int getNumberOfSubtasks() {
+ return numberOfSubtasks;
+ }
+
Collection<SubtaskCommittableManager<CommT>>
getSubtaskCommittableManagers() {
return subtasksCommittableManagers.values();
}
@@ -83,7 +89,10 @@ class CheckpointCommittableManagerImpl<CommT> implements
CheckpointCommittableMa
long checkpointId = summary.getCheckpointIdOrEOI();
SubtaskCommittableManager<CommT> manager =
new SubtaskCommittableManager<>(
- summary.getNumberOfCommittables(), subtaskId,
checkpointId, metricGroup);
+ summary.getNumberOfCommittables(),
+ summary.getSubtaskId(),
+ checkpointId,
+ metricGroup);
if (checkpointId == CommittableMessage.EOI) {
SubtaskCommittableManager<CommT> merged =
subtasksCommittableManagers.merge(
@@ -117,10 +126,11 @@ class CheckpointCommittableManagerImpl<CommT> implements
CheckpointCommittableMa
}
@Override
- public CommittableSummary<CommT> getSummary() {
+ public CommittableSummary<CommT> getSummary(
+ int emittingSubtaskId, int emittingNumberOfSubtasks) {
return new CommittableSummary<>(
- subtaskId,
- numberOfSubtasks,
+ emittingSubtaskId,
+ emittingNumberOfSubtasks,
checkpointId,
subtasksCommittableManagers.values().stream()
.mapToInt(SubtaskCommittableManager::getNumCommittables)
@@ -180,9 +190,39 @@ class CheckpointCommittableManagerImpl<CommT> implements
CheckpointCommittableMa
return new CheckpointCommittableManagerImpl<>(
subtasksCommittableManagers.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, (e) ->
e.getValue().copy())),
- subtaskId,
numberOfSubtasks,
checkpointId,
metricGroup);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CheckpointCommittableManagerImpl<?> that =
(CheckpointCommittableManagerImpl<?>) o;
+ return checkpointId == that.checkpointId
+ && numberOfSubtasks == that.numberOfSubtasks
+ && Objects.equals(subtasksCommittableManagers,
that.subtasksCommittableManagers);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(subtasksCommittableManagers, checkpointId,
numberOfSubtasks);
+ }
+
+ @Override
+ public String toString() {
+ return "CheckpointCommittableManagerImpl{"
+ + "numberOfSubtasks="
+ + numberOfSubtasks
+ + ", checkpointId="
+ + checkpointId
+ + ", subtasksCommittableManagers="
+ + subtasksCommittableManagers
+ + '}';
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java
index 8837f2afb32..e03ef306078 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
+import java.util.Objects;
+
/**
* Internal implementation to commit a specific committable and handle the
response.
*
@@ -118,4 +120,35 @@ public class CommitRequestImpl<CommT> implements
Committer.CommitRequest<CommT>
CommitRequestImpl<CommT> copy() {
return new CommitRequestImpl<>(committable, numRetries, state,
metricGroup);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CommitRequestImpl<?> that = (CommitRequestImpl<?>) o;
+ return numRetries == that.numRetries
+ && Objects.equals(committable, that.committable)
+ && state == that.state;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(committable, numRetries, state);
+ }
+
+ @Override
+ public String toString() {
+ return "CommitRequestImpl{"
+ + "state="
+ + state
+ + ", numRetries="
+ + numRetries
+ + ", committable="
+ + committable
+ + '}';
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index 2dac78c71ea..7a337059c04 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.runtime.operators.sink.committables;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.connector.sink2.InitContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
@@ -34,14 +33,17 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
+import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * This class is responsible to book-keep the committing progress across
checkpoints and subtasks.
- * It handles the emission of committables and the {@link CommittableSummary}.
+ * This class is responsible to book-keep the committing progress across
checkpoints and upstream
+ * subtasks.
+ *
+ * <p>Each checkpoint in turn is handled by a {@link
CheckpointCommittableManager}.
*
* @param <CommT> type of committable
*/
@@ -51,29 +53,18 @@ public class CommittableCollector<CommT> {
/** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}.
*/
private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>>
checkpointCommittables;
- /** Denotes the subtask id the collector is running. */
- private final int subtaskId;
- private final int numberOfSubtasks;
private final SinkCommitterMetricGroup metricGroup;
- public CommittableCollector(
- int subtaskId, int numberOfSubtasks, SinkCommitterMetricGroup
metricGroup) {
- this.subtaskId = subtaskId;
- this.numberOfSubtasks = numberOfSubtasks;
- this.checkpointCommittables = new TreeMap<>();
- this.metricGroup = metricGroup;
+ public CommittableCollector(SinkCommitterMetricGroup metricGroup) {
+ this(new TreeMap<>(), metricGroup);
}
/** For deep-copy. */
CommittableCollector(
Map<Long, CheckpointCommittableManagerImpl<CommT>>
checkpointCommittables,
- int subtaskId,
- int numberOfSubtasks,
SinkCommitterMetricGroup metricGroup) {
this.checkpointCommittables = new
TreeMap<>(checkNotNull(checkpointCommittables));
- this.subtaskId = subtaskId;
- this.numberOfSubtasks = numberOfSubtasks;
this.metricGroup = metricGroup;
}
@@ -81,17 +72,12 @@ public class CommittableCollector<CommT> {
* Creates a {@link CommittableCollector} based on the current runtime
information. This method
* should be used for to instantiate a collector for all Sink V2.
*
- * @param context holding runtime of information
* @param metricGroup storing the committable metrics
* @param <CommT> type of the committable
* @return {@link CommittableCollector}
*/
- public static <CommT> CommittableCollector<CommT> of(
- RuntimeContext context, SinkCommitterMetricGroup metricGroup) {
- return new CommittableCollector<>(
- context.getTaskInfo().getIndexOfThisSubtask(),
- context.getTaskInfo().getNumberOfParallelSubtasks(),
- metricGroup);
+ public static <CommT> CommittableCollector<CommT>
of(SinkCommitterMetricGroup metricGroup) {
+ return new CommittableCollector<>(metricGroup);
}
/**
@@ -105,8 +91,7 @@ public class CommittableCollector<CommT> {
*/
static <CommT> CommittableCollector<CommT> ofLegacy(
List<CommT> committables, SinkCommitterMetricGroup metricGroup) {
- CommittableCollector<CommT> committableCollector =
- new CommittableCollector<>(0, 1, metricGroup);
+ CommittableCollector<CommT> committableCollector = new
CommittableCollector<>(metricGroup);
// add a checkpoint with the lowest checkpoint id, this will be merged
into the next
// checkpoint data, subtask id is arbitrary
CommittableSummary<CommT> summary =
@@ -148,21 +133,16 @@ public class CommittableCollector<CommT> {
*/
public Collection<? extends CheckpointCommittableManager<CommT>>
getCheckpointCommittablesUpTo(
long checkpointId) {
- // clean up fully committed previous checkpoints
- // this wouldn't work with concurrent unaligned checkpoints
- Collection<CheckpointCommittableManagerImpl<CommT>> checkpoints =
- checkpointCommittables.headMap(checkpointId, true).values();
- checkpoints.removeIf(CheckpointCommittableManagerImpl::isFinished);
- return checkpoints;
+ return checkpointCommittables.headMap(checkpointId, true).values();
}
/**
- * Returns {@link CommittableManager} belonging to the last input.
+ * Returns {@link CheckpointCommittableManager} belonging to the last
input.
*
* @return {@link CheckpointCommittableManager}
*/
@Nullable
- public CommittableManager<CommT> getEndOfInputCommittable() {
+ public CheckpointCommittableManager<CommT> getEndOfInputCommittable() {
return checkpointCommittables.get(EOI);
}
@@ -194,24 +174,6 @@ public class CommittableCollector<CommT> {
}
}
- /**
- * Returns number of subtasks.
- *
- * @return number of subtasks
- */
- public int getNumberOfSubtasks() {
- return numberOfSubtasks;
- }
-
- /**
- * Returns subtask id.
- *
- * @return subtask id.
- */
- public int getSubtaskId() {
- return subtaskId;
- }
-
/**
* Returns a new committable collector that deep copies all internals.
*
@@ -222,8 +184,6 @@ public class CommittableCollector<CommT> {
checkpointCommittables.entrySet().stream()
.map(e -> Tuple2.of(e.getKey(), e.getValue().copy()))
.collect(Collectors.toMap((t) -> t.f0, (t) -> t.f1)),
- subtaskId,
- numberOfSubtasks,
metricGroup);
}
@@ -235,12 +195,7 @@ public class CommittableCollector<CommT> {
checkpointCommittables
.computeIfAbsent(
summary.getCheckpointIdOrEOI(),
- key ->
- new CheckpointCommittableManagerImpl<>(
- subtaskId,
- numberOfSubtasks,
- summary.getCheckpointIdOrEOI(),
- metricGroup))
+ key ->
CheckpointCommittableManagerImpl.forSummary(summary, metricGroup))
.addSummary(summary);
}
@@ -254,4 +209,31 @@ public class CommittableCollector<CommT> {
this.checkpointCommittables.get(committable.getCheckpointIdOrEOI());
return checkNotNull(committables, "Unknown checkpoint for %s",
committable);
}
+
+ /** Removes all metadata about checkpoints of which all committables are
fully committed. */
+ public void compact() {
+
checkpointCommittables.values().removeIf(CheckpointCommittableManagerImpl::isFinished);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CommittableCollector<?> that = (CommittableCollector<?>) o;
+ return Objects.equals(checkpointCommittables,
that.checkpointCommittables);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(checkpointCommittables);
+ }
+
+ @Override
+ public String toString() {
+ return "CommittableCollector{" + "checkpointCommittables=" +
checkpointCommittables + '}';
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
index 7bb6d769a48..f41350bd258 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
@@ -48,18 +48,21 @@ public final class CommittableCollectorSerializer<CommT>
private static final int MAGIC_NUMBER = 0xb91f252c;
private final SimpleVersionedSerializer<CommT> committableSerializer;
- private final int subtaskId;
- private final int numberOfSubtasks;
+ /** Default values are used to deserialize from Flink 1 that didn't store
the information. */
+ private final int owningSubtaskId;
+ /** Default values are used to deserialize from Flink 1 that didn't store
the information. */
+ private final int owningNumberOfSubtasks;
+
private final SinkCommitterMetricGroup metricGroup;
public CommittableCollectorSerializer(
SimpleVersionedSerializer<CommT> committableSerializer,
- int subtaskId,
- int numberOfSubtasks,
+ int owningSubtaskId,
+ int owningNumberOfSubtasks,
SinkCommitterMetricGroup metricGroup) {
this.committableSerializer = checkNotNull(committableSerializer);
- this.subtaskId = subtaskId;
- this.numberOfSubtasks = numberOfSubtasks;
+ this.owningSubtaskId = owningSubtaskId;
+ this.owningNumberOfSubtasks = owningNumberOfSubtasks;
this.metricGroup = metricGroup;
}
@@ -116,8 +119,6 @@ public final class CommittableCollectorSerializer<CommT>
.collect(
Collectors.toMap(
CheckpointCommittableManagerImpl::getCheckpointId, e -> e)),
- subtaskId,
- numberOfSubtasks,
metricGroup);
}
@@ -134,7 +135,7 @@ public final class CommittableCollectorSerializer<CommT>
@Override
public int getVersion() {
- return 0;
+ return 1;
}
@Override
@@ -142,6 +143,7 @@ public final class CommittableCollectorSerializer<CommT>
throws IOException {
DataOutputSerializer out = new DataOutputSerializer(256);
out.writeLong(checkpoint.getCheckpointId());
+ out.writeInt(checkpoint.getNumberOfSubtasks());
SimpleVersionedSerialization.writeVersionAndSerializeList(
new SubtaskSimpleVersionedSerializer(),
new
ArrayList<>(checkpoint.getSubtaskCommittableManagers()),
@@ -155,6 +157,7 @@ public final class CommittableCollectorSerializer<CommT>
DataInputDeserializer in = new DataInputDeserializer(serialized);
long checkpointId = in.readLong();
+ int numberOfSubtasks = version == 0 ? owningNumberOfSubtasks :
in.readInt();
List<SubtaskCommittableManager<CommT>> subtaskCommittableManagers =
SimpleVersionedSerialization.readVersionAndDeserializeList(
@@ -165,18 +168,16 @@ public final class CommittableCollectorSerializer<CommT>
for (SubtaskCommittableManager<CommT> subtaskCommittableManager :
subtaskCommittableManagers) {
-
// merge in case we already have a manager for that subtaskId
+ // merging is only necessary for recovering Flink 1 unaligned checkpoints
subtasksCommittableManagers.merge(
- subtaskId, subtaskCommittableManager,
SubtaskCommittableManager::merge);
+ subtaskCommittableManager.getSubtaskId(),
+ subtaskCommittableManager,
+ SubtaskCommittableManager::merge);
}
return new CheckpointCommittableManagerImpl<>(
- subtasksCommittableManagers,
- subtaskId,
- numberOfSubtasks,
- checkpointId,
- metricGroup);
+ subtasksCommittableManagers, numberOfSubtasks,
checkpointId, metricGroup);
}
}
@@ -206,12 +207,13 @@ public final class CommittableCollectorSerializer<CommT>
@Override
public int getVersion() {
- return 0;
+ return 1;
}
@Override
public byte[] serialize(SubtaskCommittableManager<CommT> subtask)
throws IOException {
DataOutputSerializer out = new DataOutputSerializer(256);
+ out.writeInt(subtask.getSubtaskId());
SimpleVersionedSerialization.writeVersionAndSerializeList(
new RequestSimpleVersionedSerializer(),
new ArrayList<>(subtask.getRequests()),
@@ -226,6 +228,8 @@ public final class CommittableCollectorSerializer<CommT>
public SubtaskCommittableManager<CommT> deserialize(int version,
byte[] serialized)
throws IOException {
DataInputDeserializer in = new DataInputDeserializer(serialized);
+ // Version 0 didn't store the subtaskId, so use default value.
+ int subtaskId = version == 0 ? owningSubtaskId : in.readInt();
List<CommitRequestImpl<CommT>> requests =
SimpleVersionedSerialization.readVersionAndDeserializeList(
new RequestSimpleVersionedSerializer(), in);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java
deleted file mode 100644
index f6f176d6748..00000000000
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.sink.committables;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * Internal wrapper to handle the committing of committables.
- *
- * @param <CommT> type of the committable
- */
-@Internal
-public interface CommittableManager<CommT> {
- /** Returns a summary of the current commit progress. */
- CommittableSummary<CommT> getSummary();
-
- /**
- * Commits all due committables if all respective committables of the specific subtask and
- * checkpoint have been received.
- *
- * @param committer used to commit to the external system
- * @return successfully committed committables with meta information
- * @throws IOException
- * @throws InterruptedException
- */
- Collection<CommittableWithLineage<CommT>> commit(Committer<CommT>
committer)
- throws IOException, InterruptedException;
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
index 2421e6fb234..f8d81bda62b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
+
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -29,6 +31,7 @@ import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -36,7 +39,7 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
-/** Manages the committables coming from one subtask. */
+/** Manages the committables coming from one upstream subtask. */
class SubtaskCommittableManager<CommT> {
private final Deque<CommitRequestImpl<CommT>> requests;
private final int numExpectedCommittables;
@@ -210,6 +213,29 @@ class SubtaskCommittableManager<CommT> {
metricGroup);
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SubtaskCommittableManager<?> that = (SubtaskCommittableManager<?>) o;
+ return numExpectedCommittables == that.numExpectedCommittables
+ && checkpointId == that.checkpointId
+ && subtaskId == that.subtaskId
+ && numDrained == that.numDrained
+ && numFailed == that.numFailed
+ && Iterables.elementsEqual(requests, that.requests);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ requests, numExpectedCommittables, checkpointId, subtaskId,
numDrained, numFailed);
+ }
+
@Override
public String toString() {
return "SubtaskCommittableManager{"
@@ -225,8 +251,6 @@ class SubtaskCommittableManager<CommT> {
+ numDrained
+ ", numFailed="
+ numFailed
- + ", metricGroup="
- + metricGroup
+ '}';
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java
index 0df8f1674e1..12a5644fe71 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
+import
org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import
org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer;
@@ -49,11 +50,7 @@ class GlobalCommitterSerializerTest {
new IntegerSerializer(), SUBTASK_ID, NUMBER_OF_SUBTASKS,
METRIC_GROUP);
private static final GlobalCommitterSerializer<Integer, String> SERIALIZER
=
new GlobalCommitterSerializer<>(
- COMMITTABLE_COLLECTOR_SERIALIZER,
- new StringSerializer(),
- SUBTASK_ID,
- NUMBER_OF_SUBTASKS,
- METRIC_GROUP);
+ COMMITTABLE_COLLECTOR_SERIALIZER, new StringSerializer(),
METRIC_GROUP);
@ParameterizedTest
@ValueSource(booleans = {true, false})
@@ -62,11 +59,8 @@ class GlobalCommitterSerializerTest {
new GlobalCommitterSerializer<>(
COMMITTABLE_COLLECTOR_SERIALIZER,
withSinkV1State ? new StringSerializer() : null,
- SUBTASK_ID,
- NUMBER_OF_SUBTASKS,
METRIC_GROUP);
- final CommittableCollector<Integer> collector =
- new CommittableCollector<>(SUBTASK_ID, NUMBER_OF_SUBTASKS,
METRIC_GROUP);
+ final CommittableCollector<Integer> collector = new
CommittableCollector<>(METRIC_GROUP);
collector.addMessage(new CommittableSummary<>(2, 3, 1L, 1, 1, 0));
collector.addMessage(new CommittableWithLineage<>(1, 1L, 2));
final List<String> v1State =
@@ -76,10 +70,11 @@ class GlobalCommitterSerializerTest {
final GlobalCommittableWrapper<Integer, String> copy =
serializer.deserialize(2, serializer.serialize(wrapper));
assertThat(copy.getGlobalCommittables()).containsExactlyInAnyOrderElementsOf(v1State);
- assertThat(collector.getNumberOfSubtasks()).isEqualTo(1);
- assertThat(collector.isFinished()).isFalse();
- assertThat(collector.getSubtaskId()).isEqualTo(0);
- assertThat(collector.getCheckpointCommittablesUpTo(2)).hasSize(1);
+ assertThat(collector).returns(false, CommittableCollector::isFinished);
+ assertThat(collector.getCheckpointCommittablesUpTo(2))
+ .singleElement()
+ .returns(1L, CheckpointCommittableManager::getCheckpointId)
+ .returns(3, CheckpointCommittableManager::getNumberOfSubtasks);
}
@Test
@@ -101,8 +96,6 @@ class GlobalCommitterSerializerTest {
assertThat(wrapper.getGlobalCommittables()).containsExactlyInAnyOrder(state1,
state2);
final CommittableCollector<Integer> collector =
wrapper.getCommittableCollector();
- assertThat(collector.getNumberOfSubtasks()).isEqualTo(1);
- assertThat(collector.getSubtaskId()).isEqualTo(0);
assertThat(collector.getCheckpointCommittablesUpTo(Long.MAX_VALUE)).isEmpty();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
index 6c8687b63c9..b9c65c7c167 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
import java.util.Collection;
+import java.util.HashMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -43,20 +44,17 @@ class CheckpointCommittableManagerImplTest {
@Test
void testAddSummary() {
final CheckpointCommittableManagerImpl<Integer> checkpointCommittables
=
- new CheckpointCommittableManagerImpl<>(2, 1, 1L, METRIC_GROUP);
+ new CheckpointCommittableManagerImpl<>(new HashMap<>(), 1, 1L,
METRIC_GROUP);
assertThat(checkpointCommittables.getSubtaskCommittableManagers()).isEmpty();
final CommittableSummary<Integer> first = new CommittableSummary<>(1,
1, 1L, 1, 0, 0);
checkpointCommittables.addSummary(first);
assertThat(checkpointCommittables.getSubtaskCommittableManagers())
- .hasSize(1)
- .satisfiesExactly(
- (s) -> {
- assertThat(s.getSubtaskId()).isEqualTo(2);
- assertThat(s.getCheckpointId()).isEqualTo(1L);
- assertThat(s.getNumPending()).isEqualTo(1);
- assertThat(s.getNumFailed()).isEqualTo(0);
- });
+ .singleElement()
+ .returns(1, SubtaskCommittableManager::getSubtaskId)
+ .returns(1L, SubtaskCommittableManager::getCheckpointId)
+ .returns(1, SubtaskCommittableManager::getNumPending)
+ .returns(0, SubtaskCommittableManager::getNumFailed);
// Add different subtask id
final CommittableSummary<Integer> third = new CommittableSummary<>(2,
1, 2L, 2, 1, 1);
@@ -67,7 +65,7 @@ class CheckpointCommittableManagerImplTest {
@Test
void testCommit() throws IOException, InterruptedException {
final CheckpointCommittableManagerImpl<Integer> checkpointCommittables
=
- new CheckpointCommittableManagerImpl<>(1, 1, 1L, METRIC_GROUP);
+ new CheckpointCommittableManagerImpl<>(new HashMap<>(), 1, 1L,
METRIC_GROUP);
checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L,
1, 0, 0));
checkpointCommittables.addSummary(new CommittableSummary<>(2, 1, 1L,
2, 0, 0));
checkpointCommittables.addCommittable(new CommittableWithLineage<>(3,
1L, 1));
@@ -87,15 +85,14 @@ class CheckpointCommittableManagerImplTest {
// Commit all committables
assertThat(checkpointCommittables.commit(committer))
.hasSize(2)
- .satisfiesExactly(
- c -> assertThat(c.getCommittable()).isEqualTo(4),
- c -> assertThat(c.getCommittable()).isEqualTo(5));
+ .extracting(CommittableWithLineage::getCommittable)
+ .containsExactlyInAnyOrder(4, 5);
}
@Test
void testUpdateCommittableSummary() {
final CheckpointCommittableManagerImpl<Integer> checkpointCommittables
=
- new CheckpointCommittableManagerImpl<>(1, 1, 1L, METRIC_GROUP);
+ new CheckpointCommittableManagerImpl<>(new HashMap<>(), 1, 1L,
METRIC_GROUP);
checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L,
1, 0, 0));
assertThatThrownBy(
() ->
@@ -113,14 +110,14 @@ class CheckpointCommittableManagerImplTest {
final CheckpointCommittableManagerImpl<Integer> original =
new CheckpointCommittableManagerImpl<>(
- subtaskId, numberOfSubtasks, checkpointId,
METRIC_GROUP);
+ new HashMap<>(), numberOfSubtasks, checkpointId,
METRIC_GROUP);
original.addSummary(
new CommittableSummary<>(subtaskId, numberOfSubtasks,
checkpointId, 1, 0, 0));
CheckpointCommittableManagerImpl<Integer> copy = original.copy();
assertThat(copy.getCheckpointId()).isEqualTo(checkpointId);
- SinkV2Assertions.assertThat(copy.getSummary())
+ SinkV2Assertions.assertThat(copy.getSummary(subtaskId,
numberOfSubtasks))
.hasNumberOfSubtasks(numberOfSubtasks)
.hasSubtaskId(subtaskId)
.hasCheckpointId(checkpointId);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java
index e6d1d9d21d9..d0f79521f74 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java
@@ -32,6 +32,7 @@ import
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.shaded.guava32.com.google.common.collect.Streams;
import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -66,19 +67,18 @@ class CommittableCollectorSerializerTest {
final byte[] serialized = out.getCopyOfBuffer();
final CommittableCollector<Integer> committableCollector =
SERIALIZER.deserialize(1, serialized);
- assertThat(committableCollector.getNumberOfSubtasks()).isEqualTo(1);
+
assertThat(committableCollector.isFinished()).isFalse();
- assertThat(committableCollector.getSubtaskId()).isEqualTo(0);
- final Collection<CheckpointCommittableManagerImpl<Integer>>
checkpointCommittables =
- committableCollector.getCheckpointCommittables();
- assertThat(checkpointCommittables).hasSize(1);
- final SubtaskCommittableManager<Integer> subtaskCommittableManager =
-
checkpointCommittables.iterator().next().getSubtaskCommittableManager(0);
- assertThat(
- subtaskCommittableManager
- .getPendingRequests()
- .map(CommitRequestImpl::getCommittable)
- .collect(Collectors.toList()))
+
+ assertThat(committableCollector.getCheckpointCommittables())
+ .singleElement()
+ .extracting(
+ checkpointCommittable ->
+ checkpointCommittable
+ .getSubtaskCommittableManager(0)
+ .getPendingRequests()
+
.map(CommitRequestImpl::getCommittable),
+ InstanceOfAssertFactories.stream(Integer.class))
.containsExactly(1, 2, 3);
}
@@ -92,7 +92,7 @@ class CommittableCollectorSerializerTest {
COMMITTABLE_SERIALIZER, subtaskId, numberOfSubtasks,
METRIC_GROUP);
final CommittableCollector<Integer> committableCollector =
- new CommittableCollector<>(subtaskId, numberOfSubtasks,
METRIC_GROUP);
+ new CommittableCollector<>(METRIC_GROUP);
committableCollector.addMessage(
new CommittableSummary<>(subtaskId, numberOfSubtasks, 1L, 1,
1, 0));
committableCollector.addMessage(
@@ -104,9 +104,7 @@ class CommittableCollectorSerializerTest {
ccSerializer.deserialize(2,
SERIALIZER.serialize(committableCollector));
// Expect the subtask Id equal to the origin of the collector
- assertThat(copy.getSubtaskId()).isEqualTo(subtaskId);
assertThat(copy.isFinished()).isFalse();
- assertThat(copy.getNumberOfSubtasks()).isEqualTo(numberOfSubtasks);
// assert original CommittableCollector
assertCommittableCollector(
@@ -136,7 +134,7 @@ class CommittableCollectorSerializerTest {
COMMITTABLE_SERIALIZER, subtaskId, numberOfSubtasks,
METRIC_GROUP);
final CommittableCollector<Integer> committableCollector =
- new CommittableCollector<>(subtaskId, numberOfSubtasks,
METRIC_GROUP);
+ new CommittableCollector<>(METRIC_GROUP);
committableCollector.addMessage(
new CommittableSummary<>(subtaskId, numberOfSubtasks, 1L, 1,
1, 0));
committableCollector.addMessage(
@@ -148,9 +146,7 @@ class CommittableCollectorSerializerTest {
ccSerializer.deserialize(2,
SERIALIZER.serialize(committableCollector));
// Expect the subtask Id equal to the origin of the collector
- assertThat(copy.getSubtaskId()).isEqualTo(subtaskId);
assertThat(copy.isFinished()).isFalse();
- assertThat(copy.getNumberOfSubtasks()).isEqualTo(numberOfSubtasks);
// assert original CommittableCollector
assertCommittableCollector(
@@ -161,12 +157,7 @@ class CommittableCollectorSerializerTest {
Collections.singletonList(Collections.singletonList(1)));
// assert deserialized CommittableCollector
- assertCommittableCollector(
- "Deserialized CommittableCollector",
- subtaskId,
- numberOfSubtasks,
- copy,
- Collections.singletonList(Arrays.asList(1, 1)));
+ assertThat(copy).isEqualTo(committableCollector);
}
@Test
@@ -176,7 +167,7 @@ class CommittableCollectorSerializerTest {
// WriterInitContext#INITIAL_CHECKPOINT_ID
long checkpointId = WriterInitContext.INITIAL_CHECKPOINT_ID + 1;
final CommittableCollector<Integer> committableCollector =
- new CommittableCollector<>(SUBTASK_ID, NUMBER_OF_SUBTASKS,
METRIC_GROUP);
+ new CommittableCollector<>(METRIC_GROUP);
committableCollector.addMessage(
new CommittableSummary<>(SUBTASK_ID, NUMBER_OF_SUBTASKS,
checkpointId, 1, 1, 0));
committableCollector.addMessage(new CommittableWithLineage<>(1,
checkpointId, SUBTASK_ID));
@@ -197,7 +188,7 @@ class CommittableCollectorSerializerTest {
* @param assertMessageHeading prefix used for assertion fail message.
* @param subtaskId subtaskId to get {@link SubtaskCommittableManager} from {@link
* CheckpointCommittableManagerImpl}
- * @param expectedNumberOfSubtasks expected number of subtasks for {@link CommittableSummary}
+ * @param numberOfSubtasks expected number of subtasks for {@link CommittableSummary}
* @param committableCollector collector to get {@link CheckpointCommittableManager}s from.
* @param committablesPerSubtaskPerCheckpoint every of the list element represents expected
* number of pending request per {@link SubtaskCommittableManager}.
@@ -205,7 +196,7 @@ class CommittableCollectorSerializerTest {
private void assertCommittableCollector(
String assertMessageHeading,
int subtaskId,
- int expectedNumberOfSubtasks,
+ int numberOfSubtasks,
CommittableCollector<Integer> committableCollector,
List<List<Integer>> committablesPerSubtaskPerCheckpoint) {
@@ -235,9 +226,10 @@ class CommittableCollectorSerializerTest {
subtaskId);
SinkV2Assertions.assertThat(
-
checkpointCommittableManager.getSummary())
+
checkpointCommittableManager.getSummary(
+ subtaskId,
numberOfSubtasks))
.hasSubtaskId(subtaskId)
-
.hasNumberOfSubtasks(expectedNumberOfSubtasks);
+
.hasNumberOfSubtasks(numberOfSubtasks);
assertPendingRequests(
subtaskCommittableManager,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
index 3571e60f2c9..b33faafa8dd 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
@@ -35,7 +35,7 @@ class CommittableCollectorTest {
@Test
void testGetCheckpointCommittablesUpTo() {
final CommittableCollector<Integer> committableCollector =
- new CommittableCollector<>(1, 1, METRIC_GROUP);
+ new CommittableCollector<>(METRIC_GROUP);
CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, 1L,
1, 0, 0);
committableCollector.addMessage(first);
CommittableSummary<Integer> second = new CommittableSummary<>(1, 1,
2L, 1, 0, 0);
@@ -50,13 +50,13 @@ class CommittableCollectorTest {
@Test
void testGetEndOfInputCommittable() {
final CommittableCollector<Integer> committableCollector =
- new CommittableCollector<>(1, 1, METRIC_GROUP);
+ new CommittableCollector<>(METRIC_GROUP);
CommittableSummary<Integer> first = new CommittableSummary<>(1, 1,
EOI, 1, 0, 0);
committableCollector.addMessage(first);
- CommittableManager<Integer> endOfInputCommittable =
+ CheckpointCommittableManager<Integer> endOfInputCommittable =
committableCollector.getEndOfInputCommittable();
assertThat(endOfInputCommittable).isNotNull();
-
SinkV2Assertions.assertThat(endOfInputCommittable.getSummary()).hasCheckpointId(EOI);
+ SinkV2Assertions.assertThat(endOfInputCommittable.getSummary(1,
1)).hasCheckpointId(EOI);
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index dec63478475..096e2202dcb 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -79,7 +79,7 @@ abstract class CommitterOperatorTestBase {
.hasOverallCommittables(committableSummary.getNumberOfCommittables())
.hasPendingCommittables(0);
SinkV2Assertions.assertThat(toCommittableWithLinage(output.get(1)))
-
.isEqualTo(copyCommittableWithDifferentOrigin(committableWithLineage, 0));
+ .isEqualTo(committableWithLineage.withSubtaskId(0));
} else {
assertThat(testHarness.getOutput()).isEmpty();
}
@@ -120,9 +120,9 @@ abstract class CommitterOperatorTestBase {
.hasOverallCommittables(committableSummary.getNumberOfCommittables())
.hasPendingCommittables(0);
SinkV2Assertions.assertThat(toCommittableWithLinage(output.get(1)))
- .isEqualTo(copyCommittableWithDifferentOrigin(first, 0));
+ .isEqualTo(first.withSubtaskId(0));
SinkV2Assertions.assertThat(toCommittableWithLinage(output.get(2)))
- .isEqualTo(copyCommittableWithDifferentOrigin(second, 0));
+ .isEqualTo(second.withSubtaskId(0));
testHarness.close();
}
@@ -161,9 +161,9 @@ abstract class CommitterOperatorTestBase {
.hasOverallCommittables(2)
.hasPendingCommittables(0);
SinkV2Assertions.assertThat(toCommittableWithLinage(output.get(1)))
- .isEqualTo(copyCommittableWithDifferentOrigin(first, 0));
+ .isEqualTo(first.withSubtaskId(0));
SinkV2Assertions.assertThat(toCommittableWithLinage(output.get(2)))
- .isEqualTo(copyCommittableWithDifferentOrigin(second, 0));
+ .isEqualTo(second.withSubtaskId(0));
testHarness.close();
}
@@ -283,7 +283,7 @@ abstract class CommitterOperatorTestBase {
.hasOverallCommittables(1)
.hasFailedCommittables(0);
SinkV2Assertions.assertThat(toCommittableWithLinage(output.get(1)))
-
.isEqualTo(copyCommittableWithDifferentOrigin(committableWithLineage, 0));
+ .isEqualTo(committableWithLineage.withSubtaskId(0));
// Future emission calls should change the output
testHarness.notifyOfCompletedCheckpoint(2);
@@ -292,12 +292,6 @@ abstract class CommitterOperatorTestBase {
assertThat(testHarness.getOutput()).hasSize(2);
}
- CommittableWithLineage<?> copyCommittableWithDifferentOrigin(
- CommittableWithLineage<?> committable, int subtaskId) {
- return new CommittableWithLineage<>(
- committable.getCommittable(),
committable.getCheckpointIdOrEOI(), subtaskId);
- }
-
private OneInputStreamOperatorTestHarness<
CommittableMessage<String>, CommittableMessage<String>>
createTestHarness(