This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 52519a8eb695c9523c546439c66910b15f19be20 Author: fredia <[email protected]> AuthorDate: Tue Jun 14 12:28:25 2022 +0800 [FLINK-27692][changelog] Refactor ChangelogSnapshotState --- .../changelog/ChangelogKeyedStateBackend.java | 140 ++++++++++++++++----- .../changelog/ChangelogKeyedStateBackendTest.java | 2 +- 2 files changed, 111 insertions(+), 31 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index ba0fb9427c7..1935600fd4e 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -92,7 +92,6 @@ import java.util.stream.Stream; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static java.util.Collections.unmodifiableList; import static org.apache.flink.state.changelog.PeriodicMaterializationManager.MaterializationRunnable; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -405,10 +404,12 @@ public class ChangelogKeyedStateBackend<K> .whenComplete( (snapshotResult, throwable) -> metrics.reportSnapshotResult(snapshotResult)) - .thenApply( - snapshotResult -> - SnapshotResult.of( - snapshotResult.getJobManagerOwnedSnapshot()))); + .thenApply(this::castSnapshotResult)); + } + + @SuppressWarnings("unchecked") + private SnapshotResult<KeyedStateHandle> castSnapshotResult(SnapshotResult<?> snapshotResult) { + return (SnapshotResult<KeyedStateHandle>) snapshotResult; } private SnapshotResult<ChangelogStateBackendHandle> buildSnapshotResult( @@ -428,6 +429,26 @@ public class ChangelogKeyedStateBackend<K> if (prevDeltaCopy.isEmpty() && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) { return SnapshotResult.empty(); + } else if (!changelogStateBackendStateCopy.getLocalMaterializedSnapshot().isEmpty()) { + return SnapshotResult.withLocalState( + new ChangelogStateBackendHandleImpl( + changelogStateBackendStateCopy.getMaterializedSnapshot(), + prevDeltaCopy, + getKeyGroupRange(), + checkpointId, + changelogStateBackendStateCopy.materializationID, + persistedSizeOfThisCheckpoint), + new ChangelogStateBackendHandleImpl( + changelogStateBackendStateCopy.getLocalMaterializedSnapshot(), + // TODO: Restore ChangelogStateHandles from remote temporarily, because + // ChangelogStateHandles are small(about 10MB). + // In the future, the double-stream option may be implemented according + // to the test results. + prevDeltaCopy, + getKeyGroupRange(), + checkpointId, + changelogStateBackendStateCopy.materializationID, + persistedSizeOfThisCheckpoint)); } else { return SnapshotResult.of( new ChangelogStateBackendHandleImpl( @@ -618,7 +639,7 @@ public class ChangelogKeyedStateBackend<K> } } this.materializedId = materializationId + 1; - + // Todo: distinguish whether the handle is local or remote return new ChangelogSnapshotState( materialized, restoredNonMaterialized, @@ -693,11 +714,20 @@ public class ChangelogKeyedStateBackend<K> upTo, materializedSnapshot); changelogSnapshotState = - new ChangelogSnapshotState( - getMaterializedResult(materializedSnapshot), - Collections.emptyList(), - upTo, - materializationID); + materializedSnapshot.getTaskLocalSnapshot() == null + ? new ChangelogSnapshotState( + getMaterializedResult(materializedSnapshot), + Collections.emptyList(), + upTo, + materializationID) + : new ChangelogSnapshotState( + getMaterializedResult(materializedSnapshot), + getLocalMaterializedResult(materializedSnapshot), + Collections.emptyList(), + Collections.emptyList(), + upTo, + materializationID); + changelogTruncateHelper.materialized(upTo); } @@ -708,6 +738,12 @@ public class ChangelogKeyedStateBackend<K> return jobManagerOwned == null ? emptyList() : singletonList(jobManagerOwned); } + private List<KeyedStateHandle> getLocalMaterializedResult( + @Nonnull SnapshotResult<KeyedStateHandle> materializedSnapshot) { + KeyedStateHandle taskLocalSnapshot = materializedSnapshot.getTaskLocalSnapshot(); + return taskLocalSnapshot == null ? emptyList() : singletonList(taskLocalSnapshot); + } + @Override public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive) { return keyedStateBackend.getDelegatedKeyedStateBackend(recursive); @@ -791,18 +827,16 @@ public class ChangelogKeyedStateBackend<K> } /** - * Snapshot State for ChangelogKeyedStatebackend. + * Snapshot State for ChangelogKeyedStatebackend, a wrapper over {@link SnapshotResult}. * * <p>It includes three parts: - materialized snapshot from the underlying delegated state * backend - non-materialized part in the current changelog - non-materialized changelog, from * previous logs (before failover or rescaling) */ - private static class ChangelogSnapshotState { - /** - * Materialized snapshot from the underlying delegated state backend. Set initially on - * restore and later upon materialization. - */ - private final List<KeyedStateHandle> materializedSnapshot; + private class ChangelogSnapshotState { + + /** Set initially on restore and later upon materialization. */ + private final SnapshotResult<ChangelogStateBackendHandle> changelogSnapshot; /** * The {@link SequenceNumber} up to which the state is materialized, exclusive. This @@ -810,12 +844,6 @@ public class ChangelogKeyedStateBackend<K> */ private final SequenceNumber materializedTo; - /** - * Non-materialized changelog, from previous logs. Set initially on restore and later - * cleared upon materialization. - */ - private final List<ChangelogStateHandle> restoredNonMaterialized; - /** ID of this materialization corresponding to the nested backend checkpoint ID. */ private final long materializationID; @@ -824,22 +852,74 @@ public class ChangelogKeyedStateBackend<K> List<ChangelogStateHandle> restoredNonMaterialized, SequenceNumber materializedTo, long materializationID) { - this.materializedSnapshot = unmodifiableList((materializedSnapshot)); - this.restoredNonMaterialized = unmodifiableList(restoredNonMaterialized); + this.changelogSnapshot = + SnapshotResult.of( + new ChangelogStateBackendHandleImpl( + materializedSnapshot, + restoredNonMaterialized, + getKeyGroupRange(), + lastCheckpointId, + materializationID, + 0L)); + this.materializedTo = materializedTo; + this.materializationID = materializationID; + } + + public ChangelogSnapshotState( + List<KeyedStateHandle> materializedSnapshot, + List<KeyedStateHandle> localMaterializedSnapshot, + List<ChangelogStateHandle> restoredNonMaterialized, + List<ChangelogStateHandle> localRestoredNonMaterialized, + SequenceNumber materializedTo, + long materializationID) { + this.changelogSnapshot = + SnapshotResult.withLocalState( + new ChangelogStateBackendHandleImpl( + materializedSnapshot, + restoredNonMaterialized, + getKeyGroupRange(), + lastCheckpointId, + materializationID, + 0L), + new ChangelogStateBackendHandleImpl( + localMaterializedSnapshot, + localRestoredNonMaterialized, + getKeyGroupRange(), + lastCheckpointId, + materializationID, + 0L)); this.materializedTo = materializedTo; this.materializationID = materializationID; } public List<KeyedStateHandle> getMaterializedSnapshot() { - return materializedSnapshot; + return changelogSnapshot.getJobManagerOwnedSnapshot() != null + ? changelogSnapshot.getJobManagerOwnedSnapshot().getMaterializedStateHandles() + : Collections.emptyList(); } - public SequenceNumber lastMaterializedTo() { - return materializedTo; + public List<KeyedStateHandle> getLocalMaterializedSnapshot() { + return changelogSnapshot.getTaskLocalSnapshot() != null + ? changelogSnapshot.getTaskLocalSnapshot().getMaterializedStateHandles() + : Collections.emptyList(); } public List<ChangelogStateHandle> getRestoredNonMaterialized() { - return restoredNonMaterialized; + return changelogSnapshot.getJobManagerOwnedSnapshot() != null + ? changelogSnapshot + .getJobManagerOwnedSnapshot() + .getNonMaterializedStateHandles() + : Collections.emptyList(); + } + + public List<ChangelogStateHandle> getLocalRestoredNonMaterialized() { + return changelogSnapshot.getTaskLocalSnapshot() != null + ? changelogSnapshot.getTaskLocalSnapshot().getNonMaterializedStateHandles() + : Collections.emptyList(); + } + + public SequenceNumber lastMaterializedTo() { + return materializedTo; } public long getMaterializationID() { diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java index fa15341322e..5bd491743b0 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java @@ -91,7 +91,7 @@ public class ChangelogKeyedStateBackendTest { IntSerializer.INSTANCE, getClass().getClassLoader(), 1, - KeyGroupRange.EMPTY_KEY_GROUP_RANGE, + KeyGroupRange.of(0, 0), new ExecutionConfig(), TtlTimeProvider.DEFAULT, LatencyTrackingStateConfig.disabled(),
