pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1604531639
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RescalingStreamTaskNetworkInput.java: ##########
@@ -232,7 +232,9 @@ public Predicate<StreamRecord<T>> apply(InputChannelInfo channelInfo) {
                             channelInfo.getGateIdx(), this::createPartitioner);
             // use a copy of partitioner to ensure that the filter of ambiguous virtual channels
             // have the same state across several subtasks
-            return new RecordFilter<>(partitioner.copy(), inputSerializer, subtaskIndex);
+            StreamPartitioner<T> partitionerCopy = partitioner.copy();
+            partitionerCopy.setup(numberOfChannels);
+            return new RecordFilter<>(partitionerCopy, inputSerializer, subtaskIndex);

Review Comment:
Ok, in that case: is this a production bug (it sounds like one), or only an issue in testing?

1. If this is a production bug, please:
    - create a separate JIRA ticket for this issue
    - extract this change to a separate commit (it can be part of this PR or a new PR, as you prefer)
    - add test coverage?
2. If it's just a testing issue:
    - extract this change to a separate `[hotfix]` commit in this PR

########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java: ##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {

Review Comment:
> But for the regular repeatability of the test it was necessary to use the String.

There are plenty of solutions to this problem:

1. Your `String` record encodes three fields: `partition`, `index` and some `payload`. But I don't see you using anything but the `partition`, so you could convert the whole `String` record into a `Long` record holding just the `partition` value.
2. You can just as well encode any number of fields in a single `Long`.
The easiest would be something like this:

```
import static org.apache.flink.util.Preconditions.checkArgument;

long encode(int partition, int index, int payload) {
    checkArgument(partition >= 0 && partition < 1024);
    checkArgument(index >= 0 && index < 1024);
    checkArgument(payload >= 0);
    // Widen to long before multiplying: `payload * 1024 * 1024` in int arithmetic
    // overflows once payload >= 2048.
    return partition + index * 1024L + payload * 1024L * 1024L;
}
```

3. You could most likely also change the record type in `UnalignedCheckpointRescaleITCase`. `String` should work just as well there, but that's probably more work than option 1.

> but I'm not sure. Also in UnalignedCheckpointRescaleITCase rescale full graph (change parallelism for all vertexes), but need to change only one vertex

When creating the job graph in `Topology` (`UnalignedCheckpointRescaleITCase.Topology#create`), you can set the parallelism per vertex, and AFAIK that will override the `env.setParallelism(...)` setting. So for the downstream vertex/task whose parallelism you don't want to change, you can call:

```
.addSink(new StringSink(createCheckpoint ? 100 : 1000))
.name("sink")
.setParallelism(3);
```

while keeping the upstream source's parallelism controlled by the environment.
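If option 2 is used, the sink also needs to get the `partition` (and possibly the other fields) back out of a `Long` record. A minimal sketch of a matching decoder follows, assuming the same 1024-based layout as `encode`; the class name `RecordCodec` and its method names are hypothetical, it uses shifts and masks in place of multiplication, and it throws `IllegalArgumentException` directly instead of calling Flink's `checkArgument` so that it compiles without Flink on the classpath.

```java
// Hypothetical helper (not part of the PR): packs three non-negative fields into one long
// and unpacks them again. Assumed layout, matching the 1024-based encode above:
// bits 0-9 partition, bits 10-19 index, bits 20 and up payload.
final class RecordCodec {
    private static final int FIELD_BITS = 10;
    private static final long FIELD_MASK = (1L << FIELD_BITS) - 1; // 1023

    private RecordCodec() {}

    static long encode(int partition, int index, int payload) {
        if (partition < 0 || partition > FIELD_MASK) {
            throw new IllegalArgumentException("partition out of range: " + partition);
        }
        if (index < 0 || index > FIELD_MASK) {
            throw new IllegalArgumentException("index out of range: " + index);
        }
        if (payload < 0) {
            throw new IllegalArgumentException("payload must be non-negative: " + payload);
        }
        // A non-negative int shifted left by 20 bits stays below 2^51, so it always fits.
        return partition | ((long) index << FIELD_BITS) | ((long) payload << (2 * FIELD_BITS));
    }

    static int partition(long record) {
        return (int) (record & FIELD_MASK);
    }

    static int index(long record) {
        return (int) ((record >>> FIELD_BITS) & FIELD_MASK);
    }

    static int payload(long record) {
        return (int) (record >>> (2 * FIELD_BITS));
    }
}
```

Because each field occupies its own bit range, OR-ing the shifted fields gives the same value as the additive form, e.g. `RecordCodec.encode(7, 3, 42)` equals `7 + 3 * 1024L + 42 * 1024L * 1024L`, and a sink would only need `RecordCodec.partition(record)` for its check.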
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ##########
@@ -421,7 +423,27 @@ public void reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
                 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
         final List<List<InputChannelStateHandle>> inputOperatorState =
                 splitBySubtasks(inputState, OperatorSubtaskState::getInputChannelState);
-        if (inputState.getParallelism() == executionJobVertex.getParallelism()) {
+
+        boolean noNeedRescale =
+                stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+                        .map(JobEdge::getDownstreamSubtaskStateMapper)
+                        .anyMatch(m -> !m.equals(SubtaskStateMapper.FULL))
+                        && stateAssignment.executionJobVertex.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .map(vertexAssignments::get)
+                                .anyMatch(
+                                        taskStateAssignment -> {
+                                            final int oldParallelism =
+                                                    stateAssignment
+                                                            .oldState
+                                                            .get(stateAssignment.inputOperatorID)
+                                                            .getParallelism();
+                                            return oldParallelism
+                                                    == taskStateAssignment.executionJobVertex
+                                                            .getParallelism();
+                                        });

Review Comment:
Maybe create a unit test in `StateAssignmentOperationTest` with one input that uses the `FULL` mapper and one input that uses a different mapper, and assert that the states are assigned as expected?
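To pick the mixed `FULL` / non-`FULL` cases such a unit test should cover, it may help to model the boolean shape of `noNeedRescale` outside Flink. This is only a sketch under stated assumptions: `MapperSketch`, `InputSketch` and `NoNeedRescaleModel` are hypothetical stand-ins, not Flink classes (the enum just borrows some `SubtaskStateMapper` value names), and the model simplifies the diff by pairing each input's mapper with its producer's current parallelism.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in named after some SubtaskStateMapper values; not Flink's enum.
enum MapperSketch { FULL, ROUND_ROBIN, RANGE }

// Hypothetical description of one input: its mapper and its producer's current parallelism.
final class InputSketch {
    final MapperSketch mapper;
    final int producerParallelism;

    InputSketch(MapperSketch mapper, int producerParallelism) {
        this.mapper = mapper;
        this.producerParallelism = producerParallelism;
    }
}

final class NoNeedRescaleModel {
    private NoNeedRescaleModel() {}

    // Mirrors the two anyMatch checks in the diff:
    // (some input mapper is not FULL) && (some producer's parallelism equals the old parallelism).
    static boolean noNeedRescale(List<InputSketch> inputs, int oldParallelism) {
        boolean anyNonFull = inputs.stream().anyMatch(in -> in.mapper != MapperSketch.FULL);
        boolean anyProducerMatches =
                inputs.stream().anyMatch(in -> in.producerParallelism == oldParallelism);
        return anyNonFull && anyProducerMatches;
    }
}
```

For example, with `oldParallelism = 2`, one `FULL` input whose producer now has parallelism 4 and one `ROUND_ROBIN` input whose producer still has parallelism 2, the model returns `true`. A real `StateAssignmentOperationTest` case built around that combination could then assert whether the `FULL` input's channel state is still redistributed as intended.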