This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc72a7c95b4d745e2df64bd75857e77f4d5ca14a Author: Chesnay Schepler <[email protected]> AuthorDate: Fri Jul 26 12:49:45 2019 +0200 [hotfix][coordination] Check whether partition set to track is empty --- .../flink/runtime/taskexecutor/partition/PartitionTable.java | 4 ++++ .../flink/runtime/taskexecutor/partition/PartitionTableTest.java | 9 +++++++++ 2 files changed, 13 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java index 02942cf..d214e09 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java @@ -51,6 +51,10 @@ public class PartitionTable<K> { Preconditions.checkNotNull(key); Preconditions.checkNotNull(newPartitionIds); + if (newPartitionIds.isEmpty()) { + return; + } + trackedPartitionsPerJob.compute(key, (ignored, partitionIds) -> { if (partitionIds == null) { partitionIds = new HashSet<>(8); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java index 4e54af1..e2f63fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java @@ -63,6 +63,15 @@ public class PartitionTableTest extends TestLogger { } @Test + public void testStartTrackingZeroPartitionDoesNotMutateState() { + final PartitionTable<JobID> table = new PartitionTable<>(); + + table.startTrackingPartitions(JOB_ID, Collections.emptyList()); + + assertFalse(table.hasTrackedPartitions(JOB_ID)); + } + + @Test public void testStopTrackingAllPartitions() { final PartitionTable<JobID> table = new PartitionTable<>();
