This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d536b5a7d73bb0696291d6ea7c47544f4edd3e77
Author: David Moravek <d...@apache.org>
AuthorDate: Mon Jan 15 18:27:42 2024 +0100

    [FLINK-34063][runtime] Operator states need to be restored in the order in which they were written for the compression to work properly on empty states.
---
 .../state/OperatorStateRestoreOperation.java       | 43 ++++++----------------
 .../state/OperatorStateRestoreOperationTest.java   | 32 ++++++++++++++--
 2 files changed, 40 insertions(+), 35 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
index fd983fd5d28..1634641d68b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
@@ -34,10 +34,8 @@ import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /** Implementation of operator state restore operation. */
 public class OperatorStateRestoreOperation implements RestoreOperation<Void> {
@@ -171,34 +169,18 @@ public class OperatorStateRestoreOperation implements 
RestoreOperation<Void> {
                     }
                 }
 
-                List<Map.Entry<String, OperatorStateHandle.StateMetaInfo>> 
entries =
-                        new 
ArrayList<>(stateHandle.getStateNameToPartitionOffsets().entrySet());
-
-                if (backendSerializationProxy.isUsingStateCompression()) {
-                    // sort state handles by offsets to avoid building 
SnappyFramedInputStream with
-                    // EOF stream.
-                    entries =
-                            entries.stream()
-                                    .sorted(
-                                            Comparator.comparingLong(
-                                                    entry -> {
-                                                        
OperatorStateHandle.StateMetaInfo
-                                                                stateMetaInfo 
= entry.getValue();
-                                                        long[] offsets = 
stateMetaInfo.getOffsets();
-                                                        if (offsets == null
-                                                                || 
offsets.length == 0) {
-                                                            return 
Long.MIN_VALUE;
-                                                        } else {
-                                                            return offsets[0];
-                                                        }
-                                                    }))
-                                    .collect(Collectors.toList());
-                }
+                // Restore states in the order in which they were written. 
Operator states come
+                // before Broadcast states.
+                final List<String> toRestore = new ArrayList<>();
+                restoredOperatorMetaInfoSnapshots.forEach(
+                        stateName -> toRestore.add(stateName.getName()));
+                restoredBroadcastMetaInfoSnapshots.forEach(
+                        stateName -> toRestore.add(stateName.getName()));
 
-                // Restore all the states
-                for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> 
nameToOffsets : entries) {
+                for (String stateName : toRestore) {
 
-                    final String stateName = nameToOffsets.getKey();
+                    final OperatorStateHandle.StateMetaInfo offsets =
+                            
stateHandle.getStateNameToPartitionOffsets().get(stateName);
 
                     PartitionableListState<?> listStateForName =
                             registeredOperatorStates.get(stateName);
@@ -222,10 +204,9 @@ public class OperatorStateRestoreOperation implements 
RestoreOperation<Void> {
                                             + "corresponding meta info: "
                                             + stateName);
                             deserializeBroadcastStateValues(
-                                    broadcastStateForName, compressedIn, 
nameToOffsets.getValue());
+                                    broadcastStateForName, compressedIn, 
offsets);
                         } else {
-                            deserializeOperatorStateValues(
-                                    listStateForName, compressedIn, 
nameToOffsets.getValue());
+                            deserializeOperatorStateValues(listStateForName, 
compressedIn, offsets);
                         }
                     }
                 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
index 4ce170aed16..47eca087a1f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateRestoreOperationTest.java
@@ -63,7 +63,6 @@ public class OperatorStateRestoreOperationTest {
             throws Exception {
         try (OperatorStateBackend operatorStateBackend =
                 operatorStateBackendFactory.apply(Collections.emptyList())) {
-            final CheckpointStreamFactory streamFactory = new 
MemCheckpointStreamFactory(4096);
             for (String stateName : listStates.keySet()) {
                 final ListStateDescriptor<String> descriptor =
                         new ListStateDescriptor<>(stateName, String.class);
@@ -84,7 +83,7 @@ public class OperatorStateRestoreOperationTest {
                             .snapshot(
                                     1,
                                     1,
-                                    streamFactory,
+                                    new MemCheckpointStreamFactory(4096),
                                     
CheckpointOptions.forCheckpointWithDefaultLocation())
                             .get();
             return Objects.requireNonNull(result.getJobManagerOwnedSnapshot());
@@ -133,7 +132,7 @@ public class OperatorStateRestoreOperationTest {
     void testRestoringMixedOperatorState(boolean snapshotCompressionEnabled) 
throws Exception {
         final ExecutionConfig cfg = new ExecutionConfig();
         cfg.setUseSnapshotCompression(snapshotCompressionEnabled);
-        ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend>
+        final ThrowingFunction<Collection<OperatorStateHandle>, 
OperatorStateBackend>
                 operatorStateBackendFactory =
                         createOperatorStateBackendFactory(
                                 cfg, new CloseableRegistry(), 
this.getClass().getClassLoader());
@@ -162,7 +161,7 @@ public class OperatorStateRestoreOperationTest {
             throws Exception {
         final ExecutionConfig cfg = new ExecutionConfig();
         cfg.setUseSnapshotCompression(snapshotCompressionEnabled);
-        ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend>
+        final ThrowingFunction<Collection<OperatorStateHandle>, 
OperatorStateBackend>
                 operatorStateBackendFactory =
                         createOperatorStateBackendFactory(
                                 cfg, new CloseableRegistry(), 
this.getClass().getClassLoader());
@@ -199,4 +198,29 @@ public class OperatorStateRestoreOperationTest {
                 mergedListStates,
                 Collections.emptyMap());
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testEmptyPartitionedOperatorState(boolean snapshotCompressionEnabled) 
throws Exception {
+        final ExecutionConfig cfg = new ExecutionConfig();
+        cfg.setUseSnapshotCompression(snapshotCompressionEnabled);
+        final ThrowingFunction<Collection<OperatorStateHandle>, 
OperatorStateBackend>
+                operatorStateBackendFactory =
+                        createOperatorStateBackendFactory(
+                                cfg, new CloseableRegistry(), 
this.getClass().getClassLoader());
+
+        final Map<String, List<String>> listStates = new HashMap<>();
+        listStates.put("bufferState", Collections.emptyList());
+        listStates.put("offsetState", Collections.singletonList("foo"));
+
+        final OperatorStateHandle stateHandle =
+                createOperatorStateHandle(
+                        operatorStateBackendFactory, listStates, 
Collections.emptyMap());
+
+        verifyOperatorStateHandle(
+                operatorStateBackendFactory,
+                Collections.singletonList(stateHandle),
+                listStates,
+                Collections.emptyMap());
+    }
 }

Reply via email to