rkhachatryan commented on a change in pull request #13735:
URL: https://github.com/apache/flink/pull/13735#discussion_r517966691
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
##########
@@ -55,6 +56,14 @@ public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return partitioner.partition(key, numberOfChannels);
}
+ @Override
+ public ChannelStateRescaler getDownstreamChannelStateRescaler() {
+ // fully rely on filtering downstream
+ // note that custom partitioners are not officially supported - the user has to force rescaling
+ // in that case, we assume that the custom partitioner is deterministic
+ return ChannelStateRescaler.BROADCAST;
Review comment:
This will not distribute state to the newer tasks in case of up-scaling, right?
I think it can cause problems if the user-supplied partitioner relies on the new DOP.
OTOH, if it is not officially supported, we can leave it as is.
WDYT?
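To make the up-scaling concern concrete, here is a minimal sketch under stated assumptions, not the PR's code: recovered in-flight records are replicated to some set of new subtasks, and each receiving subtask keeps only the records that a deterministic partitioner routes to it under the new parallelism. `ReplicateAndFilterSketch`, `isKept` and `lostRecords` are hypothetical names, and the partitioner is modeled as a plain `(key, parallelism) -> channel` function mirroring the `partitioner.partition(key, numberOfChannels)` call above.

```java
import java.util.List;
import java.util.Set;
import java.util.function.IntBinaryOperator;

/**
 * Hypothetical sketch: recovered in-flight records are replicated to a set of new subtasks,
 * and each receiving subtask keeps only the records that the partitioner routes to it
 * under the NEW parallelism ("fully rely on filtering downstream").
 */
final class ReplicateAndFilterSketch {

    private ReplicateAndFilterSketch() {}

    /** True if some subtask that received the replicated state keeps this key after filtering. */
    static boolean isKept(
            int key, Set<Integer> receivingSubtasks, int newParallelism, IntBinaryOperator partitioner) {
        // Evaluated with the new parallelism, like partition(key, numberOfChannels) after rescaling.
        int target = partitioner.applyAsInt(key, newParallelism);
        return receivingSubtasks.contains(target);
    }

    /** Counts keys that every receiving subtask filters out, i.e. records that are lost. */
    static long lostRecords(
            List<Integer> keys, Set<Integer> receivingSubtasks, int newParallelism, IntBinaryOperator partitioner) {
        return keys.stream()
                .filter(key -> !isKept(key, receivingSubtasks, newParallelism, partitioner))
                .count();
    }
}
```

With a modulo partitioner, scaling from 2 to 4 subtasks and replicating only to subtasks 0 and 1 loses the four keys (out of 0..7) routed to subtasks 2 and 3; replicating to all four new subtasks loses none.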
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
##########
@@ -64,81 +61,64 @@
/**
* Snapshot from the {@link org.apache.flink.runtime.state.OperatorStateBackend}.
*/
- @Nonnull
private final StateObjectCollection<OperatorStateHandle> managedOperatorState;
/**
* Snapshot written using {@link org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream}.
*/
- @Nonnull
private final StateObjectCollection<OperatorStateHandle> rawOperatorState;
/**
* Snapshot from {@link org.apache.flink.runtime.state.KeyedStateBackend}.
*/
- @Nonnull
private final StateObjectCollection<KeyedStateHandle> managedKeyedState;
/**
* Snapshot written using {@link org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream}.
*/
- @Nonnull
private final StateObjectCollection<KeyedStateHandle> rawKeyedState;
- @Nonnull
private final StateObjectCollection<InputChannelStateHandle> inputChannelState;
- @Nonnull
private final StateObjectCollection<ResultSubpartitionStateHandle> resultSubpartitionState;
/**
- * The state size. This is also part of the deserialized state handle.
- * We store it here in order to not deserialize the state handle when
- * gathering stats.
+ * The subpartitions mappings per partition set when the output operator for a partition was rescaled. The key is
+ * the partition id and the value contains all subtask indexes of the output operator before rescaling.
*/
- private final long stateSize;
+ private final InflightDataRescalingDescriptor inputRescalingDescriptor;
Review comment:
I think this field needs to be (de)serialized by `MetadataSerializer` as
other fields are.
(ditto `outputRescalingDescriptor`)
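As an illustration of the point (a sketch only; this is not the real `MetadataSerializer` wire format, and all names are hypothetical): a channel mapping such as the `Map<Integer, Set<Integer>>` held by `RescaledChannelsMapping` has to be written when the checkpoint metadata is stored and read back when it is restored, otherwise a restore from that checkpoint would come back without it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of (de)serializing a new-index -> old-indexes mapping with a custom format. */
final class MappingSerializationSketch {

    private MappingSerializationSketch() {}

    static void writeMapping(Map<Integer, Set<Integer>> mapping, DataOutputStream out) throws IOException {
        out.writeInt(mapping.size());
        for (Map.Entry<Integer, Set<Integer>> entry : mapping.entrySet()) {
            out.writeInt(entry.getKey());            // new channel index
            out.writeInt(entry.getValue().size());   // number of old channel indexes
            for (int oldIndex : entry.getValue()) {
                out.writeInt(oldIndex);
            }
        }
    }

    static Map<Integer, Set<Integer>> readMapping(DataInputStream in) throws IOException {
        int entries = in.readInt();
        Map<Integer, Set<Integer>> mapping = new HashMap<>();
        for (int i = 0; i < entries; i++) {
            int newIndex = in.readInt();
            int size = in.readInt();
            Set<Integer> oldIndexes = new HashSet<>();
            for (int j = 0; j < size; j++) {
                oldIndexes.add(in.readInt());
            }
            mapping.put(newIndex, oldIndexes);
        }
        return mapping;
    }

    /** Writes and re-reads a mapping through a byte array, as storing and restoring metadata would. */
    static Map<Integer, Set<Integer>> roundTrip(Map<Integer, Set<Integer>> mapping) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeMapping(mapping, out);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return readMapping(in);
        }
    }
}
```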
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ChannelRescalerRepartitioner.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.api.writer.ChannelStateRescaler;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A repartitioner that assigns the same channel state to multiple subtasks according to some mapping.
+ *
+ * <p>The replicated data will then be filtered before processing the record.
+ *
+ * <p>Note that channel mappings are cached for the same parallelism changes.
+ */
+public class ChannelRescalerRepartitioner<T> implements OperatorStateRepartitioner<T> {
+ private final ChannelStateRescaler channelStateRescaler;
+ private final Map<Tuple2<Integer, Integer>, Map<Integer, Set<Integer>>> newToOldMappingCache = new HashMap<>(2);
Review comment:
nit:
After removing the higher-level cache, I think this field can also be computed eagerly (it will be used anyway once the object is constructed).
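A minimal sketch of the eager variant suggested in the nit, assuming the repartitioner is created for a single (old, new) parallelism pair once the higher-level cache is gone. `EagerReplicatingRepartitioner` and the `oldSubtasksFor` function are hypothetical stand-ins, not the PR's `ChannelRescalerRepartitioner` or `ChannelStateRescaler` API; the sketch also shows the replication described in the class Javadoc, where the same old state goes to several new subtasks and filtering happens later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.IntFunction;

/** Hypothetical repartitioner whose new -> old mapping is computed once, in the constructor. */
final class EagerReplicatingRepartitioner<T> {
    /** New subtask index -> old subtask indexes whose channel state it receives. */
    private final Map<Integer, Set<Integer>> newToOldMapping;

    EagerReplicatingRepartitioner(int newParallelism, IntFunction<Set<Integer>> oldSubtasksFor) {
        Map<Integer, Set<Integer>> mapping = new HashMap<>();
        for (int newIndex = 0; newIndex < newParallelism; newIndex++) {
            // TreeSet gives a deterministic order when replicating the old state.
            mapping.put(newIndex, new TreeSet<>(oldSubtasksFor.apply(newIndex)));
        }
        this.newToOldMapping = Collections.unmodifiableMap(mapping);
    }

    /** Index i of the input is old subtask i's state; index j of the result is new subtask j's state. */
    List<List<T>> repartition(List<List<T>> oldStatePerSubtask) {
        List<List<T>> result = new ArrayList<>(newToOldMapping.size());
        for (int newIndex = 0; newIndex < newToOldMapping.size(); newIndex++) {
            List<T> assigned = new ArrayList<>();
            for (int oldIndex : newToOldMapping.get(newIndex)) {
                // The same old state may be copied to several new subtasks; records are filtered later.
                assigned.addAll(oldStatePerSubtask.get(oldIndex));
            }
            result.add(assigned);
        }
        return result;
    }
}
```

Since the mapping no longer depends on arguments passed to `repartition`, it can be a `final` field and the `HashMap` cache keyed by `Tuple2` disappears.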
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RescaledChannelsMapping.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.Collections.emptyMap;
+
+/**
+ * Contains the fine-grain channel mappings that occur when a connected operator has been rescaled.
+ */
+public class RescaledChannelsMapping implements Serializable {
+ public static final RescaledChannelsMapping NO_CHANNEL_MAPPING = new RescaledChannelsMapping(emptyMap());
+
+ private static final long serialVersionUID = -8719670050630674631L;
+
+ /**
+ * For each new channel (=index), all old channels are set.
+ */
+ private final Map<Integer, Set<Integer>> newToOldChannelIndexes;
+
+ /**
+ * For each old channel (=index), all new channels are set. Lazily calculated to keep {@link OperatorSubtaskState}
+ * small in terms of serialization cost.
+ */
+ private transient Map<Integer, Set<Integer>> oldToNewChannelIndexes;
Review comment:
nit:
This field makes this class non-thread-safe, which is not obvious for a
pre-computed mapping.
So I'd mark this class `@NotThreadSafe`.
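For illustration, a minimal sketch (hypothetical class name, simplified from the fields above) of the lazily computed old-to-new inverse. The unsynchronized check-then-assign in `getOldToNew()` is what makes such a class unsafe to share between threads: without `volatile` or a lock, two threads may both compute the inverse, and one thread is not guaranteed to see a fully built map published by another.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch; not thread-safe because of the lazy, unsynchronized initialization below. */
final class LazyInverseMapping implements Serializable {
    private static final long serialVersionUID = 1L;

    /** For each new channel (=index), all old channels. Serialized. */
    private final Map<Integer, Set<Integer>> newToOld;

    /** For each old channel (=index), all new channels. Derived lazily and never serialized. */
    private transient Map<Integer, Set<Integer>> oldToNew;

    LazyInverseMapping(Map<Integer, Set<Integer>> newToOld) {
        this.newToOld = newToOld;
    }

    Map<Integer, Set<Integer>> getOldToNew() {
        if (oldToNew == null) {          // racy check-then-assign
            oldToNew = invert(newToOld);
        }
        return oldToNew;
    }

    /** Collects, for every old channel, each new channel whose entry lists it. */
    static Map<Integer, Set<Integer>> invert(Map<Integer, Set<Integer>> mapping) {
        Map<Integer, Set<Integer>> inverted = new HashMap<>();
        for (Map.Entry<Integer, Set<Integer>> entry : mapping.entrySet()) {
            for (int oldIndex : entry.getValue()) {
                inverted.computeIfAbsent(oldIndex, k -> new HashSet<>()).add(entry.getKey());
            }
        }
        return inverted;
    }
}
```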
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
##########
@@ -43,4 +44,9 @@ public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
public String toString() {
return "FORWARD";
}
+
+ @Override
+ public ChannelStateRescaler getDownstreamChannelStateRescaler() {
+ return ChannelStateRescaler.FIRST_CHANNEL;
Review comment:
Is `FIRST_CHANNEL` here (i.e. during the assignment) the same as in `selectChannel` (i.e. when sending the data)?
In other words, I think this will put all `InputChannel` state into a single
subtask, while it should go to the same subtasks as before the rescaling.
WDYT?
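A minimal sketch of the two possible assignments, assuming (as for a FORWARD edge) that upstream subtask i feeds only downstream subtask i, so old subtask i holds the in-flight data of exactly one input channel. `ForwardAssignmentSketch` and its methods are hypothetical; they are not the PR's `ChannelStateRescaler` API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: index i of the input is the InputChannel state recovered by old subtask i. */
final class ForwardAssignmentSketch {

    private ForwardAssignmentSketch() {}

    /** Everything lands on new subtask 0, which is the concern raised above. */
    static <T> List<List<T>> assignAllToFirstSubtask(List<List<T>> oldStatePerSubtask, int newParallelism) {
        List<List<T>> result = emptyAssignment(newParallelism);
        for (List<T> state : oldStatePerSubtask) {
            result.get(0).addAll(state);
        }
        return result;
    }

    /** Old subtask i's state stays with new subtask i, matching the one-to-one routing used when sending. */
    static <T> List<List<T>> assignToSameSubtask(List<List<T>> oldStatePerSubtask, int newParallelism) {
        List<List<T>> result = emptyAssignment(newParallelism);
        for (int oldIndex = 0; oldIndex < oldStatePerSubtask.size(); oldIndex++) {
            List<T> state = oldStatePerSubtask.get(oldIndex);
            if (state.isEmpty()) {
                continue;
            }
            if (oldIndex >= newParallelism) {
                // Out of scope for this sketch: state of a subtask that no longer exists.
                throw new IllegalArgumentException("no new subtask for old subtask " + oldIndex);
            }
            result.get(oldIndex).addAll(state);
        }
        return result;
    }

    private static <T> List<List<T>> emptyAssignment(int parallelism) {
        List<List<T>> result = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            result.add(new ArrayList<>());
        }
        return result;
    }
}
```

With two old subtasks holding `a` and `b` and a new parallelism of 3, the first method puts both on subtask 0, while the second keeps `a` on subtask 0 and `b` on subtask 1.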
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure.