pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1604531639


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java:
##########
@@ -232,7 +232,9 @@ public Predicate<StreamRecord<T>> apply(InputChannelInfo channelInfo) {
                             channelInfo.getGateIdx(), this::createPartitioner);
             // use a copy of partitioner to ensure that the filter of ambiguous virtual channels
             // have the same state across several subtasks
-            return new RecordFilter<>(partitioner.copy(), inputSerializer, subtaskIndex);
+            StreamPartitioner<T> partitionerCopy = partitioner.copy();
+            partitionerCopy.setup(numberOfChannels);
+            return new RecordFilter<>(partitionerCopy, inputSerializer, subtaskIndex);

Review Comment:
   Ok, in that case: is this a production bug (sounds like it), or only an issue in testing?
   
   1. If this is a production bug, please:
     - create a separate jira ticket for this issue
     - extract this change to a separate commit (it can be part of this PR or a new PR, as you prefer)
     - add test coverage
   2. If that's just testing issue:
     - extract this change to a separate `[hotfix]` commit in this PR
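For the test coverage in option 1, a minimal sketch of the failure mode the diff guards against may help. It uses a hypothetical `SketchPartitioner` (not Flink's `StreamPartitioner`) and assumes, as the added `partitionerCopy.setup(numberOfChannels)` call suggests, that a copied partitioner does not carry the channel count over and must be set up before it can select channels.

```java
import java.util.function.ToIntFunction;

// Hypothetical stand-in for a stream partitioner; NOT Flink's StreamPartitioner.
class SketchPartitioner<T> {
    private final ToIntFunction<T> keyFn;
    private int numberOfChannels; // stays 0 until setup() is called

    SketchPartitioner(ToIntFunction<T> keyFn) {
        this.keyFn = keyFn;
    }

    void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    // In this sketch, copy() keeps the user logic but not the runtime channel count.
    SketchPartitioner<T> copy() {
        return new SketchPartitioner<>(keyFn);
    }

    int selectChannel(T record) {
        if (numberOfChannels <= 0) {
            throw new IllegalStateException("setup() was not called on this partitioner");
        }
        return Math.floorMod(keyFn.applyAsInt(record), numberOfChannels);
    }
}

class CopySetupDemo {
    // Mirrors the fixed code path: copy first, then set up the copy before filtering.
    static <T> SketchPartitioner<T> copyForFilter(SketchPartitioner<T> original, int numberOfChannels) {
        SketchPartitioner<T> copy = original.copy();
        copy.setup(numberOfChannels);
        return copy;
    }
}
```

A regression test along these lines would assert that the un-set-up copy fails, or misroutes in the real code, while the set-up copy routes records exactly like the original.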



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {

Review Comment:
   > But for the regular repeatability of the test it was necessary to use the String.
   
   There are plenty of solutions to this problem:
   
   1. You are encoding three fields in your `String` record: `partition`, `index` and some `payload`. But I don't see you using anything but the `partition`, so you could convert your whole `String` record into a `Long` record whose value is just the `partition`.
   2. You can encode any number of fields in a single `Long` just as well. The easiest would be something like this:
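   ```
   // checkArgument from org.apache.flink.util.Preconditions
   long encode(int partition, int index, int payload) {
     checkArgument(partition >= 0 && partition < 1024);
     checkArgument(index >= 0 && index < 1024);
     checkArgument(payload >= 0);
     // widen to long before multiplying, otherwise payload * 1024 * 1024 overflows int
     return partition + index * 1024L + payload * 1024L * 1024L;
   }
   ```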
   3. Most likely you could also change the record type in `UnalignedCheckpointRescaleITCase`. `String` should work just as well there, but that's probably more work than option 1.
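With option 2 the test still needs to read the fields back. A sketch of a round-trippable version, assuming the same 10-bit limits for `partition` and `index`; it uses bit shifts, which for these disjoint ranges give the same value as the `partition + index * 1024 + payload * 1024 * 1024` formula. The class name `LongRecordCodec` is hypothetical.

```java
// Hypothetical helper: packs partition (10 bits), index (10 bits) and payload
// (remaining high bits) into one long record and unpacks them again.
final class LongRecordCodec {
    private static final int BITS = 10; // 2^10 = 1024 values per small field
    private static final long MASK = (1L << BITS) - 1;

    private LongRecordCodec() {}

    static long encode(int partition, int index, int payload) {
        if (partition < 0 || partition > MASK || index < 0 || index > MASK || payload < 0) {
            throw new IllegalArgumentException("field out of range");
        }
        // widen to long before shifting so large payloads cannot overflow int
        return partition | ((long) index << BITS) | ((long) payload << (2 * BITS));
    }

    static int partition(long record) {
        return (int) (record & MASK);
    }

    static int index(long record) {
        return (int) ((record >>> BITS) & MASK);
    }

    static int payload(long record) {
        return (int) (record >>> (2 * BITS));
    }
}
```

Since a non-negative `int` payload needs at most 31 bits, the packed value stays below 2^51 and never turns negative.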
   
   > but I'm not sure. Also in UnalignedCheckpointRescaleITCase rescale full graph (change parallelism for all vertexes), but need to change only one vertex
   
   When creating the job graph in `Topology` (`UnalignedCheckpointRescaleITCase.Topology#create`), you can set the parallelism per vertex, and AFAIK that will override the `env.setParallelism(...)` setting. So for the downstream vertex/task whose parallelism you don't want to change, you can call:
   ```
                       .addSink(new StringSink(createCheckpoint ? 100 : 1000))
                       .name("sink")
                       .setParallelism(3);
   ```
   Meanwhile, keep the upstream source's parallelism controlled by the environment.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##########
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                                .map(JobEdge::getDownstreamSubtaskStateMapper)
+                                .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });

Review Comment:
   Maybe create a unit test in `StateAssignmentOperationTest` with one `FULL` input and one input with a different mapper, and assert that the assigned states are as expected?
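Before writing that test against the real runtime types, the mixed case can be explored in isolation. Below is a plain-Java model of the `noNeedRescale` expression from the diff, with the Flink types replaced by a hypothetical `MapperKind` enum and plain ints; it is not Flink code. Because both halves of the expression use `anyMatch`, one non-`FULL` input together with one producer whose parallelism equals the old input-state parallelism is enough to make it true, even when another input is `FULL` or another producer was rescaled. That combination is what the suggested test would pin down.

```java
import java.util.List;

// Hypothetical stand-ins for SubtaskStateMapper values; only FULL matters here.
enum MapperKind { FULL, ROUND_ROBIN, ARBITRARY }

final class NoNeedRescaleModel {
    private NoNeedRescaleModel() {}

    /**
     * Mirrors the shape of the diff's expression: at least one input edge uses a
     * non-FULL mapper AND at least one producer's (new) parallelism equals this
     * vertex's old input-state parallelism.
     */
    static boolean noNeedRescale(
            List<MapperKind> inputMappers, int oldParallelism, List<Integer> producerParallelisms) {
        return inputMappers.stream().anyMatch(m -> m != MapperKind.FULL)
                && producerParallelisms.stream().anyMatch(p -> p == oldParallelism);
    }
}
```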


