This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.1 by this push:
     new 2a4f1f2ee7f [FLINK-38483][checkpoint] Fix the bug that Job cannot be recovered from unaligned checkpoint due to `Cannot get old subtasks from a descriptor that represents no state.` exception
2a4f1f2ee7f is described below

commit 2a4f1f2ee7fbbb249dacc0d87be392c9fb88c3f6
Author: Rui Fan <[email protected]>
AuthorDate: Mon Oct 6 19:37:56 2025 +0200

    [FLINK-38483][checkpoint] Fix the bug that Job cannot be recovered from unaligned checkpoint due to `Cannot get old subtasks from a descriptor that represents no state.` exception
---
 .../InflightDataRescalingDescriptor.java           | 14 +++---
 .../InflightDataRescalingDescriptorTest.java       | 25 ++++------
 ...dCheckpointRescaleWithMixedExchangesITCase.java | 54 +++++++++++++++++++++-
 3 files changed, 69 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptor.java
index 4dffeee0bb9..fffc7b44305 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptor.java
@@ -33,6 +33,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class InflightDataRescalingDescriptor implements Serializable {
 
+    private static final int[] EMPTY_INT_ARRAY = new int[0];
+
     public static final InflightDataRescalingDescriptor NO_RESCALE = new 
NoRescalingDescriptor();
 
     private static final long serialVersionUID = -3396674344669796295L;
@@ -115,8 +117,8 @@ public class InflightDataRescalingDescriptor implements 
Serializable {
 
         public static final InflightDataGateOrPartitionRescalingDescriptor 
NO_STATE =
                 new InflightDataGateOrPartitionRescalingDescriptor(
-                        new int[0],
-                        RescaleMappings.identity(0, 0),
+                        EMPTY_INT_ARRAY,
+                        RescaleMappings.SYMMETRIC_IDENTITY,
                         Collections.emptySet(),
                         MappingType.IDENTITY) {
 
@@ -124,14 +126,12 @@ public class InflightDataRescalingDescriptor implements 
Serializable {
 
                     @Override
                     public int[] getOldSubtaskInstances() {
-                        throw new UnsupportedOperationException(
-                                "Cannot get old subtasks from a descriptor 
that represents no state.");
+                        return EMPTY_INT_ARRAY;
                     }
 
                     @Override
                     public RescaleMappings getRescaleMappings() {
-                        throw new UnsupportedOperationException(
-                                "Cannot get rescale mappings from a descriptor 
that represents no state.");
+                        return RescaleMappings.SYMMETRIC_IDENTITY;
                     }
                 };
 
@@ -228,7 +228,7 @@ public class InflightDataRescalingDescriptor implements 
Serializable {
 
         @Override
         public int[] getOldSubtaskIndexes(int gateOrPartitionIndex) {
-            return new int[0];
+            return EMPTY_INT_ARRAY;
         }
 
         @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptorTest.java
index 6252d5b2106..f4123a34dec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptorTest.java
@@ -27,31 +27,25 @@ import java.util.Arrays;
 import java.util.Collections;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link InflightDataRescalingDescriptor}. */
 class InflightDataRescalingDescriptorTest {
 
     @Test
-    void testNoStateDescriptorThrowsOnGetOldSubtaskInstances() {
+    void testNoStateDescriptorReturnsEmptyOldSubtaskInstances() {
         InflightDataGateOrPartitionRescalingDescriptor noStateDescriptor =
                 InflightDataGateOrPartitionRescalingDescriptor.NO_STATE;
 
-        assertThatThrownBy(noStateDescriptor::getOldSubtaskInstances)
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Cannot get old subtasks from a descriptor that 
represents no state");
+        assertThat(noStateDescriptor.getOldSubtaskInstances()).isEqualTo(new 
int[0]);
     }
 
     @Test
-    void testNoStateDescriptorThrowsOnGetRescaleMappings() {
+    void testNoStateDescriptorReturnsSymmetricIdentity() {
         InflightDataGateOrPartitionRescalingDescriptor noStateDescriptor =
                 InflightDataGateOrPartitionRescalingDescriptor.NO_STATE;
 
-        assertThatThrownBy(noStateDescriptor::getRescaleMappings)
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Cannot get rescale mappings from a descriptor that 
represents no state");
+        assertThat(noStateDescriptor.getRescaleMappings())
+                .isEqualTo(RescaleMappings.SYMMETRIC_IDENTITY);
     }
 
     @Test
@@ -108,11 +102,10 @@ class InflightDataRescalingDescriptorTest {
         InflightDataRescalingDescriptor rescalingDescriptor =
                 new InflightDataRescalingDescriptor(descriptors);
 
-        // First gate/partition has NO_STATE
-        assertThatThrownBy(() -> rescalingDescriptor.getOldSubtaskIndexes(0))
-                .isInstanceOf(UnsupportedOperationException.class);
-        assertThatThrownBy(() -> rescalingDescriptor.getChannelMapping(0))
-                .isInstanceOf(UnsupportedOperationException.class);
+        // First gate/partition has NO_STATE - should return empty array and SYMMETRIC_IDENTITY
+        assertThat(rescalingDescriptor.getOldSubtaskIndexes(0)).isEqualTo(new 
int[0]);
+        assertThat(rescalingDescriptor.getChannelMapping(0))
+                .isEqualTo(RescaleMappings.SYMMETRIC_IDENTITY);
 
         // Second gate/partition has normal state
         assertThat(rescalingDescriptor.getOldSubtaskIndexes(1)).isEqualTo(new 
int[] {0, 1});
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
index 34c3277c4a1..d11f787aef6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.connector.datagen.source.DataGeneratorSource;
@@ -57,6 +58,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
+import static 
org.apache.flink.configuration.RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY;
+
 /**
  * Integration test for rescaling jobs with mixed (UC-supported and 
UC-unsupported) exchanges from
  * an unaligned checkpoint.
@@ -80,7 +83,8 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
                 
UnalignedCheckpointRescaleWithMixedExchangesITCase::createMultiOutputDAG,
                 
UnalignedCheckpointRescaleWithMixedExchangesITCase::createMultiInputDAG,
                 
UnalignedCheckpointRescaleWithMixedExchangesITCase::createRescalePartitionerDAG,
-                
UnalignedCheckpointRescaleWithMixedExchangesITCase::createMixedComplexityDAG);
+                
UnalignedCheckpointRescaleWithMixedExchangesITCase::createMixedComplexityDAG,
+                
UnalignedCheckpointRescaleWithMixedExchangesITCase::createPartEmptyHashExchangeDAG);
     }
 
     @Before
@@ -137,6 +141,7 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
         conf.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, 
Duration.ofSeconds(1));
         // Disable aligned timeout to ensure it works with unaligned checkpoint directly
         conf.set(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, 
Duration.ofSeconds(0));
+        conf.set(RestartStrategyOptions.RESTART_STRATEGY, 
NO_RESTART_STRATEGY.getMainValue());
         conf.set(
                 CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
                 ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
@@ -336,6 +341,53 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
         return env.executeAsync();
     }
 
+    /**
+     * Creates a DAG where the downstream MapAfterKeyBy task receives input from two hash exchanges:
+     * one with actual data and one that is empty due to filtering. This tests unaligned checkpoint
+     * rescaling with mixed empty and non-empty hash partitions.
+     */
+    private static JobClient 
createPartEmptyHashExchangeDAG(StreamExecutionEnvironment env)
+            throws Exception {
+        int source1Parallelism = getRandomParallelism();
+        DataGeneratorSource<Long> source1 =
+                new DataGeneratorSource<>(
+                        index -> index,
+                        Long.MAX_VALUE,
+                        RateLimiterStrategy.perSecond(5000),
+                        Types.LONG);
+        DataStream<Long> sourceStream1 =
+                env.fromSource(source1, WatermarkStrategy.noWatermarks(), 
"Source 1")
+                        .setParallelism(source1Parallelism);
+
+        int source2Parallelism = getRandomParallelism();
+        DataGeneratorSource<Long> source2 =
+                new DataGeneratorSource<>(
+                        index -> index,
+                        Long.MAX_VALUE,
+                        RateLimiterStrategy.perSecond(5000),
+                        Types.LONG);
+
+        // Filter all records to simulate empty state exchange
+        DataStream<Long> sourceStream2 =
+                env.fromSource(source2, WatermarkStrategy.noWatermarks(), 
"Source 2")
+                        .setParallelism(source2Parallelism)
+                        .filter(value -> false)
+                        .setParallelism(source2Parallelism);
+
+        sourceStream1
+                .union(sourceStream2)
+                .keyBy((KeySelector<Long, Long>) value -> value)
+                .map(
+                        x -> {
+                            Thread.sleep(5);
+                            return x;
+                        })
+                .name("MapAfterKeyBy")
+                .setParallelism(getRandomParallelism());
+
+        return env.executeAsync();
+    }
+
     private static int getRandomParallelism() {
         return RANDOM.nextInt(MAX_SLOTS) + 1;
     }

Reply via email to