This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e95eae275f21796f9983c4d8b0bc4be1a0483e94
Author: klion26 <[email protected]>
AuthorDate: Tue Apr 14 12:06:43 2020 +0800

    [FLINK-16576][state backends] Do not distribute KeyGroupsStateHandle which 
contains empty KeyGroupRange
    
    This closes #11555.
---
 .../flink/runtime/state/KeyGroupsStateHandle.java  |  7 ++-
 .../flink/runtime/state/KeyedStateHandle.java      |  6 ++-
 .../runtime/state/KeyGroupsStateHandleTest.java    | 59 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 8092f6c..f78ec51 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -83,8 +83,13 @@ public class KeyGroupsStateHandle implements 
StreamStateHandle, KeyedStateHandle
         * @return key-group state over a range that is the intersection 
between this handle's key-group range and the
         *          provided key-group range.
         */
+       @Override
        public KeyGroupsStateHandle getIntersection(KeyGroupRange 
keyGroupRange) {
-               return new 
KeyGroupsStateHandle(groupRangeOffsets.getIntersection(keyGroupRange), 
stateHandle);
+               KeyGroupRangeOffsets offsets = 
groupRangeOffsets.getIntersection(keyGroupRange);
+               if (offsets.getKeyGroupRange().getNumberOfKeyGroups() <= 0) {
+                       return null;
+               }
+               return new KeyGroupsStateHandle(offsets, stateHandle);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java
index 704ec14..a048aeb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
+import javax.annotation.Nullable;
+
 /**
  * Base for the handles of the checkpointed states in keyed streams. When
  * recovering from failures, the handle will be passed to all tasks whose key
@@ -34,7 +36,9 @@ public interface KeyedStateHandle extends 
CompositeStateHandle {
         * Returns a state over a range that is the intersection between this
         * handle's key-group range and the provided key-group range.
         *
-        * @param keyGroupRange The key group range to intersect with
+        * @param keyGroupRange The key group range to intersect with,
+        * will return null if the intersection of this handle's key-group and 
the provided key-group is empty.
         */
+       @Nullable
        KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange);
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java
new file mode 100644
index 0000000..ce6bf82
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * A test for {@link KeyGroupsStateHandle}
+ */
+public class KeyGroupsStateHandleTest {
+
+       @Test
+       public void testNonEmptyIntersection() {
+               KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(0, 7);
+               byte[] dummy = new byte[10];
+               StreamStateHandle streamHandle = new 
ByteStreamStateHandle("test", dummy);
+               KeyGroupsStateHandle handle = new KeyGroupsStateHandle(offsets, 
streamHandle);
+
+               KeyGroupRange expectedRange = new KeyGroupRange(0, 3);
+               KeyGroupsStateHandle newHandle = 
handle.getIntersection(expectedRange);
+               assertNotNull(newHandle);
+               assertEquals(streamHandle, newHandle.getDelegateStateHandle());
+               assertEquals(expectedRange, newHandle.getKeyGroupRange());
+       }
+
+       @Test
+       public void testEmptyIntersection() {
+               KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(0, 7);
+               byte[] dummy = new byte[10];
+               StreamStateHandle streamHandle = new 
ByteStreamStateHandle("test", dummy);
+               KeyGroupsStateHandle handle = new KeyGroupsStateHandle(offsets, 
streamHandle);
+               // return null if the the keygroup intersection is empty.
+               KeyGroupRange newRange = new KeyGroupRange(8, 11);
+               assertNull(handle.getIntersection(newRange));
+       }
+}
+

Reply via email to