GitHub user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2747#discussion_r86372831
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---
@@ -309,6 +310,85 @@ public void initializeState(OperatorStateHandles operatorStateHandles) throws Ex
initializeCalled = true;
}
+ /**
+ * Takes the different {@link OperatorStateHandles} created by calling {@link #snapshot(long, long)}
+ * on different instances of {@link AbstractStreamOperatorTestHarness} (each one representing one subtask)
+ * and repacks them into a single {@link OperatorStateHandles} so that the parallelism of the test
+ * can change arbitrarily (i.e. be able to scale both up and down).
+ * <p/>
+ * After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize
+ * a new instance with the resulting state. Bear in mind that for parallelism greater than one, you
+ * have to use the constructor {@link #AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}.
+ *
+ * <p/>
+ * <b>NOTE: </b> each of the {@code handles} in the argument list is assumed to be from a single task of a single
+ * operator (i.e. chain length of one).
+ *
+ * <p/>
+ * For an example of how to use it, have a look at
+ * {@link AbstractStreamOperatorTest#testStateAndTimerStateShufflingScalingDown()}.
+ *
+ * @param handles the different states to be merged.
+ * @return the resulting state, or {@code null} if no partial states are specified.
+ */
+ public static OperatorStateHandles repackageState(OperatorStateHandles... handles) throws Exception {
+
+ if (handles.length < 1) {
+ return null;
+ } else if (handles.length == 1) {
+ return handles[0];
+ }
+
+ List<OperatorStateHandle> mergedManagedOperatorState = new ArrayList<>(handles.length);
+ List<OperatorStateHandle> mergedRawOperatorState = new ArrayList<>(handles.length);
+
+ List<KeyGroupsStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
+ List<KeyGroupsStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
+
+ for (OperatorStateHandles handle: handles) {
+
+ // each of the collections is expected to have
+ // one member, as they run with a parallelism of 1
+
+ Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState();
+ Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState();
+ Collection<KeyGroupsStateHandle> managedKeyedState = handle.getManagedKeyedState();
+ Collection<KeyGroupsStateHandle> rawKeyedState = handle.getRawKeyedState();
+
+
+ if ((managedOperatorState != null && managedOperatorState.size() > 1) ||
--- End diff --
Is this restriction necessary? I think it would also work if there are
several entries in the list and we just add them all together.
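A minimal sketch of that suggestion, assuming that merging the partial states only requires concatenating every entry of each collection (null collections are skipped). `SubtaskState`, `repackage` and `addAllIfPresent` are hypothetical stand-ins, not Flink's `OperatorStateHandles` API, and plain strings stand in for the real `OperatorStateHandle` / `KeyGroupsStateHandle` objects:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class RepackSketch {

    // Hypothetical stand-in for OperatorStateHandles: the snapshot of one subtask.
    // Strings stand in for the real operator / key-group state handles.
    static final class SubtaskState {
        final Collection<String> managedOperatorState;
        final Collection<String> rawOperatorState;
        final Collection<String> managedKeyedState;
        final Collection<String> rawKeyedState;

        SubtaskState(Collection<String> managedOperatorState, Collection<String> rawOperatorState,
                     Collection<String> managedKeyedState, Collection<String> rawKeyedState) {
            this.managedOperatorState = managedOperatorState;
            this.rawOperatorState = rawOperatorState;
            this.managedKeyedState = managedKeyedState;
            this.rawKeyedState = rawKeyedState;
        }
    }

    // Copies every entry of a possibly-null collection into the target list,
    // instead of rejecting collections that hold more than one entry.
    static <T> void addAllIfPresent(List<T> target, Collection<? extends T> source) {
        if (source != null) {
            target.addAll(source);
        }
    }

    // Merges the snapshots of several subtasks by concatenating each kind of state.
    static SubtaskState repackage(SubtaskState... states) {
        if (states.length < 1) {
            return null;
        } else if (states.length == 1) {
            return states[0];
        }

        List<String> managedOperatorState = new ArrayList<>();
        List<String> rawOperatorState = new ArrayList<>();
        List<String> managedKeyedState = new ArrayList<>();
        List<String> rawKeyedState = new ArrayList<>();

        for (SubtaskState state : states) {
            addAllIfPresent(managedOperatorState, state.managedOperatorState);
            addAllIfPresent(rawOperatorState, state.rawOperatorState);
            addAllIfPresent(managedKeyedState, state.managedKeyedState);
            addAllIfPresent(rawKeyedState, state.rawKeyedState);
        }
        return new SubtaskState(managedOperatorState, rawOperatorState, managedKeyedState, rawKeyedState);
    }

    public static void main(String[] args) {
        // Subtask 0 reports two managed operator state entries; a size() > 1 check would reject it.
        SubtaskState subtask0 = new SubtaskState(
                Arrays.asList("op-0a", "op-0b"), null,
                Collections.singletonList("keyGroups-0-63"), Collections.<String>emptyList());
        SubtaskState subtask1 = new SubtaskState(
                Collections.singletonList("op-1a"), null,
                Collections.singletonList("keyGroups-64-127"), Collections.<String>emptyList());

        SubtaskState merged = repackage(subtask0, subtask1);
        System.out.println(merged.managedOperatorState); // [op-0a, op-0b, op-1a]
        System.out.println(merged.managedKeyedState);    // [keyGroups-0-63, keyGroups-64-127]
        System.out.println(merged.rawOperatorState);     // []
    }
}
```

Because `addAll` accepts any number of entries, the merge no longer depends on each collection holding exactly one member, which is what makes the single-entry check look unnecessary.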