Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1641#discussion_r53808877
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
---
@@ -533,4 +563,33 @@ private StreamGraph
generateInternal(List<StreamTransformation<?>> transformatio
return Collections.singleton(transform.getId());
}
+ /**
+ * Determines the slot sharing group for an operation based on the slot
sharing group set by
+ * the user and the slot sharing groups of the inputs.
+ *
+ * <p>If the user specifies a group name, this is taken as is. If
nothing is specified and
+ * the input operations all have the same group name then this name is
taken. Otherwise the
+ * default group is choosen.
+ *
+ * @param specifiedGroup The group specified by the user.
+ * @param inputIds The IDs of the input operations.
+ */
+ private String determineSlotSharingGroup(String specifiedGroup,
Collection<Integer> inputIds) {
+ if (specifiedGroup != null) {
+ return specifiedGroup;
+ } else {
+ String inputGroup = null;
+ for (int id: inputIds) {
+ String inputGroupCandidate =
streamGraph.getSlotSharingGroup(id);
+ if (inputGroup == null) {
+ inputGroup = inputGroupCandidate;
+ continue;
+ }
+ if (!inputGroup.equals(inputGroupCandidate)) {
--- End diff --
Can't we get rid of the `continue` statement by making this an `else
if(...)` branch?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---