AHeise commented on a change in pull request #13735:
URL: https://github.com/apache/flink/pull/13735#discussion_r517152644
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
##########
@@ -327,4 +289,252 @@ public boolean hasState() {
|| inputChannelState.hasState()
|| resultSubpartitionState.hasState();
}
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * The builder for a new {@link OperatorSubtaskState} which can be
obtained by {@link #builder()}.
+ */
+ public static class Builder {
+ private StateObjectCollection<OperatorStateHandle>
managedOperatorState = StateObjectCollection.empty();
+ private StateObjectCollection<OperatorStateHandle>
rawOperatorState = StateObjectCollection.empty();
+ private StateObjectCollection<KeyedStateHandle>
managedKeyedState = StateObjectCollection.empty();
+ private StateObjectCollection<KeyedStateHandle> rawKeyedState =
StateObjectCollection.empty();
+ private StateObjectCollection<InputChannelStateHandle>
inputChannelState = StateObjectCollection.empty();
+ private StateObjectCollection<ResultSubpartitionStateHandle>
resultSubpartitionState = StateObjectCollection.empty();
+ private VirtualChannelMapping inputChannelMappings =
VirtualChannelMapping.NO_MAPPING;
+ private VirtualChannelMapping outputChannelMappings =
VirtualChannelMapping.NO_MAPPING;
+
+ private Builder() {
+ }
+
+ public Builder
setManagedOperatorState(StateObjectCollection<OperatorStateHandle>
managedOperatorState) {
+ this.managedOperatorState =
checkNotNull(managedOperatorState);
+ return this;
+ }
+
+ public Builder setManagedOperatorState(OperatorStateHandle
managedOperatorState) {
+ return
setManagedOperatorState(StateObjectCollection.singleton(checkNotNull(managedOperatorState)));
+ }
+
+ public Builder
setRawOperatorState(StateObjectCollection<OperatorStateHandle>
rawOperatorState) {
+ this.rawOperatorState = checkNotNull(rawOperatorState);
+ return this;
+ }
+
+ public Builder setRawOperatorState(OperatorStateHandle
rawOperatorState) {
+ return
setRawOperatorState(StateObjectCollection.singleton(checkNotNull(rawOperatorState)));
+ }
+
+ public Builder
setManagedKeyedState(StateObjectCollection<KeyedStateHandle> managedKeyedState)
{
+ this.managedKeyedState =
checkNotNull(managedKeyedState);
+ return this;
+ }
+
+ public Builder setManagedKeyedState(KeyedStateHandle
managedKeyedState) {
+ return
setManagedKeyedState(StateObjectCollection.singleton(checkNotNull(managedKeyedState)));
+ }
+
+ public Builder
setRawKeyedState(StateObjectCollection<KeyedStateHandle> rawKeyedState) {
+ this.rawKeyedState = checkNotNull(rawKeyedState);
+ return this;
+ }
+
+ public Builder setRawKeyedState(KeyedStateHandle rawKeyedState)
{
+ return
setRawKeyedState(StateObjectCollection.singleton(checkNotNull(rawKeyedState)));
+ }
+
+ public Builder
setInputChannelState(StateObjectCollection<InputChannelStateHandle>
inputChannelState) {
+ this.inputChannelState =
checkNotNull(inputChannelState);
+ return this;
+ }
+
+ public Builder
setResultSubpartitionState(StateObjectCollection<ResultSubpartitionStateHandle>
resultSubpartitionState) {
+ this.resultSubpartitionState =
checkNotNull(resultSubpartitionState);
+ return this;
+ }
+
+ public Builder setInputChannelMappings(VirtualChannelMapping
inputChannelMappings) {
+ this.inputChannelMappings =
checkNotNull(inputChannelMappings);
+ return this;
+ }
+
+ public Builder setOutputChannelMappings(VirtualChannelMapping
outputChannelMappings) {
+ this.outputChannelMappings =
checkNotNull(outputChannelMappings);
+ return this;
+ }
+
+ public OperatorSubtaskState build() {
+ return new OperatorSubtaskState(
+ managedOperatorState,
+ rawOperatorState,
+ managedKeyedState,
+ rawKeyedState,
+ inputChannelState,
+ resultSubpartitionState,
+ inputChannelMappings,
+ outputChannelMappings);
+ }
+ }
+
+    /**
+     * Captures ambiguous mappings of old channels to new channels.
+     *
+     * <p>For inputs, this mapping implies the following:
+     *
+     * <ul>
+     *   <li>{@link #oldTaskInstances} is set when there is a rescale on this task, potentially
+     *       leading to different key groups. The upstream task has a corresponding {@link
+     *       #partitionMappings} and sends data over a virtual channel, specifying the channel
+     *       index in the VirtualChannelSelector. This subtask then demultiplexes over the
+     *       virtual subtask index.
+     *   <li>{@link #partitionMappings} is set when there is a downscale of the upstream task. The
+     *       upstream task has a corresponding {@link #oldTaskInstances} and sends data over a
+     *       virtual channel, specifying the subtask index in the VirtualChannelSelector. This
+     *       subtask then demultiplexes over channel indexes.
+     * </ul>
+     *
+     * <p>For outputs, it is vice versa. The information on both sides must be kept in sync, but
+     * it is used in opposite ways for multiplexing and demultiplexing.
+     *
+     * <p>Note that in the common rescaling case both pieces of information are set and need to be
+     * used simultaneously. If the input subtask subsumes the state of 3 old subtasks and a
+     * channel corresponds to 2 old channels, then there are 6 virtual channels to be
+     * demultiplexed.
+     */
+ public static class VirtualChannelMapping implements Serializable {
Review comment:
hm, it's all about channel state as part of the operator state. How
about `ChannelStateMapping`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]