rkhachatryan commented on code in PR #23510:
URL: https://github.com/apache/flink/pull/23510#discussion_r1356043975


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForOneMoreCheckpoint;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Test savepoint rescaling. */

Review Comment:
   This test isn't actually using savepoints, is it?
   
   (ditto: other comments and some method names)



##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java:
##########
@@ -366,6 +367,32 @@ public static void waitForCheckpoint(JobID jobID, 
MiniCluster miniCluster, int n
                 });
     }
 
+    /** Wait for (at least) the given number of successful checkpoints. */

Review Comment:
   Please adjust the javadoc.



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForOneMoreCheckpoint;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Test savepoint rescaling. */
+@RunWith(Parameterized.class)
+public class AutoRescalingITCase extends TestLogger {
+
+    @ClassRule
+    public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorResource();
+
+    private static final int numTaskManagers = 2;
+    private static final int slotsPerTaskManager = 2;
+    private static final int numSlots = numTaskManagers * slotsPerTaskManager;
+
+    @Parameterized.Parameters(name = "backend = {0}, buffersPerChannel = {1}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(
+                new Object[][] {
+                    {"rocksdb", 0}, {"rocksdb", 2}, {"filesystem", 0}, 
{"filesystem", 2}
+                });
+    }
+
+    public AutoRescalingITCase(String backend, int buffersPerChannel) {
+        this.backend = backend;
+        this.buffersPerChannel = buffersPerChannel;
+    }
+
+    private final String backend;
+
+    private final int buffersPerChannel;
+
+    private String currentBackend = null;
+
+    enum OperatorCheckpointMethod {
+        NON_PARTITIONED,
+        CHECKPOINTED_FUNCTION,
+        CHECKPOINTED_FUNCTION_BROADCAST,
+        LIST_CHECKPOINTED
+    }
+
+    private static MiniClusterWithClientResource cluster;
+    private static RestClusterClient<?> restClusterClient;
+
+    @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+    @Before
+    public void setup() throws Exception {
+        // detect parameter change
+        if (!Objects.equals(currentBackend, backend)) {
+            shutDownExistingCluster();
+
+            currentBackend = backend;
+
+            Configuration config = new Configuration();
+
+            final File checkpointDir = temporaryFolder.newFolder();
+            final File savepointDir = temporaryFolder.newFolder();
+
+            config.setString(StateBackendOptions.STATE_BACKEND, 
currentBackend);
+            config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
true);
+            config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
+            config.setString(
+                    CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
+            config.setString(
+                    CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir.toURI().toString());
+            config.setInteger(
+                    
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, buffersPerChannel);
+
+            config.set(JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.Adaptive);
+
+            // speed the test suite up
+            // - lower refresh interval -> controls how fast we invalidate 
ExecutionGraphCache
+            // - lower slot idle timeout -> controls how fast we return idle 
slots to TM
+            config.set(WebOptions.REFRESH_INTERVAL, 50L);
+            config.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, 50L);
+
+            cluster =
+                    new MiniClusterWithClientResource(
+                            new MiniClusterResourceConfiguration.Builder()
+                                    .setConfiguration(config)
+                                    .setNumberTaskManagers(numTaskManagers)
+                                    .setNumberSlotsPerTaskManager(numSlots)
+                                    .build());
+            cluster.before();
+            restClusterClient = cluster.getRestClusterClient();
+        }
+    }
+
+    @AfterClass
+    public static void shutDownExistingCluster() {
+        if (cluster != null) {
+            cluster.after();
+            cluster = null;
+        }
+    }
+
+    @Test
+    public void testCheckpointRescalingInKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(false);
+    }
+
+    @Test
+    public void testCheckpointRescalingOutKeyedState() throws Exception {
+        testCheckpointRescalingKeyedState(true);
+    }
+
+    /**
+     * Tests that a job with purely keyed state can be restarted from a 
savepoint with a different
+     * parallelism.
+     */
+    public void testCheckpointRescalingKeyedState(boolean scaleOut) throws 
Exception {
+        final int numberKeys = 42;
+        final int numberElements = 1000;
+        final int parallelism = scaleOut ? numSlots / 2 : numSlots;
+        final int parallelism2 = scaleOut ? numSlots : numSlots / 2;
+        final int maxParallelism = 13;
+
+        Duration timeout = Duration.ofMinutes(3);
+        Deadline deadline = Deadline.now().plus(timeout);
+
+        ClusterClient<?> client = cluster.getClusterClient();
+
+        try {
+
+            JobGraph jobGraph =
+                    createJobGraphWithKeyedState(
+                            new Configuration(),
+                            parallelism,
+                            maxParallelism,
+                            numberKeys,
+                            numberElements);
+
+            final JobID jobID = jobGraph.getJobID();
+
+            client.submitJob(jobGraph).get();
+
+            // wait til the sources have emitted numberElements for each key 
and completed a
+            // checkpoint
+            assertTrue(
+                    SubtaskIndexFlatMapper.workCompletedLatch.await(
+                            deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS));
+
+            // verify the current state
+
+            Set<Tuple2<Integer, Integer>> actualResult = 
CollectionSink.getElementsSet();
+
+            Set<Tuple2<Integer, Integer>> expectedResult = new HashSet<>();
+
+            for (int key = 0; key < numberKeys; key++) {
+                int keyGroupIndex = 
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
+
+                expectedResult.add(
+                        Tuple2.of(
+                                
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                                        maxParallelism, parallelism, 
keyGroupIndex),
+                                numberElements * key));
+            }
+
+            assertEquals(expectedResult, actualResult);
+
+            // clear the CollectionSink set for the restarted job
+            CollectionSink.clearElementsSet();
+
+            waitForAllTaskRunning(cluster.getMiniCluster(), 
jobGraph.getJobID(), false);
+
+            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+
+            JobResourceRequirements.Builder builder = 
JobResourceRequirements.newBuilder();
+            for (JobVertex vertex : jobGraph.getVertices()) {
+                builder.setParallelismForJobVertex(vertex.getID(), 
parallelism2, parallelism2);
+            }
+
+            restClusterClient.updateJobResourceRequirements(jobID, 
builder.build()).join();

Review Comment:
   IIRC, this call doesn't wait for the requirements to be satisfied.
   Should we wait after this call, that the actual DoP is actually what we want?



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForOneMoreCheckpoint;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Test savepoint rescaling. */

Review Comment:
   And maybe explicitly mention `RescalingITCase` and the difference between 
the two?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to