This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d536b5a7d73bb0696291d6ea7c47544f4edd3e77 Author: David Moravek <d...@apache.org> AuthorDate: Mon Jan 15 18:27:42 2024 +0100 [FLINK-34063][runtime] Operator states need to be restored in an order they've been written for the compression to work properly on empty states. --- .../state/OperatorStateRestoreOperation.java | 43 ++++++---------------- .../state/OperatorStateRestoreOperationTest.java | 32 ++++++++++++++-- 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java index fd983fd5d28..1634641d68b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java @@ -34,10 +34,8 @@ import javax.annotation.Nonnull; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; /** Implementation of operator state restore operation. */ public class OperatorStateRestoreOperation implements RestoreOperation<Void> { @@ -171,34 +169,18 @@ public class OperatorStateRestoreOperation implements RestoreOperation<Void> { } } - List<Map.Entry<String, OperatorStateHandle.StateMetaInfo>> entries = - new ArrayList<>(stateHandle.getStateNameToPartitionOffsets().entrySet()); - - if (backendSerializationProxy.isUsingStateCompression()) { - // sort state handles by offsets to avoid building SnappyFramedInputStream with - // EOF stream. - entries = - entries.stream() - .sorted( - Comparator.comparingLong( - entry -> { - OperatorStateHandle.StateMetaInfo - stateMetaInfo = entry.getValue(); - long[] offsets = stateMetaInfo.getOffsets(); - if (offsets == null - || offsets.length == 0) { - return Long.MIN_VALUE; - } else { - return offsets[0]; - } - })) - .collect(Collectors.toList()); - } + // Restore states in the order in which they were written. Operator states come + // before Broadcast states. + final List<String> toRestore = new ArrayList<>(); + restoredOperatorMetaInfoSnapshots.forEach( + stateName -> toRestore.add(stateName.getName())); + restoredBroadcastMetaInfoSnapshots.forEach( + stateName -> toRestore.add(stateName.getName())); - // Restore all the states - for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets : entries) { + for (String stateName : toRestore) { - final String stateName = nameToOffsets.getKey(); + final OperatorStateHandle.StateMetaInfo offsets = + stateHandle.getStateNameToPartitionOffsets().get(stateName); PartitionableListState<?> listStateForName = registeredOperatorStates.get(stateName); @@ -222,10 +204,9 @@ public class OperatorStateRestoreOperation implements RestoreOperation<Void> { + "corresponding meta info: " + stateName); deserializeBroadcastStateValues( - broadcastStateForName, compressedIn, nameToOffsets.getValue()); + broadcastStateForName, compressedIn, offsets); } else { - deserializeOperatorStateValues( - listStateForName, compressedIn, nameToOffsets.getValue()); + deserializeOperatorStateValues(listStateForName, compressedIn, offsets); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java index 4ce170aed16..47eca087a1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java @@ -63,7 +63,6 @@ public class OperatorStateRestoreOperationTest { throws Exception { try (OperatorStateBackend operatorStateBackend = operatorStateBackendFactory.apply(Collections.emptyList())) { - final CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096); for (String stateName : listStates.keySet()) { final ListStateDescriptor<String> descriptor = new ListStateDescriptor<>(stateName, String.class); @@ -84,7 +83,7 @@ public class OperatorStateRestoreOperationTest { .snapshot( 1, 1, - streamFactory, + new MemCheckpointStreamFactory(4096), CheckpointOptions.forCheckpointWithDefaultLocation()) .get(); return Objects.requireNonNull(result.getJobManagerOwnedSnapshot()); @@ -133,7 +132,7 @@ public class OperatorStateRestoreOperationTest { void testRestoringMixedOperatorState(boolean snapshotCompressionEnabled) throws Exception { final ExecutionConfig cfg = new ExecutionConfig(); cfg.setUseSnapshotCompression(snapshotCompressionEnabled); - ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend> + final ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend> operatorStateBackendFactory = createOperatorStateBackendFactory( cfg, new CloseableRegistry(), this.getClass().getClassLoader()); @@ -162,7 +161,7 @@ public class OperatorStateRestoreOperationTest { throws Exception { final ExecutionConfig cfg = new ExecutionConfig(); cfg.setUseSnapshotCompression(snapshotCompressionEnabled); - ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend> + final ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend> operatorStateBackendFactory = createOperatorStateBackendFactory( cfg, new CloseableRegistry(), this.getClass().getClassLoader()); @@ -199,4 +198,29 @@ public class OperatorStateRestoreOperationTest { mergedListStates, Collections.emptyMap()); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testEmptyPartitionedOperatorState(boolean snapshotCompressionEnabled) throws Exception { + final ExecutionConfig cfg = new ExecutionConfig(); + cfg.setUseSnapshotCompression(snapshotCompressionEnabled); + final ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend> + operatorStateBackendFactory = + createOperatorStateBackendFactory( + cfg, new CloseableRegistry(), this.getClass().getClassLoader()); + + final Map<String, List<String>> listStates = new HashMap<>(); + listStates.put("bufferState", Collections.emptyList()); + listStates.put("offsetState", Collections.singletonList("foo")); + + final OperatorStateHandle stateHandle = + createOperatorStateHandle( + operatorStateBackendFactory, listStates, Collections.emptyMap()); + + verifyOperatorStateHandle( + operatorStateBackendFactory, + Collections.singletonList(stateHandle), + listStates, + Collections.emptyMap()); + } }