GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2747#discussion_r86372831
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---
    @@ -309,6 +310,85 @@ public void initializeState(OperatorStateHandles operatorStateHandles) throws Ex
                initializeCalled = true;
        }
     
    +   /**
    +    * Takes the different {@link OperatorStateHandles} created by calling {@link #snapshot(long, long)}
    +    * on different instances of {@link AbstractStreamOperatorTestHarness} (each one representing one subtask)
    +    * and repacks them into a single {@link OperatorStateHandles} so that the parallelism of the test
    +    * can change arbitrarily (i.e. be able to scale both up and down).
    +    * <p/>
    +    * After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize
    +    * a new instance with the resulting state. Bear in mind that for parallelism greater than one, you
    +    * have to use the constructor {@link #AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}.
    +    *
    +    * <p/>
    +    * <b>NOTE: </b> each of the {@code handles} in the argument list is assumed to be from a single task of a single
    +    * operator (i.e. chain length of one).
    +    *
    +    * <p/>
    +    * For an example of how to use it, have a look at
    +    * {@link AbstractStreamOperatorTest#testStateAndTimerStateShufflingScalingDown()}.
    +    *
    +    * @param handles the different states to be merged.
    +    * @return the resulting state, or {@code null} if no partial states are specified.
    +    */
    +   public static OperatorStateHandles repackageState(OperatorStateHandles... handles) throws Exception {
    +
    +           if (handles.length < 1) {
    +                   return null;
    +           } else if (handles.length == 1) {
    +                   return handles[0];
    +           }
    +
    +           List<OperatorStateHandle> mergedManagedOperatorState = new ArrayList<>(handles.length);
    +           List<OperatorStateHandle> mergedRawOperatorState = new ArrayList<>(handles.length);
    +
    +           List<KeyGroupsStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
    +           List<KeyGroupsStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
    +
    +           for (OperatorStateHandles handle: handles) {
    +
    +                   // each of the collections is expected to have
    +                   // one member, as they run with parallelism of 1
    +
    +                   Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState();
    +                   Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState();
    +                   Collection<KeyGroupsStateHandle> managedKeyedState = handle.getManagedKeyedState();
    +                   Collection<KeyGroupsStateHandle> rawKeyedState = handle.getRawKeyedState();
    +
    +
    +                   if ((managedOperatorState != null && managedOperatorState.size() > 1) ||
    --- End diff --
    
    Is this restriction necessary? I think it would also work if there are several entries in the list and we just add them all together.
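    To make the suggestion concrete, here is a minimal sketch (not Flink code) of a merge loop that drops the one-entry-per-collection check and simply concatenates whatever each subtask produced. `RepackageSketch`, `PartialState`, `repackage` and `addAllIfPresent` are hypothetical names. Plain strings stand in for `OperatorStateHandle` and `KeyGroupsStateHandle`, so no real Flink constructors or signatures are assumed.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch, not Flink code: strings stand in for OperatorStateHandle and
// KeyGroupsStateHandle, and PartialState stands in for OperatorStateHandles.
public class RepackageSketch {

    // Hypothetical holder for the four per-subtask state collections (any may be null).
    static final class PartialState {
        final Collection<String> managedOperatorState;
        final Collection<String> rawOperatorState;
        final Collection<String> managedKeyedState;
        final Collection<String> rawKeyedState;

        PartialState(Collection<String> managedOperatorState, Collection<String> rawOperatorState,
                     Collection<String> managedKeyedState, Collection<String> rawKeyedState) {
            this.managedOperatorState = managedOperatorState;
            this.rawOperatorState = rawOperatorState;
            this.managedKeyedState = managedKeyedState;
            this.rawKeyedState = rawKeyedState;
        }
    }

    // Appends every entry of source to target; a null source contributes nothing.
    static <T> void addAllIfPresent(List<T> target, Collection<T> source) {
        if (source != null) {
            target.addAll(source);
        }
    }

    // Merges all partial states without limiting how many entries each collection may hold.
    static PartialState repackage(PartialState... states) {
        if (states.length == 0) {
            return null;
        } else if (states.length == 1) {
            return states[0];
        }

        List<String> managedOperatorState = new ArrayList<>();
        List<String> rawOperatorState = new ArrayList<>();
        List<String> managedKeyedState = new ArrayList<>();
        List<String> rawKeyedState = new ArrayList<>();

        for (PartialState state : states) {
            // No size() > 1 check: every entry from every subtask is kept.
            addAllIfPresent(managedOperatorState, state.managedOperatorState);
            addAllIfPresent(rawOperatorState, state.rawOperatorState);
            addAllIfPresent(managedKeyedState, state.managedKeyedState);
            addAllIfPresent(rawKeyedState, state.rawKeyedState);
        }
        return new PartialState(managedOperatorState, rawOperatorState, managedKeyedState, rawKeyedState);
    }

    public static void main(String[] args) {
        // Subtask "a" has two managed operator state entries, the case the quoted size() > 1 check singles out.
        PartialState a = new PartialState(List.of("op-a1", "op-a2"), null, List.of("kg-a"), null);
        PartialState b = new PartialState(List.of("op-b"), null, List.of("kg-b"), null);

        PartialState merged = repackage(a, b);
        System.out.println(merged.managedOperatorState); // [op-a1, op-a2, op-b]
        System.out.println(merged.managedKeyedState);    // [kg-a, kg-b]
    }
}
```

    The sketch assumes that plain concatenation is enough. It does not check whether that also holds for key-group handles from different subtasks, for example whether their key-group ranges must not overlap.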

