This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new fdcf730 [FLINK-22133][core] Add checkpointID to
'SplitEnumerator.snapshotState()'
fdcf730 is described below
commit fdcf730ce106cfe0050593a3ddc068a8b7222e94
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Apr 19 17:32:00 2021 +0200
[FLINK-22133][core] Add checkpointID to 'SplitEnumerator.snapshotState()'
This closes #15677
---
.../base/source/reader/mocks/MockSplitEnumerator.java | 2 +-
.../file/src/impl/ContinuousFileSplitEnumerator.java | 3 ++-
.../file/src/impl/StaticFileSplitEnumerator.java | 2 +-
.../src/impl/ContinuousFileSplitEnumeratorTest.java | 6 +++---
.../file/src/impl/StaticFileSplitEnumeratorTest.java | 6 +++---
.../connectors/hive/ContinuousHiveSplitEnumerator.java | 3 ++-
.../kafka/source/enumerator/KafkaSourceEnumerator.java | 2 +-
.../kafka/source/enumerator/KafkaEnumeratorTest.java | 4 ++--
.../flink/api/connector/source/SplitEnumerator.java | 18 ++++++++++++++++--
.../source/lib/util/IteratorSourceEnumerator.java | 2 +-
.../connector/source/mocks/MockSplitEnumerator.java | 2 +-
.../runtime/source/coordinator/SourceCoordinator.java | 9 +++++----
.../source/coordinator/SourceCoordinatorTest.java | 4 ++--
.../source/coordinator/TestingSplitEnumerator.java | 2 +-
.../OperatorEventSendingCheckpointITCase.java | 4 ++--
.../checkpointing/UnalignedCheckpointTestBase.java | 2 +-
16 files changed, 44 insertions(+), 27 deletions(-)
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
index 188568c..fe8a675 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
@@ -77,7 +77,7 @@ public class MockSplitEnumerator
}
@Override
- public List<MockSourceSplit> snapshotState() {
+ public List<MockSourceSplit> snapshotState(long checkpointId) {
return splits;
}
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
index 2068445..8102e0d 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
@@ -124,7 +124,8 @@ public class ContinuousFileSplitEnumerator
}
@Override
- public PendingSplitsCheckpoint<FileSourceSplit> snapshotState() throws
Exception {
+ public PendingSplitsCheckpoint<FileSourceSplit> snapshotState(long
checkpointId)
+ throws Exception {
final PendingSplitsCheckpoint<FileSourceSplit> checkpoint =
PendingSplitsCheckpoint.fromCollectionSnapshot(
splitAssigner.remainingSplits(),
pathsAlreadyProcessed);
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
index 7dc2489..140f520 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
@@ -118,7 +118,7 @@ public class StaticFileSplitEnumerator
}
@Override
- public PendingSplitsCheckpoint<FileSourceSplit> snapshotState() {
+ public PendingSplitsCheckpoint<FileSourceSplit> snapshotState(long
checkpointId) {
return
PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
}
}
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumeratorTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumeratorTest.java
index 72d7823..1fe4b2d 100644
---
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumeratorTest.java
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumeratorTest.java
@@ -57,7 +57,7 @@ public class ContinuousFileSplitEnumeratorTest {
fileEnumerator.addSplits(split);
context.triggerAllActions();
- assertThat(enumerator.snapshotState().getSplits(), contains(split));
+ assertThat(enumerator.snapshotState(1L).getSplits(), contains(split));
}
@Test
@@ -77,7 +77,7 @@ public class ContinuousFileSplitEnumeratorTest {
fileEnumerator.addSplits(split);
context.triggerAllActions();
- assertThat(enumerator.snapshotState().getSplits(), empty());
+ assertThat(enumerator.snapshotState(1L).getSplits(), empty());
assertThat(context.getSplitAssignments().get(2).getAssignedSplits(),
contains(split));
}
@@ -102,7 +102,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.triggerAllActions();
assertFalse(context.getSplitAssignments().containsKey(2));
- assertThat(enumerator.snapshotState().getSplits(), contains(split));
+ assertThat(enumerator.snapshotState(1L).getSplits(), contains(split));
}
// ------------------------------------------------------------------------
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumeratorTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumeratorTest.java
index f554b9f..71fcf2a 100644
---
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumeratorTest.java
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumeratorTest.java
@@ -52,7 +52,7 @@ public class StaticFileSplitEnumeratorTest {
final FileSourceSplit split = createRandomSplit();
final StaticFileSplitEnumerator enumerator = createEnumerator(context,
split);
- final PendingSplitsCheckpoint<FileSourceSplit> checkpoint =
enumerator.snapshotState();
+ final PendingSplitsCheckpoint<FileSourceSplit> checkpoint =
enumerator.snapshotState(1L);
assertThat(checkpoint.getSplits(), contains(split));
}
@@ -68,7 +68,7 @@ public class StaticFileSplitEnumeratorTest {
enumerator.addReader(3);
enumerator.handleSplitRequest(3, "somehost");
- assertThat(enumerator.snapshotState().getSplits(), empty());
+ assertThat(enumerator.snapshotState(1L).getSplits(), empty());
assertThat(context.getSplitAssignments().get(3).getAssignedSplits(),
contains(split));
}
@@ -82,7 +82,7 @@ public class StaticFileSplitEnumeratorTest {
enumerator.handleSplitRequest(3, "somehost");
assertFalse(context.getSplitAssignments().containsKey(3));
- assertThat(enumerator.snapshotState().getSplits(), contains(split));
+ assertThat(enumerator.snapshotState(1L).getSplits(), contains(split));
}
@Test
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
index 1b85fb0..d5884d1 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
@@ -132,7 +132,8 @@ public class ContinuousHiveSplitEnumerator<T extends
Comparable<T>>
}
@Override
- public PendingSplitsCheckpoint<HiveSourceSplit> snapshotState() throws
Exception {
+ public PendingSplitsCheckpoint<HiveSourceSplit> snapshotState(long
checkpointId)
+ throws Exception {
Collection<HiveSourceSplit> remainingSplits =
(Collection<HiveSourceSplit>) (Collection<?>)
splitAssigner.remainingSplits();
return new ContinuousHivePendingSplitsCheckpoint(
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index b8f2728..9c69f38 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -190,7 +190,7 @@ public class KafkaSourceEnumerator
}
@Override
- public KafkaSourceEnumState snapshotState() throws Exception {
+ public KafkaSourceEnumState snapshotState(long checkpointId) throws
Exception {
return new KafkaSourceEnumState(assignedPartitions);
}
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index 679f732..a46ac2d 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -333,7 +333,7 @@ public class KafkaEnumeratorTest {
enumerator.start();
// No reader is registered, so the state should be empty
- final KafkaSourceEnumState state1 = enumerator.snapshotState();
+ final KafkaSourceEnumState state1 = enumerator.snapshotState(1L);
assertTrue(state1.assignedPartitions().isEmpty());
registerReader(context, enumerator, READER0);
@@ -341,7 +341,7 @@ public class KafkaEnumeratorTest {
context.runNextOneTimeCallable();
// The state should contain splits assigned to READER0 and READER1
- final KafkaSourceEnumState state2 = enumerator.snapshotState();
+ final KafkaSourceEnumState state2 = enumerator.snapshotState(1L);
verifySplitAssignmentWithPartitions(
getExpectedAssignments(
new HashSet<>(Arrays.asList(READER0, READER1)),
PRE_EXISTING_TOPICS),
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
index 5bc0791..2654671 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
@@ -68,12 +68,26 @@ public interface SplitEnumerator<SplitT extends
SourceSplit, CheckpointT>
void addReader(int subtaskId);
/**
- * Checkpoints the state of this split enumerator.
+ * Creates a snapshot of the state of this split enumerator, to be stored
in a checkpoint.
*
+ * <p>The snapshot should contain the latest state of the enumerator: It
should assume that all
+ * operations that happened before the snapshot have successfully
completed. For example all
+ * splits assigned to readers via {@link
SplitEnumeratorContext#assignSplit(SourceSplit, int)}
+ * and {@link SplitEnumeratorContext#assignSplits(SplitsAssignment)}
don't need to be included
+ * in the snapshot anymore.
+ *
+ * <p>This method takes the ID of the checkpoint for which the state is
snapshotted. Most
+ * implementations should be able to ignore this parameter, because for
the contents of the
+ * snapshot, it doesn't matter for which checkpoint it gets created. This
parameter can be
+ * interesting for source connectors with external systems where those
systems are themselves
+ * aware of checkpoints; for example in cases where the enumerator
notifies that system about a
+ * specific checkpoint being triggered.
+ *
+ * @param checkpointId The ID of the checkpoint for which the snapshot is
created.
* @return an object containing the state of the split enumerator.
* @throws Exception when the snapshot cannot be taken.
*/
- CheckpointT snapshotState() throws Exception;
+ CheckpointT snapshotState(long checkpointId) throws Exception;
/**
* Called to close the enumerator, in case it holds on to any resources,
like threads or network
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
index 8d16d15..9c37cca 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
@@ -72,7 +72,7 @@ public class IteratorSourceEnumerator<SplitT extends
IteratorSourceSplit<?, ?>>
}
@Override
- public Collection<SplitT> snapshotState() throws Exception {
+ public Collection<SplitT> snapshotState(long checkpointId) throws
Exception {
return remainingSplits;
}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
index 213a864..2b9dbd3 100644
---
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
+++
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
@@ -98,7 +98,7 @@ public class MockSplitEnumerator implements
SplitEnumerator<MockSourceSplit, Set
}
@Override
- public Set<MockSourceSplit> snapshotState() {
+ public Set<MockSourceSplit> snapshotState(long checkpointId) {
return unassignedSplits;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 0c9852c..add85bb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -235,7 +235,7 @@ public class SourceCoordinator<SplitT extends SourceSplit,
EnumChkT>
checkpointId);
try {
context.onCheckpoint(checkpointId);
- result.complete(toBytes());
+ result.complete(toBytes(checkpointId));
} catch (Throwable e) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(e);
result.completeExceptionally(
@@ -352,8 +352,9 @@ public class SourceCoordinator<SplitT extends SourceSplit,
EnumChkT>
* @return A byte array containing the serialized state of the source
coordinator.
* @throws Exception When something goes wrong in serialization.
*/
- private byte[] toBytes() throws Exception {
- return writeCheckpointBytes(enumerator.snapshotState(),
enumCheckpointSerializer);
+ private byte[] toBytes(long checkpointId) throws Exception {
+ return writeCheckpointBytes(
+ enumerator.snapshotState(checkpointId),
enumCheckpointSerializer);
}
static <EnumChkT> byte[] writeCheckpointBytes(
@@ -378,7 +379,7 @@ public class SourceCoordinator<SplitT extends SourceSplit,
EnumChkT>
/**
* Restore the state of this source coordinator from the state bytes.
*
- * @param bytes The checkpoint bytes that was returned from {@link
#toBytes()}
+ * @param bytes The checkpoint bytes that were returned from {@link
#toBytes(long)}
* @throws Exception When the deserialization failed.
*/
private EnumChkT deserializeCheckpoint(byte[] bytes) throws Exception {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 20d65f8..0b62215 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -336,7 +336,7 @@ public class SourceCoordinatorTest extends
SourceCoordinatorTestBase {
// Build checkpoint data with serde version 0
final TestingSplitEnumerator<MockSourceSplit> enumerator =
getEnumerator();
final Set<MockSourceSplit> splits = new HashSet<>();
- enumerator.runInEnumThreadAndSync(() ->
splits.addAll(enumerator.snapshotState()));
+ enumerator.runInEnumThreadAndSync(() ->
splits.addAll(enumerator.snapshotState(1L)));
final byte[] checkpointDataForV0Serde =
createCheckpointDataWithSerdeV0(splits);
@@ -428,7 +428,7 @@ public class SourceCoordinatorTest extends
SourceCoordinatorTestBase {
}
@Override
- public Set<MockSourceSplit> snapshotState() throws Exception {
+ public Set<MockSourceSplit> snapshotState(long checkpointId) throws
Exception {
throw new UnsupportedOperationException();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java
index eda29a9..8af418b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java
@@ -110,7 +110,7 @@ public class TestingSplitEnumerator<SplitT extends
SourceSplit>
}
@Override
- public Set<SplitT> snapshotState() {
+ public Set<SplitT> snapshotState(long checkpointId) {
return new HashSet<>(splits);
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index 9ca560c..9c91386 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -270,12 +270,12 @@ public class OperatorEventSendingCheckpointITCase extends
TestLogger {
}
@Override
- public Collection<SplitT> snapshotState() throws Exception {
+ public Collection<SplitT> snapshotState(long checkpointId) throws
Exception {
// this will be enqueued in the enumerator thread, so it will
actually run after this
// method (the snapshot operation) is complete!
context.runInCoordinatorThread(this::fullFillPendingRequests);
- return super.snapshotState();
+ return super.snapshotState(checkpointId);
}
private void fullFillPendingRequests() {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index eeae84b..b2202cc 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -549,7 +549,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
}
@Override
- public EnumeratorState snapshotState() throws Exception {
+ public EnumeratorState snapshotState(long checkpointId) throws
Exception {
LOG.info("snapshotState {}", state);
return state;
}