pnowojski commented on a change in pull request #13735:
URL: https://github.com/apache/flink/pull/13735#discussion_r512158143



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelRescaler.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+/**
+ * The {@link ChannelRescaler} narrows down the channels that need to be read during rescaling to recover from a
+ * particular channel when in-flight data has been stored in the checkpoint.
+ */
+@Internal
+public interface ChannelRescaler extends Serializable {
+       /**
+        * Returns all old channel indexes that need to be read to restore all buffers for the given new channel index on
+        * rescale.
+        */
+       BitSet rescaleIntersections(int newChannelIndex, int oldNumberOfChannels, int newNumberOfChannels);

Review comment:
       What is the relation between this method/interface and `#rescaleIntersections` from the `Partitioner`? Why does one return `int[]` and the other a `BitSet`?
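To make the question concrete: both return types can encode the same thing, a set of old partition/channel indexes. A minimal sketch, assuming a hypothetical helper class `IntersectionConversions` that is not part of this PR, showing that one form converts into the other; the round trip is lossless only when the `int[]` is sorted and free of duplicates:

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
 * Hypothetical helper (not part of the PR): converts between the int[] form returned by
 * Partitioner#rescaleIntersections and the BitSet form returned by
 * ChannelRescaler#rescaleIntersections.
 */
final class IntersectionConversions {

    private IntersectionConversions() {}

    /** Sets one bit per old index; order and duplicates in the array are not preserved. */
    static BitSet toBitSet(int[] oldIndexes) {
        BitSet bits = new BitSet();
        for (int index : oldIndexes) {
            bits.set(index); // throws IndexOutOfBoundsException for negative indexes
        }
        return bits;
    }

    /** Lists the set bits in ascending order, so the result is sorted and duplicate-free. */
    static int[] toArray(BitSet oldIndexes) {
        return oldIndexes.stream().toArray();
    }

    public static void main(String[] args) {
        // Same set as the default Partitioner implementation with 4 old partitions.
        int[] all = IntStream.range(0, 4).toArray();
        BitSet bits = toBitSet(all);
        System.out.println(bits);                           // {0, 1, 2, 3}
        System.out.println(Arrays.toString(toArray(bits))); // [0, 1, 2, 3]
    }
}
```

If the two methods are meant to express the same contract, settling on one representation would avoid conversions like these at the call sites.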

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
##########
@@ -63,31 +61,25 @@
        /**
         * Snapshot from the {@link org.apache.flink.runtime.state.OperatorStateBackend}.
         */
-       @Nonnull

Review comment:
       Removing `@Nonnull` is considered controversial by some. Some people prefer it as a cheap, debug-mode-only assertion, because, as far as I know, `@Nonnull` in debug mode adds `checkNotNull(...)` equivalents.
   
   Personally, I would be fine with not using them, and I don't add them in new code myself. But for the above reason, I would actually be against removing them from code that someone else added (otherwise, we could end up in a ping-pong situation where you remove them and someone else re-adds them).
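For comparison, a minimal sketch of the explicit guard that `@Nonnull` stands in for when it is enforced at runtime. To stay self-contained it uses `java.util.Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull`; the class `NonNullStateHolder` and its `String` field are hypothetical placeholders for `OperatorSubtaskState` and its state handles:

```java
import java.util.Objects;

/** Hypothetical placeholder for a state holder whose field used to carry @Nonnull. */
final class NonNullStateHolder {

    // Stands in for the real state handle type (assumption for illustration only).
    private final String operatorState;

    NonNullStateHolder(String operatorState) {
        // Always enforced, independent of IDE instrumentation or JVM flags.
        this.operatorState = Objects.requireNonNull(operatorState, "operatorState");
    }

    String operatorState() {
        return operatorState;
    }

    public static void main(String[] args) {
        System.out.println(new NonNullStateHolder("handle").operatorState()); // handle
        try {
            new NonNullStateHolder(null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage()); // rejected: operatorState
        }
    }
}
```

Unlike the annotation, this check runs in every build, so it trades the "cheap, debug-mode-only" property for an unconditional guarantee.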

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
##########
@@ -37,4 +40,20 @@
         * @return The partition index.
         */
        int partition(K key, int numPartitions);
+
+       /**
+        * Returns all partitions that need to be read to restore the given new partition. The partitioner is then
+        * applied on the key of the restored record to filter all irrelevant records.
+        *
+        * <p>In particular, to create a partition X after rescaling, all partitions returned by this method are fully read
+        * and the key of each record is then fed into {@link #partition(Object, int)} to check if it belongs to X.
+        *
+        * <p>The default implementation states that all partitions need to be scanned and should be overridden to improve
+        * performance.
+        */
+       @PublicEvolving
+       default int[] rescaleIntersections(int newPartition, int oldNumPartitions, int newNumPartitions) {
+               // any old partition may contain a record that should be in the new partition after rescaling
+               return IntStream.range(0, oldNumPartitions).toArray();
+       }

Review comment:
       We need this to support `CustomPartitionerWrapper`, right? This is a `@Public` interface :( I think we need to think twice before committing ourselves to such a change. Let's sync offline again on whether this is really the best (or the only) way of solving our problem.
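The Javadoc in the `Partitioner` diff describes a read-then-filter restore. A minimal sketch of that contract, assuming a local `SketchPartitioner` interface that mirrors the two methods from the diff (it is not Flink's `Partitioner`) and modelling each record by its key only:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

final class RescaleFilterSketch {

    /** Hypothetical stand-in mirroring partition(...) and rescaleIntersections(...) from the diff. */
    interface SketchPartitioner<K> {
        int partition(K key, int numPartitions);

        // Same default as in the diff: any old partition may hold records for the new one.
        default int[] rescaleIntersections(int newPartition, int oldNumPartitions, int newNumPartitions) {
            return IntStream.range(0, oldNumPartitions).toArray();
        }
    }

    /**
     * Rebuilds new partition X: fully reads every old partition returned by
     * rescaleIntersections, then keeps only records whose key maps to X under
     * the new number of partitions.
     */
    static <K> List<K> restorePartition(
            SketchPartitioner<K> partitioner,
            List<List<K>> oldPartitions,
            int newPartition,
            int newNumPartitions) {
        List<K> restored = new ArrayList<>();
        int[] toRead = partitioner.rescaleIntersections(newPartition, oldPartitions.size(), newNumPartitions);
        for (int oldIndex : toRead) {
            for (K key : oldPartitions.get(oldIndex)) {
                if (partitioner.partition(key, newNumPartitions) == newPartition) {
                    restored.add(key);
                }
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        // Modulo partitioner on non-negative integer keys; keeps the default rescaleIntersections.
        SketchPartitioner<Integer> modulo = (key, n) -> key % n;

        // Keys as they were distributed across 2 old partitions.
        List<List<Integer>> old = Arrays.asList(
                Arrays.asList(0, 2, 4, 6),
                Arrays.asList(1, 3, 5, 7));

        // Scale out from 2 to 3 partitions and rebuild new partition 1.
        System.out.println(restorePartition(modulo, old, 1, 3)); // [4, 1, 7]
    }
}
```

In this sketch, overriding `rescaleIntersections` only shrinks the set of old partitions that are read; the `partition(...)` filter still decides which records end up in the new partition, so an override that returns too few indexes would silently drop records.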




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

