This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 039ec028809863712509b785aa5e2aff19a5018b
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri Jul 26 12:49:45 2019 +0200

    [hotfix][coordination] Check whether partition set to track is empty
---
 .../flink/runtime/taskexecutor/partition/PartitionTable.java     | 4 ++++
 .../flink/runtime/taskexecutor/partition/PartitionTableTest.java | 9 +++++++++
 2 files changed, 13 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
index 02942cf..d214e09 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
@@ -51,6 +51,10 @@ public class PartitionTable<K> {
                Preconditions.checkNotNull(key);
                Preconditions.checkNotNull(newPartitionIds);
 
+               if (newPartitionIds.isEmpty()) {
+                       return;
+               }
+
                trackedPartitionsPerJob.compute(key, (ignored, partitionIds) -> 
{
                        if (partitionIds == null) {
                                partitionIds = new HashSet<>(8);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
index 4e54af1..e2f63fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
@@ -63,6 +63,15 @@ public class PartitionTableTest extends TestLogger {
        }
 
        @Test
+       public void testStartTrackingZeroPartitionDoesNotMutateState() {
+               final PartitionTable<JobID> table = new PartitionTable<>();
+
+               table.startTrackingPartitions(JOB_ID, Collections.emptyList());
+
+               assertFalse(table.hasTrackedPartitions(JOB_ID));
+       }
+
+       @Test
        public void testStopTrackingAllPartitions() {
                final PartitionTable<JobID> table = new PartitionTable<>();
 

Reply via email to