This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cd2a88b71c07017b9633aaa109f1774c31036785 Author: Chesnay Schepler <[email protected]> AuthorDate: Wed Jun 12 13:40:03 2019 +0200 [FLINK-12615][coordination] Support generic key in PartitionTable --- .../flink/runtime/taskexecutor/TaskExecutor.java | 4 +-- .../runtime/taskexecutor/TaskManagerRunner.java | 2 +- .../taskexecutor/partition/PartitionTable.java | 39 +++++++++++----------- .../TaskExecutorPartitionLifecycleTest.java | 6 ++-- .../runtime/taskexecutor/TaskExecutorTest.java | 4 +-- .../TaskSubmissionTestEnvironment.java | 2 +- .../runtime/taskexecutor/TestingTaskExecutor.java | 3 +- .../taskexecutor/partition/PartitionTableTest.java | 8 ++--- 8 files changed, 34 insertions(+), 34 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 7bf5984..8c99c0f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -208,7 +208,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { /** The heartbeat manager for resource manager in the task manager. */ private HeartbeatManager<Void, SlotReport> resourceManagerHeartbeatManager; - private final PartitionTable partitionTable; + private final PartitionTable<JobID> partitionTable; // --------- resource manager -------- @@ -238,7 +238,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { @Nullable String metricQueryServiceAddress, BlobCacheService blobCacheService, FatalErrorHandler fatalErrorHandler, - PartitionTable partitionTable) { + PartitionTable<JobID> partitionTable) { super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 23cb6bc..72cc2c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -386,7 +386,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync metricQueryServiceAddress, blobCacheService, fatalErrorHandler, - new PartitionTable()); + new PartitionTable<>()); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java index 8548024..02942cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java @@ -17,7 +17,6 @@ package org.apache.flink.runtime.taskexecutor.partition; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.util.Preconditions; @@ -34,25 +33,25 @@ import java.util.concurrent.ConcurrentHashMap; * Thread-safe Utility for tracking partitions. */ @ThreadSafe -public class PartitionTable { +public class PartitionTable<K> { - private final Map<JobID, Set<ResultPartitionID>> trackedPartitionsPerJob = new ConcurrentHashMap<>(8); + private final Map<K, Set<ResultPartitionID>> trackedPartitionsPerJob = new ConcurrentHashMap<>(8); /** - * Returns whether any partitions are being tracked for the given job. + * Returns whether any partitions are being tracked for the given key. */ - public boolean hasTrackedPartitions(JobID jobId) { - return trackedPartitionsPerJob.containsKey(jobId); + public boolean hasTrackedPartitions(K key) { + return trackedPartitionsPerJob.containsKey(key); } /** - * Starts the tracking of the given partition for the given job. + * Starts the tracking of the given partition for the given key. */ - public void startTrackingPartitions(JobID jobId, Collection<ResultPartitionID> newPartitionIds) { - Preconditions.checkNotNull(jobId); + public void startTrackingPartitions(K key, Collection<ResultPartitionID> newPartitionIds) { + Preconditions.checkNotNull(key); Preconditions.checkNotNull(newPartitionIds); - trackedPartitionsPerJob.compute(jobId, (ignored, partitionIds) -> { + trackedPartitionsPerJob.compute(key, (ignored, partitionIds) -> { if (partitionIds == null) { partitionIds = new HashSet<>(8); } @@ -62,28 +61,28 @@ public class PartitionTable { } /** - * Stops the tracking of all partition for the given job. + * Stops the tracking of all partition for the given key. */ - public Collection<ResultPartitionID> stopTrackingPartitions(JobID jobId) { - Preconditions.checkNotNull(jobId); + public Collection<ResultPartitionID> stopTrackingPartitions(K key) { + Preconditions.checkNotNull(key); - Set<ResultPartitionID> storedPartitions = trackedPartitionsPerJob.remove(jobId); + Set<ResultPartitionID> storedPartitions = trackedPartitionsPerJob.remove(key); return storedPartitions == null ? Collections.emptyList() : storedPartitions; } /** - * Stops the tracking of the given set of partitions for the given job. + * Stops the tracking of the given set of partitions for the given key. */ - public void stopTrackingPartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) { - Preconditions.checkNotNull(jobId); + public void stopTrackingPartitions(K key, Collection<ResultPartitionID> partitionIds) { + Preconditions.checkNotNull(key); Preconditions.checkNotNull(partitionIds); - // If the JobID is unknown we do not fail here, in line with ShuffleEnvironment#releaseFinishedPartitions + // If the key is unknown we do not fail here, in line with ShuffleEnvironment#releaseFinishedPartitions trackedPartitionsPerJob.computeIfPresent( - jobId, - (key, resultPartitionIDS) -> { + key, + (ignored, resultPartitionIDS) -> { resultPartitionIDS.removeAll(partitionIds); return resultPartitionIDS.isEmpty() ? null diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java index a616c71..490cbb0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java @@ -148,7 +148,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { .setTaskSlotTable(createTaskSlotTable()) .build(); - final PartitionTable partitionTable = new PartitionTable(); + final PartitionTable<JobID> partitionTable = new PartitionTable<>(); final ResultPartitionID resultPartitionId = new ResultPartitionID(); final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTable); @@ -267,7 +267,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { }) .build(); - final PartitionTable partitionTable = new PartitionTable(); + final PartitionTable<JobID> partitionTable = new PartitionTable<>(); final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTable); @@ -385,7 +385,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { } } - private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices, PartitionTable partitionTable) throws IOException { + private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices, PartitionTable<JobID> partitionTable) throws IOException { return new TestingTaskExecutor( RPC, TaskManagerConfiguration.fromConfiguration(new Configuration()), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 33113f8..e67e979 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -1868,7 +1868,7 @@ public class TaskExecutorTest extends TestLogger { null, dummyBlobCacheService, testingFatalErrorHandler, - new PartitionTable()); + new PartitionTable<>()); } private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices) { @@ -1886,7 +1886,7 @@ public class TaskExecutorTest extends TestLogger { null, dummyBlobCacheService, testingFatalErrorHandler, - new PartitionTable()); + new PartitionTable<>()); } private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java index a9b6f92..47b2dba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java @@ -207,7 +207,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { null, blobCacheService, testingFatalErrorHandler, - new PartitionTable() + new PartitionTable<>() ); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java index 74d2e08..1aa708b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -46,7 +47,7 @@ class TestingTaskExecutor extends TaskExecutor { @Nullable String metricQueryServiceAddress, BlobCacheService blobCacheService, FatalErrorHandler fatalErrorHandler, - PartitionTable partitionTable) { + PartitionTable<JobID> partitionTable) { super( rpcService, taskManagerConfiguration, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java index c00517e..4e54af1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java @@ -43,7 +43,7 @@ public class PartitionTableTest extends TestLogger { @Test public void testEmptyTable() { - final PartitionTable table = new PartitionTable(); + final PartitionTable<JobID> table = new PartitionTable<>(); // an empty table should always return an empty collection Collection<ResultPartitionID> partitionsForNonExistingJob = table.stopTrackingPartitions(JOB_ID); @@ -55,7 +55,7 @@ public class PartitionTableTest extends TestLogger { @Test public void testStartTrackingPartition() { - final PartitionTable table = new PartitionTable(); + final PartitionTable<JobID> table = new PartitionTable<>(); table.startTrackingPartitions(JOB_ID, Collections.singletonList(PARTITION_ID)); @@ -64,7 +64,7 @@ public class PartitionTableTest extends TestLogger { @Test public void testStopTrackingAllPartitions() { - final PartitionTable table = new PartitionTable(); + final PartitionTable<JobID> table = new PartitionTable<>(); table.startTrackingPartitions(JOB_ID, Collections.singletonList(PARTITION_ID)); @@ -76,7 +76,7 @@ public class PartitionTableTest extends TestLogger { @Test public void testStopTrackingPartitions() { final ResultPartitionID partitionId2 = new ResultPartitionID(); - final PartitionTable table = new PartitionTable(); + final PartitionTable<JobID> table = new PartitionTable<>(); table.startTrackingPartitions(JOB_ID, Collections.singletonList(PARTITION_ID)); table.startTrackingPartitions(JOB_ID, Collections.singletonList(partitionId2));
