zhuzhurk commented on a change in pull request #18462:
URL: https://github.com/apache/flink/pull/18462#discussion_r794289920
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##########
@@ -253,11 +252,11 @@ private void
changeJobVertexParallelism(ExecutionJobVertex jobVertex, int parall
BlockingResultInfo.createFromIntermediateResult(intermediateResult));
} else {
// not all inputs consumable, return null
Review comment:
the comment is outdated
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
##########
@@ -173,12 +175,17 @@ public SchedulerNG createInstance(
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM));
}
- private void checkIsAllBlockingGraph(final JobGraph jobGraph) {
+ private void checkAllExchangesBlocking(final JobGraph jobGraph) {
for (JobVertex jobVertex : jobGraph.getVertices()) {
for (IntermediateDataSet dataSet :
jobVertex.getProducedDataSets()) {
checkState(
dataSet.getResultType().isBlocking(),
- "Adaptive batch scheduler currently only supports
ALL_EDGES_BLOCKING jobs.");
+ String.format(
+ "At the moment, fine-grained resource
management requires batch workloads "
Review comment:
fine-grained resource management -> adaptive batch scheduler
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardregion/ForwardRegion.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardregion;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A forward region is a set of vertices connected via forward edges. */
Review comment:
maybe add more docs like "Parallelisms of all job vertices in the same
{@link ForwardRegion} must be the same."
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
+import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import
org.apache.flink.runtime.scheduler.adaptivebatch.forwardregion.ForwardRegion;
+import
org.apache.flink.runtime.scheduler.adaptivebatch.forwardregion.ForwardRegionComputeUtil;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This scheduler decides the parallelism of JobVertex according to the data
volume it consumes. A
+ * dynamically built up ExecutionGraph is used for this purpose.
+ */
+public class AdaptiveBatchScheduler extends DefaultScheduler implements
SchedulerOperations {
+
+ private final DefaultLogicalTopology logicalTopology;
+
+ private final VertexParallelismDecider vertexParallelismDecider;
+
+ private final Map<JobVertexID, ForwardRegion> forwardRegionsByJobVertexId;
+
+ AdaptiveBatchScheduler(
+ final Logger log,
+ final JobGraph jobGraph,
+ final Executor ioExecutor,
+ final Configuration jobMasterConfiguration,
+ final Consumer<ComponentMainThreadExecutor> startUpAction,
+ final ScheduledExecutor delayExecutor,
+ final ClassLoader userCodeLoader,
+ final CheckpointsCleaner checkpointsCleaner,
+ final CheckpointRecoveryFactory checkpointRecoveryFactory,
+ final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+ final SchedulingStrategyFactory schedulingStrategyFactory,
+ final FailoverStrategy.Factory failoverStrategyFactory,
+ final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+ final ExecutionVertexOperations executionVertexOperations,
+ final ExecutionVertexVersioner executionVertexVersioner,
+ final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+ long initializationTimestamp,
+ final ComponentMainThreadExecutor mainThreadExecutor,
+ final JobStatusListener jobStatusListener,
+ final ExecutionGraphFactory executionGraphFactory,
+ final ShuffleMaster<?> shuffleMaster,
+ final Time rpcTimeout,
+ final VertexParallelismDecider vertexParallelismDecider,
+ int defaultMaxParallelism)
+ throws Exception {
+
+ super(
+ log,
+ jobGraph,
+ ioExecutor,
+ jobMasterConfiguration,
+ startUpAction,
+ delayExecutor,
+ userCodeLoader,
+ checkpointsCleaner,
+ checkpointRecoveryFactory,
+ jobManagerJobMetricGroup,
+ schedulingStrategyFactory,
+ failoverStrategyFactory,
+ restartBackoffTimeStrategy,
+ executionVertexOperations,
+ executionVertexVersioner,
+ executionSlotAllocatorFactory,
+ initializationTimestamp,
+ mainThreadExecutor,
+ jobStatusListener,
+ executionGraphFactory,
+ shuffleMaster,
+ rpcTimeout,
+ computeVertexParallelismStoreForDynamicGraph(
+ jobGraph.getVertices(), defaultMaxParallelism));
+
+ this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph);
+
+ this.vertexParallelismDecider = vertexParallelismDecider;
+
+ this.forwardRegionsByJobVertexId =
+
ForwardRegionComputeUtil.computeForwardRegions(jobGraph.getVertices());
+ }
+
+ @Override
+ public void startSchedulingInternal() {
+ initializeVerticesIfPossible();
+
+ super.startSchedulingInternal();
+ }
+
+ @Override
+ protected void updateTaskExecutionStateInternal(
+ final ExecutionVertexID executionVertexId,
+ final TaskExecutionStateTransition taskExecutionState) {
+
+ initializeVerticesIfPossible();
+
+ super.updateTaskExecutionStateInternal(executionVertexId,
taskExecutionState);
+ }
+
+ private void initializeVerticesIfPossible() {
+ final List<ExecutionJobVertex> newlyInitializedJobVertices = new
ArrayList<>();
+ try {
+ final long createTimestamp = System.currentTimeMillis();
+ for (ExecutionJobVertex jobVertex :
getExecutionGraph().getVerticesTopologically()) {
+ maybeSetParallelism(jobVertex);
+ }
+ for (ExecutionJobVertex jobVertex :
getExecutionGraph().getVerticesTopologically()) {
+ if (canInitialize(jobVertex)) {
+ getExecutionGraph().initializeJobVertex(jobVertex,
createTimestamp);
+ newlyInitializedJobVertices.add(jobVertex);
+ }
+ }
+ } catch (JobException ex) {
+ log.error("Unexpected error occurred when initializing
ExecutionJobVertex", ex);
+ failJob(ex, System.currentTimeMillis());
+ }
+
+ if (newlyInitializedJobVertices.size() > 0) {
+ updateTopology(newlyInitializedJobVertices);
+ }
+ }
+
+ private void maybeSetParallelism(final ExecutionJobVertex jobVertex) {
+ if (jobVertex.isParallelismDecided()) {
+ return;
+ }
+
+ Optional<List<BlockingResultInfo>> consumedResultsInfo =
+ tryGetConsumedResultsInfo(jobVertex);
+ if (!consumedResultsInfo.isPresent()) {
+ return;
+ }
+
+ ForwardRegion forwardRegion =
forwardRegionsByJobVertexId.get(jobVertex.getJobVertexId());
+ int parallelism;
+ if (forwardRegion.isParallelismDecided()) {
+ parallelism = forwardRegion.getParallelism();
+ log.info(
+ "Parallelism of JobVertex: {} ({}) is decided to be {}
according to forward region's parallelism.",
+ jobVertex.getName(),
+ jobVertex.getJobVertexId(),
+ parallelism);
+
+ } else {
+ parallelism =
+
vertexParallelismDecider.decideParallelismForVertex(consumedResultsInfo.get());
+ forwardRegion.setParallelism(parallelism);
+
+ log.info(
+ "Parallelism of JobVertex: {} ({}) is decided to be {}.",
+ jobVertex.getName(),
+ jobVertex.getJobVertexId(),
+ parallelism);
+ }
+
+ changeJobVertexParallelism(jobVertex, parallelism);
+ }
+
+ private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int
parallelism) {
+ // update PlanJson, it's needed to enable REST APIs to return the
latest parallelism of job
Review comment:
`PlanJson,` -> `the JSON plan.`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardregion/ForwardRegion.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardregion;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A forward region is a set of vertices connected via forward edges. */
+public class ForwardRegion {
+
+ private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+
+ private final Set<JobVertexID> jobVertexIds = new HashSet<>();
+
+ public ForwardRegion(final Set<JobVertex> jobVertices) {
Review comment:
maybe receive `Set<ExecutionJobVertex>` as the param and use
`ExecutionJobVertex#isParallelismDecided()` in the logic below? It will be more
consistent with the scheduler's view.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
##########
@@ -173,12 +175,17 @@ public SchedulerNG createInstance(
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM));
}
- private void checkIsAllBlockingGraph(final JobGraph jobGraph) {
+ private void checkAllExchangesBlocking(final JobGraph jobGraph) {
for (JobVertex jobVertex : jobGraph.getVertices()) {
for (IntermediateDataSet dataSet :
jobVertex.getProducedDataSets()) {
checkState(
dataSet.getResultType().isBlocking(),
- "Adaptive batch scheduler currently only supports
ALL_EDGES_BLOCKING jobs.");
+ String.format(
+ "At the moment, fine-grained resource
management requires batch workloads "
+ + "to be executed with types of all
edges being BLOCKING. "
+ + "To do that, you need to configure
'%s' to '%s'.",
+ ExecutionOptions.BATCH_SHUFFLE_MODE,
Review comment:
ExecutionOptions.BATCH_SHUFFLE_MODE ->
ExecutionOptions.BATCH_SHUFFLE_MODE.key()
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/forwardregion/ForwardRegionComputeUtil.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch.forwardregion;
+
+import org.apache.flink.runtime.executiongraph.RegionComputeUtil;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Common utils for computing forward regions. */
+public class ForwardRegionComputeUtil {
+
+ public static Map<JobVertexID, ForwardRegion> computeForwardRegions(
+ final Iterable<JobVertex> topologicallySortedVertices) {
+
+ final Map<JobVertex, Set<JobVertex>> vertexToRegion = new
IdentityHashMap<>();
+
+ // iterate all the vertices which are topologically sorted
+ for (JobVertex vertex : topologicallySortedVertices) {
+ Set<JobVertex> currentRegion = new HashSet<>();
+ currentRegion.add(vertex);
+ vertexToRegion.put(vertex, currentRegion);
+
+ for (JobEdge input : getForwardInputs(vertex)) {
+ final JobVertex producerVertex =
input.getSource().getProducer();
+ final Set<JobVertex> producerRegion =
vertexToRegion.get(producerVertex);
+
+ if (producerRegion == null) {
+ throw new IllegalStateException(
+ "Producer task "
+ + producerVertex.getID()
+ + " forward region is null"
+ + " while calculating forward region for
the consumer task "
+ + vertex.getID()
+ + ". This should be a forward region
building bug.");
+ }
+
+ if (currentRegion != producerRegion) {
+ currentRegion =
+ RegionComputeUtil.mergeRegions(
+ currentRegion, producerRegion,
vertexToRegion);
+ }
+ }
+ }
+
+ Set<ForwardRegion> forwardRegions =
+ RegionComputeUtil.uniqueRegions(vertexToRegion).stream()
+ .map(ForwardRegion::new)
+ .collect(Collectors.toSet());
Review comment:
I would suggest to exclude regions which only has one vertex, which
means it is actually in no forward region. This allows use to avoid keeping
unnecessary information.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]