JunRuiLee commented on code in PR #21963:
URL: https://github.com/apache/flink/pull/21963#discussion_r1115342084
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1060,14 +1111,97 @@ private void setVertexConfig(
vertexConfigs.put(vertexID, config);
}
+    private void setChainedOutputsConfig(
+            Integer vertexId, StreamConfig config, List<StreamEdge> chainableOutputs) {
+        // iterate edges, find sideOutput edges create and save serializers for each outputTag type
Review Comment:
`vertexId` is a redundant parameter here; it is not used in the method body.
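For example, the method could presumably be reduced to something like this (a sketch of the suggestion only; the body is copied unchanged from this PR's diff):

    private void setChainedOutputsConfig(
            StreamConfig config, List<StreamEdge> chainableOutputs) {
        // iterate edges, find sideOutput edges create and save serializers for each outputTag type
        for (StreamEdge edge : chainableOutputs) {
            if (edge.getOutputTag() != null) {
                config.setTypeSerializerSideOut(
                        edge.getOutputTag(),
                        edge.getOutputTag()
                                .getTypeInfo()
                                .createSerializer(streamGraph.getExecutionConfig()));
            }
        }
        config.setChainedOutputs(chainableOutputs);
    }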
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1060,14 +1111,97 @@ private void setVertexConfig(
vertexConfigs.put(vertexID, config);
}
+    private void setChainedOutputsConfig(
+            Integer vertexId, StreamConfig config, List<StreamEdge> chainableOutputs) {
+        // iterate edges, find sideOutput edges create and save serializers for each outputTag type
+        for (StreamEdge edge : chainableOutputs) {
+            if (edge.getOutputTag() != null) {
+                config.setTypeSerializerSideOut(
+                        edge.getOutputTag(),
+                        edge.getOutputTag()
+                                .getTypeInfo()
+                                .createSerializer(streamGraph.getExecutionConfig()));
+            }
+        }
+        config.setChainedOutputs(chainableOutputs);
+    }
+
+    private void setOperatorNonChainedOutputsConfig(
+            Integer vertexId,
+            StreamConfig config,
+            List<StreamEdge> nonChainableOutputs,
+            Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge) {
+        // iterate edges, find sideOutput edges create and save serializers for each outputTag type
+        for (StreamEdge edge : nonChainableOutputs) {
+            if (edge.getOutputTag() != null) {
+                config.setTypeSerializerSideOut(
+                        edge.getOutputTag(),
+                        edge.getOutputTag()
+                                .getTypeInfo()
+                                .createSerializer(streamGraph.getExecutionConfig()));
+            }
+        }
+
+        List<NonChainedOutput> deduplicatedOutputs =
+                mayReuseNonChainedOutputs(vertexId, nonChainableOutputs, outputsConsumedByEdge);
+        config.setNumberOfOutputs(deduplicatedOutputs.size());
+        config.setOperatorNonChainedOutputs(deduplicatedOutputs);
+    }
+
+    private void setVertexNonChainedOutputsConfig(
+            Integer startNodeId,
+            StreamConfig config,
+            List<StreamEdge> transitiveOutEdges,
+            final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
+
+        LinkedHashSet<NonChainedOutput> transitiveOutputs = new LinkedHashSet<>();
+        for (StreamEdge edge : transitiveOutEdges) {
+            NonChainedOutput output = opIntermediateOutputs.get(edge.getSourceId()).get(edge);
+            transitiveOutputs.add(output);
+            connect(startNodeId, edge, output);
+        }
+
+        config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs));
+    }
+
+    private void setAllOperatorNonChainedOutputsConfigs(
+            final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
+        // set non chainable output config
+        opNonChainableOutputsCache.forEach(
+                (vertexId, nonChainableOutputs) -> {
+                    Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
+                            opIntermediateOutputs.computeIfAbsent(
+                                    vertexId, ignored -> new HashMap<>());
+                    setOperatorNonChainedOutputsConfig(
+                            vertexId,
+                            vertexConfigs.get(vertexId),
+                            nonChainableOutputs,
+                            outputsConsumedByEdge);
+                });
+    }
+
+    private void setAllVertexNonChainedOutputsConfigs(
+            final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
+        jobVertices
+                .keySet()
+                .forEach(
+                        startNodeId -> {
+                            setVertexNonChainedOutputsConfig(
+                                    startNodeId,
+                                    vertexConfigs.get(startNodeId),
+                                    chainInfos.get(startNodeId).getTransitiveOutEdges(),
+                                    opIntermediateOutputs);
+                        });
Review Comment:
The `{}` around the lambda body can be removed, since the lambda contains only a single statement.
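For instance, presumably along these lines (a sketch of the suggestion only, reusing the calls from the diff above):

    jobVertices
            .keySet()
            .forEach(
                    startNodeId ->
                            setVertexNonChainedOutputsConfig(
                                    startNodeId,
                                    vertexConfigs.get(startNodeId),
                                    chainInfos.get(startNodeId).getTransitiveOutEdges(),
                                    opIntermediateOutputs));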
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/forwardgroup/ForwardGroupComputeUtilTest.java:
##########
@@ -36,7 +36,9 @@
import static org.assertj.core.api.Assertions.assertThat;
-/** Unit tests for {@link ForwardGroupComputeUtil}. */
+/**
+ * Unit tests for {@link org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil}.
+ */
class ForwardGroupComputeUtilTest {
@RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
Review Comment:
This can be removed, because its only use is removed in FLINK-30942.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -928,14 +1011,8 @@ private StreamConfig createJobVertex(Integer streamNodeId, OperatorChainInfo cha
return new StreamConfig(jobVertex.getConfiguration());
Review Comment:
Can we move the above logic of resetting parallelism into `setVertexParallelismForDynamicGraphIfNecessary`?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/forwardgroup/ForwardGroupComputeUtilTest.java:
##########
@@ -176,58 +178,14 @@ void testOneInputSplitsIntoTwo() throws Exception {
checkGroupSize(groups, 1, 3);
}
-    /**
-     * Tests whether the parallelism of job vertices in forward group are correctly set.
-     *
-     * <pre>
-     *
-     * (v1) -> (v2)
-     *
-     * (v3) -> (v4)
-     *
-     * </pre>
-     */
-    @Test
-    void testComputeForwardGroupsAndSetVertexParallelismsIfNecessary() throws Exception {
-        JobVertex v1 = new JobVertex("v1");
-        JobVertex v2 = new JobVertex("v2");
-        JobVertex v3 = new JobVertex("v3");
-        JobVertex v4 = new JobVertex("v4");
-
-        v2.setParallelism(8);
-
-        v2.connectNewDataSetAsInput(
-                v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
-        v4.connectNewDataSetAsInput(
-                v3, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
-
-        v1.getProducedDataSets().get(0).getConsumers().get(0).setForward(true);
-        v3.getProducedDataSets().get(0).getConsumers().get(0).setForward(true);
-
-        Set<ForwardGroup> groups =
-                computeForwardGroupsAndSetVertexParallelismsIfNecessary(v1, v2, v3, v4);
-        checkGroupSize(groups, 2, 2, 2);
-        assertThat(v1.getParallelism()).isEqualTo(8);
-        assertThat(v2.getParallelism()).isEqualTo(8);
-        assertThat(v3.getParallelism()).isEqualTo(-1);
-        assertThat(v4.getParallelism()).isEqualTo(-1);
-    }
-
-    private static Set<ForwardGroup> computeForwardGroupsAndSetVertexParallelismsIfNecessary(
-            JobVertex... vertices) throws Exception {
+    private static Set<ForwardGroup> computeForwardGroups(JobVertex... vertices) throws Exception {
Review Comment:
The `throws Exception` clause is not needed here.
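i.e., presumably just (a signature-only sketch; the method body is not shown in this diff):

    private static Set<ForwardGroup> computeForwardGroups(JobVertex... vertices) { ... }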
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]