This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cd2a88b71c07017b9633aaa109f1774c31036785
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed Jun 12 13:40:03 2019 +0200

    [FLINK-12615][coordination] Support generic key in PartitionTable
---
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  4 +--
 .../runtime/taskexecutor/TaskManagerRunner.java    |  2 +-
 .../taskexecutor/partition/PartitionTable.java     | 39 +++++++++++-----------
 .../TaskExecutorPartitionLifecycleTest.java        |  6 ++--
 .../runtime/taskexecutor/TaskExecutorTest.java     |  4 +--
 .../TaskSubmissionTestEnvironment.java             |  2 +-
 .../runtime/taskexecutor/TestingTaskExecutor.java  |  3 +-
 .../taskexecutor/partition/PartitionTableTest.java |  8 ++---
 8 files changed, 34 insertions(+), 34 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 7bf5984..8c99c0f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -208,7 +208,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        /** The heartbeat manager for resource manager in the task manager. */
        private HeartbeatManager<Void, SlotReport> 
resourceManagerHeartbeatManager;
 
-       private final PartitionTable partitionTable;
+       private final PartitionTable<JobID> partitionTable;
 
        // --------- resource manager --------
 
@@ -238,7 +238,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        @Nullable String metricQueryServiceAddress,
                        BlobCacheService blobCacheService,
                        FatalErrorHandler fatalErrorHandler,
-                       PartitionTable partitionTable) {
+                       PartitionTable<JobID> partitionTable) {
 
                super(rpcService, 
AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 23cb6bc..72cc2c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -386,7 +386,7 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                        metricQueryServiceAddress,
                        blobCacheService,
                        fatalErrorHandler,
-                       new PartitionTable());
+                       new PartitionTable<>());
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
index 8548024..02942cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.runtime.taskexecutor.partition;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.util.Preconditions;
 
@@ -34,25 +33,25 @@ import java.util.concurrent.ConcurrentHashMap;
  * Thread-safe Utility for tracking partitions.
  */
 @ThreadSafe
-public class PartitionTable {
+public class PartitionTable<K> {
 
-       private final Map<JobID, Set<ResultPartitionID>> 
trackedPartitionsPerJob = new ConcurrentHashMap<>(8);
+       private final Map<K, Set<ResultPartitionID>> trackedPartitionsPerJob = 
new ConcurrentHashMap<>(8);
 
        /**
-        * Returns whether any partitions are being tracked for the given job.
+        * Returns whether any partitions are being tracked for the given key.
         */
-       public boolean hasTrackedPartitions(JobID jobId) {
-               return trackedPartitionsPerJob.containsKey(jobId);
+       public boolean hasTrackedPartitions(K key) {
+               return trackedPartitionsPerJob.containsKey(key);
        }
 
        /**
-        * Starts the tracking of the given partition for the given job.
+        * Starts the tracking of the given partitions for the given key.
         */
-       public void startTrackingPartitions(JobID jobId, 
Collection<ResultPartitionID> newPartitionIds) {
-               Preconditions.checkNotNull(jobId);
+       public void startTrackingPartitions(K key, 
Collection<ResultPartitionID> newPartitionIds) {
+               Preconditions.checkNotNull(key);
                Preconditions.checkNotNull(newPartitionIds);
 
-               trackedPartitionsPerJob.compute(jobId, (ignored, partitionIds) 
-> {
+               trackedPartitionsPerJob.compute(key, (ignored, partitionIds) -> 
{
                        if (partitionIds == null) {
                                partitionIds = new HashSet<>(8);
                        }
@@ -62,28 +61,28 @@ public class PartitionTable {
        }
 
        /**
-        * Stops the tracking of all partition for the given job.
+        * Stops the tracking of all partitions for the given key.
         */
-       public Collection<ResultPartitionID> stopTrackingPartitions(JobID 
jobId) {
-               Preconditions.checkNotNull(jobId);
+       public Collection<ResultPartitionID> stopTrackingPartitions(K key) {
+               Preconditions.checkNotNull(key);
 
-               Set<ResultPartitionID> storedPartitions = 
trackedPartitionsPerJob.remove(jobId);
+               Set<ResultPartitionID> storedPartitions = 
trackedPartitionsPerJob.remove(key);
                return storedPartitions == null
                        ? Collections.emptyList()
                        : storedPartitions;
        }
 
        /**
-        * Stops the tracking of the given set of partitions for the given job.
+        * Stops the tracking of the given set of partitions for the given key.
         */
-       public void stopTrackingPartitions(JobID jobId, 
Collection<ResultPartitionID> partitionIds) {
-               Preconditions.checkNotNull(jobId);
+       public void stopTrackingPartitions(K key, Collection<ResultPartitionID> 
partitionIds) {
+               Preconditions.checkNotNull(key);
                Preconditions.checkNotNull(partitionIds);
 
-               // If the JobID is unknown we do not fail here, in line with 
ShuffleEnvironment#releaseFinishedPartitions
+               // If the key is unknown we do not fail here, in line with 
ShuffleEnvironment#releaseFinishedPartitions
                trackedPartitionsPerJob.computeIfPresent(
-                       jobId,
-                       (key, resultPartitionIDS) -> {
+                       key,
+                       (ignored, resultPartitionIDS) -> {
                                resultPartitionIDS.removeAll(partitionIds);
                                return resultPartitionIDS.isEmpty()
                                        ? null
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index a616c71..490cbb0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -148,7 +148,7 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                        .setTaskSlotTable(createTaskSlotTable())
                        .build();
 
-               final PartitionTable partitionTable = new PartitionTable();
+               final PartitionTable<JobID> partitionTable = new 
PartitionTable<>();
                final ResultPartitionID resultPartitionId = new 
ResultPartitionID();
 
                final TestingTaskExecutor taskExecutor = 
createTestingTaskExecutor(taskManagerServices, partitionTable);
@@ -267,7 +267,7 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                        })
                        .build();
 
-               final PartitionTable partitionTable = new PartitionTable();
+               final PartitionTable<JobID> partitionTable = new 
PartitionTable<>();
 
                final TestingTaskExecutor taskExecutor = 
createTestingTaskExecutor(taskManagerServices, partitionTable);
 
@@ -385,7 +385,7 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                }
        }
 
-       private TestingTaskExecutor 
createTestingTaskExecutor(TaskManagerServices taskManagerServices, 
PartitionTable partitionTable) throws IOException {
+       private TestingTaskExecutor 
createTestingTaskExecutor(TaskManagerServices taskManagerServices, 
PartitionTable<JobID> partitionTable) throws IOException {
                return new TestingTaskExecutor(
                        RPC,
                        TaskManagerConfiguration.fromConfiguration(new 
Configuration()),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 33113f8..e67e979 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -1868,7 +1868,7 @@ public class TaskExecutorTest extends TestLogger {
                        null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler,
-                       new PartitionTable());
+                       new PartitionTable<>());
        }
 
        private TestingTaskExecutor 
createTestingTaskExecutor(TaskManagerServices taskManagerServices) {
@@ -1886,7 +1886,7 @@ public class TaskExecutorTest extends TestLogger {
                        null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler,
-                       new PartitionTable());
+                       new PartitionTable<>());
        }
 
        private static final class StartStopNotifyingLeaderRetrievalService 
implements LeaderRetrievalService {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index a9b6f92..47b2dba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -207,7 +207,7 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
                        null,
                        blobCacheService,
                        testingFatalErrorHandler,
-                       new PartitionTable()
+                       new PartitionTable<>()
                );
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
index 74d2e08..1aa708b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -46,7 +47,7 @@ class TestingTaskExecutor extends TaskExecutor {
                        @Nullable String metricQueryServiceAddress,
                        BlobCacheService blobCacheService,
                        FatalErrorHandler fatalErrorHandler,
-                       PartitionTable partitionTable) {
+                       PartitionTable<JobID> partitionTable) {
                super(
                        rpcService,
                        taskManagerConfiguration,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
index c00517e..4e54af1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
@@ -43,7 +43,7 @@ public class PartitionTableTest extends TestLogger {
 
        @Test
        public void testEmptyTable() {
-               final PartitionTable table = new PartitionTable();
+               final PartitionTable<JobID> table = new PartitionTable<>();
 
                // an empty table should always return an empty collection
                Collection<ResultPartitionID> partitionsForNonExistingJob = 
table.stopTrackingPartitions(JOB_ID);
@@ -55,7 +55,7 @@ public class PartitionTableTest extends TestLogger {
 
        @Test
        public void testStartTrackingPartition() {
-               final PartitionTable table = new PartitionTable();
+               final PartitionTable<JobID> table = new PartitionTable<>();
 
                table.startTrackingPartitions(JOB_ID, 
Collections.singletonList(PARTITION_ID));
 
@@ -64,7 +64,7 @@ public class PartitionTableTest extends TestLogger {
 
        @Test
        public void testStopTrackingAllPartitions() {
-               final PartitionTable table = new PartitionTable();
+               final PartitionTable<JobID> table = new PartitionTable<>();
 
                table.startTrackingPartitions(JOB_ID, 
Collections.singletonList(PARTITION_ID));
 
@@ -76,7 +76,7 @@ public class PartitionTableTest extends TestLogger {
        @Test
        public void testStopTrackingPartitions() {
                final ResultPartitionID partitionId2 = new ResultPartitionID();
-               final PartitionTable table = new PartitionTable();
+               final PartitionTable<JobID> table = new PartitionTable<>();
 
                table.startTrackingPartitions(JOB_ID, 
Collections.singletonList(PARTITION_ID));
                table.startTrackingPartitions(JOB_ID, 
Collections.singletonList(partitionId2));

Reply via email to