This is an automated email from the ASF dual-hosted git repository. liyu pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e95eae275f21796f9983c4d8b0bc4be1a0483e94 Author: klion26 <[email protected]> AuthorDate: Tue Apr 14 12:06:43 2020 +0800 [FLINK-16576][state backends] Do not distribute KeyGroupsStateHandle which contains empty KeyGroupRange This closes #11555. --- .../flink/runtime/state/KeyGroupsStateHandle.java | 7 ++- .../flink/runtime/state/KeyedStateHandle.java | 6 ++- .../runtime/state/KeyGroupsStateHandleTest.java | 59 ++++++++++++++++++++++ 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index 8092f6c..f78ec51 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -83,8 +83,13 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle * @return key-group state over a range that is the intersection between this handle's key-group range and the * provided key-group range. */ + @Override public KeyGroupsStateHandle getIntersection(KeyGroupRange keyGroupRange) { - return new KeyGroupsStateHandle(groupRangeOffsets.getIntersection(keyGroupRange), stateHandle); + KeyGroupRangeOffsets offsets = groupRangeOffsets.getIntersection(keyGroupRange); + if (offsets.getKeyGroupRange().getNumberOfKeyGroups() <= 0) { + return null; + } + return new KeyGroupsStateHandle(offsets, stateHandle); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java index 704ec14..a048aeb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.state; +import javax.annotation.Nullable; + /** * Base for the handles of the checkpointed states in keyed streams. When * recovering from failures, the handle will be passed to all tasks whose key @@ -34,7 +36,9 @@ public interface KeyedStateHandle extends CompositeStateHandle { * Returns a state over a range that is the intersection between this * handle's key-group range and the provided key-group range. * - * @param keyGroupRange The key group range to intersect with + * @param keyGroupRange The key group range to intersect with, + * will return null if the intersection of this handle's key-group and the provided key-group is empty. */ + @Nullable KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java new file mode 100644 index 0000000..ce6bf82 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +/** + * A test for {@link KeyGroupsStateHandle} + */ +public class KeyGroupsStateHandleTest { + + @Test + public void testNonEmptyIntersection() { + KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(0, 7); + byte[] dummy = new byte[10]; + StreamStateHandle streamHandle = new ByteStreamStateHandle("test", dummy); + KeyGroupsStateHandle handle = new KeyGroupsStateHandle(offsets, streamHandle); + + KeyGroupRange expectedRange = new KeyGroupRange(0, 3); + KeyGroupsStateHandle newHandle = handle.getIntersection(expectedRange); + assertNotNull(newHandle); + assertEquals(streamHandle, newHandle.getDelegateStateHandle()); + assertEquals(expectedRange, newHandle.getKeyGroupRange()); + } + + @Test + public void testEmptyIntersection() { + KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(0, 7); + byte[] dummy = new byte[10]; + StreamStateHandle streamHandle = new ByteStreamStateHandle("test", dummy); + KeyGroupsStateHandle handle = new KeyGroupsStateHandle(offsets, streamHandle); + // return null if the the keygroup intersection is empty. + KeyGroupRange newRange = new KeyGroupRange(8, 11); + assertNull(handle.getIntersection(newRange)); + } +} +
