[
https://issues.apache.org/jira/browse/FLINK-9869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679216#comment-16679216
]
ASF GitHub Bot commented on FLINK-9869:
---------------------------------------
TisonKun closed pull request #6345: [FLINK-9869][runtime] Send PartitionInfo in
batch to Improve performance
URL: https://github.com/apache/flink/pull/6345
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git a/docs/_includes/generated/job_manager_configuration.html
b/docs/_includes/generated/job_manager_configuration.html
index 0458af24c06..83d8abb7d27 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -42,6 +42,11 @@
<td style="word-wrap: break-word;">6123</td>
<td>The config parameter defining the network port to connect to
for communication with the job manager. Like jobmanager.rpc.address, this value
is only interpreted in setups where a single JobManager with static
name/address and port exists (simple standalone setups, or container setups
with dynamic service name resolution). This config option is not used in many
high-availability setups, when a leader-election service (like ZooKeeper) is
used to elect and discover the JobManager leader from potentially multiple
standby JobManagers.</td>
</tr>
+ <tr>
+ <td><h5>jobmanager.update-partition-info.send-interval</h5></td>
+ <td style="word-wrap: break-word;">10</td>
+ <td>The interval of send update-partition-info message.</td>
+ </tr>
<tr>
<td><h5>jobstore.cache-size</h5></td>
<td style="word-wrap: break-word;">52428800</td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 1666f213d18..43091a256b2 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -154,6 +154,11 @@
.defaultValue(60L * 60L)
.withDescription("The time in seconds after which a completed
job expires and is purged from the job store.");
+ public static final ConfigOption<Long>
UPDATE_PARTITION_INFO_SEND_INTERVAL =
+ key("jobmanager.update-partition-info.send-interval")
+ .defaultValue(10L)
+ .withDescription("The interval of send update-partition-info
message.");
+
/**
* The timeout in milliseconds for requesting a slot from Slot Pool.
*/
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 801f35a41dc..4a157f9cb60 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -27,18 +27,13 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -69,6 +64,8 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Collectors;
@@ -178,6 +175,10 @@
//
--------------------------------------------------------------------------------------------
+ private final Object updatePartitionLock = new Object();
+
+ private ScheduledFuture updatePartitionFuture;
+
/**
* Creates a new Execution attempt.
*
@@ -588,24 +589,27 @@ public void deploy() throws JobException {
final TaskManagerGateway taskManagerGateway =
slot.getTaskManagerGateway();
- final CompletableFuture<Acknowledge> submitResultFuture
= taskManagerGateway.submitTask(deployment, rpcTimeout);
-
- submitResultFuture.whenCompleteAsync(
- (ack, failure) -> {
- // only respond to the failure case
- if (failure != null) {
- if (failure instanceof
TimeoutException) {
- String taskname =
vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
-
- markFailed(new
Exception(
- "Cannot deploy
task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
- + ")
not responding after a rpcTimeout of " + rpcTimeout, failure));
- } else {
- markFailed(failure);
- }
- }
- },
- executor);
+ executor.execute(
+ () -> {
+ final CompletableFuture<Acknowledge>
submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);
+
+ submitResultFuture.whenCompleteAsync(
+ (ack, failure) -> {
+ // only respond to the
failure case
+ if (failure != null) {
+ if (failure
instanceof TimeoutException) {
+ String
taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
+
+
markFailed(new Exception(
+
"Cannot deploy task " + taskname + " - TaskManager (" +
getAssignedResourceLocation()
+
+ ") not responding after a rpcTimeout of " + rpcTimeout, failure));
+ } else {
+
markFailed(failure);
+ }
+ }
+ },
+ executor);
+ });
}
catch (Throwable t) {
markFailed(t);
@@ -759,44 +763,11 @@ else if (numConsumers == 0) {
//
----------------------------------------------------------------
else {
if (consumerState == RUNNING) {
- final LogicalSlot consumerSlot =
consumer.getAssignedResource();
-
- if (consumerSlot == null) {
- // The consumer has been reset
concurrently
- continue;
- }
-
- final TaskManagerLocation
partitionTaskManagerLocation = partition.getProducer()
-
.getCurrentAssignedResource().getTaskManagerLocation();
- final ResourceID partitionTaskManager =
partitionTaskManagerLocation.getResourceID();
-
- final ResourceID consumerTaskManager =
consumerSlot.getTaskManagerLocation().getResourceID();
-
- final ResultPartitionID partitionId =
new ResultPartitionID(partition.getPartitionId(), attemptId);
-
- final ResultPartitionLocation
partitionLocation;
-
- if
(consumerTaskManager.equals(partitionTaskManager)) {
- // Consuming task is deployed
to the same instance as the partition => local
- partitionLocation =
ResultPartitionLocation.createLocal();
- }
- else {
- // Different instances => remote
- final ConnectionID connectionId
= new ConnectionID(
-
partitionTaskManagerLocation,
-
partition.getIntermediateResult().getConnectionIndex());
-
- partitionLocation =
ResultPartitionLocation.createRemote(connectionId);
- }
-
- final InputChannelDeploymentDescriptor
descriptor = new InputChannelDeploymentDescriptor(
- partitionId,
partitionLocation);
-
- consumer.sendUpdatePartitionInfoRpcCall(
- Collections.singleton(
- new PartitionInfo(
-
partition.getIntermediateResult().getId(),
- descriptor)));
+ // cache the partition info and trigger
a timer to group them and send in batch
+ final Execution partitionExecution =
partition.getProducer()
+ .getCurrentExecutionAttempt();
+
consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(partition,
partitionExecution));
+
consumerVertex.getCurrentExecutionAttempt().sendPartitionInfoAsync();
}
//
----------------------------------------------------------------
// Consumer is scheduled or deploying => cache
input channel
@@ -1031,6 +1002,10 @@ void
cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo) {
}
void sendPartitionInfos() {
+ synchronized (updatePartitionLock) {
+ updatePartitionFuture = null;
+ }
+
// check if the ExecutionVertex has already been archived and
thus cleared the
// partial partition infos queue
if (partialInputChannelDeploymentDescriptors != null &&
!partialInputChannelDeploymentDescriptors.isEmpty()) {
@@ -1050,6 +1025,17 @@ void sendPartitionInfos() {
}
}
+ void sendPartitionInfoAsync() {
+ synchronized (updatePartitionLock) {
+ if (updatePartitionFuture == null) {
+ updatePartitionFuture =
getVertex().getExecutionGraph().getFutureExecutorService().schedule(
+ () -> {
+ sendPartitionInfos();
+ },
vertex.getExecutionGraph().getUpdatePartitionInfoSendInterval(),
TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
//
--------------------------------------------------------------------------------------------
// Internal Actions
//
--------------------------------------------------------------------------------------------
@@ -1218,16 +1204,21 @@ private void sendUpdatePartitionInfoRpcCall(
final TaskManagerGateway taskManagerGateway =
slot.getTaskManagerGateway();
final TaskManagerLocation taskManagerLocation =
slot.getTaskManagerLocation();
- CompletableFuture<Acknowledge>
updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId,
partitionInfos, rpcTimeout);
-
- updatePartitionsResultFuture.whenCompleteAsync(
- (ack, failure) -> {
- // fail if there was a failure
- if (failure != null) {
- fail(new
IllegalStateException("Update task on TaskManager " + taskManagerLocation +
- " failed due to:",
failure));
- }
- }, executor);
+ executor.execute(
+ () -> {
+ CompletableFuture<Acknowledge>
updatePartitionsResultFuture =
+
taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);
+
+
updatePartitionsResultFuture.whenCompleteAsync(
+ (ack, failure) -> {
+ // fail if there was a
failure
+ if (failure != null) {
+ fail(new
IllegalStateException("Update task on TaskManager " + taskManagerLocation +
+ "
failed due to:", failure));
+ }
+ }, executor);
+ }
+ );
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index acb1e16fe71..9687a640be1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -290,6 +290,8 @@
* available after archiving. */
private CheckpointStatsTracker checkpointStatsTracker;
+ private long updatePartitionInfoSendInterval;
+
// ------ Fields that are only relevant for archived execution graphs
------------
private String jsonPlan;
@@ -746,6 +748,15 @@ public Executor getFutureExecutor() {
return futureExecutor;
}
+ /**
+ * Returns the ExecutionContext associated with this ExecutionGraph.
+ *
+ * @return ExecutionContext associated with this ExecutionGraph
+ */
+ public ScheduledExecutorService getFutureExecutorService() {
+ return futureExecutor;
+ }
+
/**
* Merges all accumulator results from the tasks previously executed in
the Executions.
* @return The accumulator map
@@ -804,6 +815,15 @@ public Executor getFutureExecutor() {
return
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
}
+ public long getUpdatePartitionInfoSendInterval() {
+ return updatePartitionInfoSendInterval;
+ }
+
+ public void setUpdatePartitionInfoSendInterval(long
updatePartitionInfoSendInterval) {
+ this.updatePartitionInfoSendInterval =
updatePartitionInfoSendInterval;
+ }
+
+
//
--------------------------------------------------------------------------------------------
// Actions
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index f1a861d2ca1..0cd32ed6294 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -24,6 +24,7 @@
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
@@ -367,6 +368,9 @@ public static ExecutionGraph buildGraph(
executionGraph.getFailoverStrategy().registerMetrics(metrics);
+ executionGraph.setUpdatePartitionInfoSendInterval(
+
jobManagerConfig.getLong(JobManagerOptions.UPDATE_PARTITION_INFO_SEND_INTERVAL));
+
return executionGraph;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 12b4277941f..649c39d5711 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -159,7 +159,7 @@ public void testBuildDeploymentDescriptor() {
ExecutionGraph eg = new ExecutionGraph(
expectedJobInformation,
- TestingUtils.defaultExecutor(),
+ new DirectScheduledExecutorService(),
TestingUtils.defaultExecutor(),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index d91380ed275..3e36d95409e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -317,6 +317,9 @@ public void testFailCallOvertakesDeploymentAnswer() {
vertex.deployToSlot(slot);
assertEquals(ExecutionState.DEPLOYING,
vertex.getExecutionState());
+ // execute the deploy rpc call
+ queue.triggerNextAction();
+
Exception testError = new Exception("test error");
vertex.fail(testError);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Send PartitionInfo in batch to Improve performance
> --------------------------------------------------
>
> Key: FLINK-9869
> URL: https://issues.apache.org/jira/browse/FLINK-9869
> Project: Flink
> Issue Type: Improvement
> Components: Local Runtime
> Affects Versions: 1.5.1
> Reporter: TisonKun
> Assignee: TisonKun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.5.6
>
>
> ... Currently, we send partition info as soon as one arrives. We could
> `cachePartitionInfo` and then `sendPartitionInfoAsync`, which will improve
> performance.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)