This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b70ce9d1831877578374e357d19e1345c68646f9 Author: Roman Khachatryan <[email protected]> AuthorDate: Thu May 28 00:09:36 2020 +0200 [FLINK-17988][checkpointing] Discard only unique channel state delegates The underlying state handles of channel state handles can be the same. Discard should only iterate over unique underlying handles. --- .../runtime/checkpoint/OperatorSubtaskState.java | 4 +- .../runtime/state/AbstractChannelStateHandle.java | 13 ++++ .../CheckpointCoordinatorFailureTest.java | 12 ++-- .../checkpoint/OperatorSubtaskStateTest.java | 74 ++++++++++++++++++++++ 4 files changed, 97 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java index 4099ec1..c6cb69f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.List; import static org.apache.flink.runtime.checkpoint.StateObjectCollection.emptyIfNull; +import static org.apache.flink.runtime.state.AbstractChannelStateHandle.collectUniqueDelegates; /** * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical) @@ -231,8 +232,7 @@ public class OperatorSubtaskState implements CompositeStateHandle { toDispose.addAll(rawOperatorState); toDispose.addAll(managedKeyedState); toDispose.addAll(rawKeyedState); - toDispose.addAll(inputChannelState); - toDispose.addAll(resultSubpartitionState); + toDispose.addAll(collectUniqueDelegates(inputChannelState, resultSubpartitionState)); StateUtil.bestEffortDiscardAllStateObjects(toDispose); } catch (Exception e) { LOG.warn("Error while discarding operator states.", e); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java index c359d9d..98132b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java @@ -20,9 +20,12 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.Internal; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -50,6 +53,16 @@ public abstract class AbstractChannelStateHandle<Info> implements StateObject { this.size = size; } + public static Set<StreamStateHandle> collectUniqueDelegates(Collection<? extends AbstractChannelStateHandle<?>>... collections) { + Set<StreamStateHandle> result = new HashSet<>(); + for (Collection<? extends AbstractChannelStateHandle<?>> collection : collections) { + for (AbstractChannelStateHandle<?> handle : collection) { + result.add(handle.getDelegate()); + } + } + return result; + } + @Override public void discardState() throws Exception { delegate.discardState(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index da1fe16..3514f5c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; +import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; @@ -31,10 +33,12 @@ import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.ResultSubpartitionStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.util.Collections; import java.util.List; import static org.junit.Assert.assertEquals; @@ -90,8 +94,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { KeyedStateHandle rawKeyedHandle = mock(KeyedStateHandle.class); OperatorStateHandle managedOpHandle = mock(OperatorStreamStateHandle.class); OperatorStateHandle rawOpHandle = mock(OperatorStreamStateHandle.class); - InputChannelStateHandle inputChannelStateHandle = mock(InputChannelStateHandle.class); - ResultSubpartitionStateHandle resultSubpartitionStateHandle = mock(ResultSubpartitionStateHandle.class); + InputChannelStateHandle inputChannelStateHandle = new InputChannelStateHandle(new InputChannelInfo(0, 1), mock(StreamStateHandle.class), Collections.singletonList(1L)); + ResultSubpartitionStateHandle resultSubpartitionStateHandle = new ResultSubpartitionStateHandle(new ResultSubpartitionInfo(0, 1), mock(StreamStateHandle.class), Collections.singletonList(1L)); final OperatorSubtaskState operatorSubtaskState = spy(new OperatorSubtaskState( managedOpHandle, @@ -125,8 +129,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { verify(operatorSubtaskState.getRawOperatorState().iterator().next()).discardState(); verify(operatorSubtaskState.getManagedKeyedState().iterator().next()).discardState(); verify(operatorSubtaskState.getRawKeyedState().iterator().next()).discardState(); - verify(operatorSubtaskState.getInputChannelState().iterator().next()).discardState(); - verify(operatorSubtaskState.getResultSubpartitionState().iterator().next()).discardState(); + verify(operatorSubtaskState.getInputChannelState().iterator().next().getDelegate()).discardState(); + verify(operatorSubtaskState.getResultSubpartitionState().iterator().next().getDelegate()).discardState(); } private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java new file mode 100644 index 0000000..55360d5 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; +import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo; +import org.apache.flink.runtime.state.InputChannelStateHandle; +import org.apache.flink.runtime.state.ResultSubpartitionStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + +import org.junit.Test; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.junit.Assert.assertFalse; + +/** + * {@link OperatorSubtaskState} test. + */ +public class OperatorSubtaskStateTest { + @Test + public void testDiscardDuplicatedDelegatesOnce() { + StreamStateHandle delegate = new DiscardOnceStreamStateHandle(); + new OperatorSubtaskState( + StateObjectCollection.empty(), + StateObjectCollection.empty(), + StateObjectCollection.empty(), + StateObjectCollection.empty(), + new StateObjectCollection<>(asList(buildInputChannelHandle(delegate, 1), buildInputChannelHandle(delegate, 2))), + new StateObjectCollection<>(asList(buildSubpartitionHandle(delegate, 4), buildSubpartitionHandle(delegate, 3))) + ).discardState(); + } + + private ResultSubpartitionStateHandle buildSubpartitionHandle(StreamStateHandle delegate, int subPartitionIdx1) { + return new ResultSubpartitionStateHandle(new ResultSubpartitionInfo(0, subPartitionIdx1), delegate, singletonList(0L)); + } + + private InputChannelStateHandle buildInputChannelHandle(StreamStateHandle delegate, int inputChannelIdx) { + return new InputChannelStateHandle(new InputChannelInfo(0, inputChannelIdx), delegate, singletonList(0L)); + } + + private static class DiscardOnceStreamStateHandle extends ByteStreamStateHandle { + private static final long serialVersionUID = 1L; + + private boolean discarded = false; + + DiscardOnceStreamStateHandle() { + super("test", new byte[0]); + } + + @Override + public void discardState() { + super.discardState(); + assertFalse("state was discarded twice", discarded); + discarded = true; + } + } +}
