This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b62db93 [FLINK-12203] Refactor ResultPartitionManager to break tie with Task
b62db93 is described below
commit b62db93bf63cb3bb34dd03d611a779d9e3fc61ac
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Thu Apr 18 15:26:24 2019 +0200
[FLINK-12203] Refactor ResultPartitionManager to break tie with Task
At the moment, we have ResultPartitionManager.releasePartitionsProducedBy,
which indexes partitions by task in the network environment. These methods
are ultimately used only by Task, which already knows its partitions, so
Task can use ResultPartition.fail(cause) and TaskExecutor.failPartition
can directly use NetworkEnvironment.releasePartitions(Collection<ResultPartitionID>).
This also requires that the JM Execution sends the produced partition ids
instead of just the ExecutionAttemptID.
This closes #8210.
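A minimal, self-contained Java sketch of the new release path described above. The types below are simplified stand-ins, not the actual Flink runtime classes: the point is that callers now hand over explicit partition ids, and the manager releases and unregisters each partition from a flat map keyed by the full partition id instead of a per-task table.

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;

    // Stand-in for ResultPartitionID: producer attempt + intermediate partition.
    final class PartitionId {
        final String producerId;
        final String partitionId;

        PartitionId(String producerId, String partitionId) {
            this.producerId = producerId;
            this.partitionId = partitionId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof PartitionId)) {
                return false;
            }
            PartitionId that = (PartitionId) o;
            return producerId.equals(that.producerId) && partitionId.equals(that.partitionId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(producerId, partitionId);
        }
    }

    final class PartitionManagerSketch {
        // Flat map keyed by the full partition id, replacing the former
        // Table<producer, partition, ...> that indexed partitions by task.
        private final Map<PartitionId, String> registeredPartitions = new HashMap<>();

        void register(PartitionId id, String partition) {
            registeredPartitions.put(id, partition);
        }

        // Mirrors ResultPartitionManager.releasePartition(partitionId, cause).
        void releasePartition(PartitionId id, Throwable cause) {
            String partition = registeredPartitions.remove(id);
            if (partition != null) {
                System.out.println("Released " + partition + (cause == null ? "" : ": " + cause));
            }
        }

        // Mirrors NetworkEnvironment.releasePartitions(Collection<ResultPartitionID>):
        // the caller supplies the ids it already knows about.
        void releasePartitions(Collection<PartitionId> ids) {
            for (PartitionId id : ids) {
                releasePartition(id, null);
            }
        }

        public static void main(String[] args) {
            PartitionManagerSketch manager = new PartitionManagerSketch();
            PartitionId id = new PartitionId("attempt-42", "partition-0");
            manager.register(id, "result-partition-0");
            // The JM now ships explicit partition ids instead of an ExecutionAttemptID.
            manager.releasePartitions(List.of(id));
        }
    }

The flat map makes a single-partition release a direct lookup and drops the Guava Table; the trade-off is that the JM must send the full set of partition ids over RPC rather than one attempt id.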
---
.../flink/runtime/executiongraph/Execution.java | 20 ++++++---
.../runtime/io/network/NetworkEnvironment.java | 13 ++++++
.../io/network/partition/ResultPartition.java | 2 +-
.../network/partition/ResultPartitionManager.java | 50 ++++++----------------
.../jobmanager/slots/TaskManagerGateway.java | 8 ++--
.../runtime/jobmaster/RpcTaskManagerGateway.java | 6 ++-
.../flink/runtime/taskexecutor/TaskExecutor.java | 7 ++-
.../runtime/taskexecutor/TaskExecutorGateway.java | 8 ++--
.../utils/SimpleAckingTaskManagerGateway.java | 5 ++-
.../taskexecutor/TestingTaskExecutorGateway.java | 4 +-
10 files changed, 65 insertions(+), 58 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 63f3125..e413619 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -688,7 +689,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
else if (current == FINISHED || current == FAILED) {
				// nothing to do any more. finished failed before it could be cancelled.
				// in any case, the task is removed from the TaskManager already
-				sendFailIntermediateResultPartitionsRpcCall();
+				sendReleaseIntermediateResultPartitionsRpcCall();
return;
}
@@ -721,7 +722,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
break;
case FINISHED:
case FAILED:
- sendFailIntermediateResultPartitionsRpcCall();
+				sendReleaseIntermediateResultPartitionsRpcCall();
break;
case CANCELED:
break;
@@ -1202,14 +1203,23 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
}
- private void sendFailIntermediateResultPartitionsRpcCall() {
+ private void sendReleaseIntermediateResultPartitionsRpcCall() {
+ LOG.info("Discarding the results produced by task execution
{}.", attemptId);
final LogicalSlot slot = assignedResource;
if (slot != null) {
			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
-			// TODO For some tests this could be a problem when querying too early if all resources were released
-			taskManagerGateway.failPartition(attemptId);
+			Collection<IntermediateResultPartition> partitions = vertex.getProducedPartitions().values();
+			Collection<ResultPartitionID> partitionIds = new ArrayList<>(partitions.size());
+			for (IntermediateResultPartition partition : partitions) {
+				partitionIds.add(new ResultPartitionID(partition.getPartitionId(), attemptId));
+ }
+
+ if (!partitionIds.isEmpty()) {
+				// TODO For some tests this could be a problem when querying too early if all resources were released
+				taskManagerGateway.releasePartitions(partitionIds);
+ }
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 98c61a4..0ee8595 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
@@ -38,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Collection;
import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -207,6 +209,17 @@ public class NetworkEnvironment {
}
}
+ /**
+ * Batch release intermediate result partitions.
+ *
+ * @param partitionIds partition ids to release
+ */
+	public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
+		for (ResultPartitionID partitionId : partitionIds) {
+			resultPartitionManager.releasePartition(partitionId, null);
+ }
+ }
+
public void start() throws IOException {
synchronized (lock) {
			Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index e0e7829..fb73a70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -334,7 +334,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
}
public void fail(@Nullable Throwable throwable) {
-		partitionManager.releasePartitionsProducedBy(partitionId.getProducerId(), throwable);
+ partitionManager.releasePartition(partitionId, throwable);
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index 09a62ed..8d96e2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -18,17 +18,11 @@
package org.apache.flink.runtime.io.network.partition;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.HashBasedTable;
-import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Table;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkState;
@@ -41,19 +35,15 @@ public class ResultPartitionManager implements ResultPartitionProvider {
	private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionManager.class);
-	public final Table<ExecutionAttemptID, IntermediateResultPartitionID, ResultPartition>
-		registeredPartitions = HashBasedTable.create();
+	private final Map<ResultPartitionID, ResultPartition> registeredPartitions = new HashMap<>(16);
private boolean isShutdown;
-	public void registerResultPartition(ResultPartition partition) throws IOException {
+ public void registerResultPartition(ResultPartition partition) {
synchronized (registeredPartitions) {
			checkState(!isShutdown, "Result partition manager already shut down.");
-			ResultPartitionID partitionId = partition.getPartitionId();
-
- ResultPartition previous = registeredPartitions.put(
-				partitionId.getProducerId(), partitionId.getPartitionId(), partition);
+			ResultPartition previous = registeredPartitions.put(partition.getPartitionId(), partition);
if (previous != null) {
				throw new IllegalStateException("Result partition already registered.");
@@ -70,8 +60,7 @@ public class ResultPartitionManager implements ResultPartitionProvider {
			BufferAvailabilityListener availabilityListener) throws IOException {
synchronized (registeredPartitions) {
-			final ResultPartition partition = registeredPartitions.get(partitionId.getProducerId(),
-				partitionId.getPartitionId());
+			final ResultPartition partition = registeredPartitions.get(partitionId);
if (partition == null) {
				throw new PartitionNotFoundException(partitionId);
@@ -83,26 +72,14 @@ public class ResultPartitionManager implements ResultPartitionProvider {
}
}
-	public void releasePartitionsProducedBy(ExecutionAttemptID executionId) {
- releasePartitionsProducedBy(executionId, null);
- }
-
-	public void releasePartitionsProducedBy(ExecutionAttemptID executionId, Throwable cause) {
+	public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
synchronized (registeredPartitions) {
-			final Map<IntermediateResultPartitionID, ResultPartition> partitions =
- registeredPartitions.row(executionId);
-
- for (ResultPartition partition : partitions.values()) {
- partition.release(cause);
+			ResultPartition resultPartition = registeredPartitions.remove(partitionId);
+ if (resultPartition != null) {
+ resultPartition.release(cause);
+ LOG.debug("Released partition {} produced by
{}.",
+ partitionId.getPartitionId(),
partitionId.getProducerId());
}
-
-			for (IntermediateResultPartitionID partitionId : ImmutableList
-					.copyOf(partitions.keySet())) {
-
-				registeredPartitions.remove(executionId, partitionId);
- }
-
- LOG.debug("Released all partitions produced by {}.",
executionId);
}
}
@@ -134,10 +111,7 @@ public class ResultPartitionManager implements ResultPartitionProvider {
LOG.debug("Received consume notification from {}.", partition);
synchronized (registeredPartitions) {
-			ResultPartitionID partitionId = partition.getPartitionId();
-
-			previous = registeredPartitions.remove(partitionId.getProducerId(),
-				partitionId.getPartitionId());
+			previous = registeredPartitions.remove(partition.getPartitionId());
}
// Release the partition if it was successfully removed
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index 6922b05..593a853 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -25,10 +25,12 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.rpc.RpcTimeout;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
/**
@@ -98,11 +100,11 @@ public interface TaskManagerGateway {
Time timeout);
/**
- * Fail all intermediate result partitions of the given task.
+ * Batch release intermediate result partitions.
*
- * @param executionAttemptID identifying the task
+ * @param partitionIds partition ids to release
*/
- void failPartition(ExecutionAttemptID executionAttemptID);
+ void releasePartitions(Collection<ResultPartitionID> partitionIds);
/**
* Notify the given task about a completed checkpoint.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 064eef5..1fb2d49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -25,12 +25,14 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.util.Preconditions;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
/**
@@ -86,8 +88,8 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
}
@Override
- public void failPartition(ExecutionAttemptID executionAttemptID) {
- taskExecutorGateway.failPartition(executionAttemptID);
+	public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
+ taskExecutorGateway.releasePartitions(partitionIds);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9b9ad5b..b35d65e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -649,11 +650,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
}
@Override
- public void failPartition(ExecutionAttemptID executionAttemptID) {
- log.info("Discarding the results produced by task execution
{}.", executionAttemptID);
-
+	public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
try {
-			networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID);
+ networkEnvironment.releasePartitions(partitionIds);
} catch (Throwable t) {
// TODO: Do we still need this catch branch?
onFatalError(t);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 728087a..8a653df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.types.SerializableOptional;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
/**
@@ -99,11 +101,11 @@ public interface TaskExecutorGateway extends RpcGateway {
@RpcTimeout Time timeout);
/**
- * Fail all intermediate result partitions of the given task.
+ * Batch release intermediate result partitions.
*
- * @param executionAttemptID identifying the task
+ * @param partitionIds partition ids to release
*/
- void failPartition(ExecutionAttemptID executionAttemptID);
+ void releasePartitions(Collection<ResultPartitionID> partitionIds);
/**
	 * Trigger the checkpoint for the given task. The checkpoint is identified by the checkpoint ID
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 22d5df0..dba0e7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -26,10 +26,12 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
@@ -95,7 +97,8 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
}
@Override
- public void failPartition(ExecutionAttemptID executionAttemptID) {}
+	public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
+ }
@Override
public void notifyCheckpointComplete(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 789956f..aca0bb2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.Preconditions;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
@@ -123,7 +125,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
}
@Override
- public void failPartition(ExecutionAttemptID executionAttemptID) {
+	public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
// noop
}