This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b70ce9d1831877578374e357d19e1345c68646f9
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu May 28 00:09:36 2020 +0200

    [FLINK-17988][checkpointing] Discard only unique channel state delegates
    
    The underlying state handles of channel state handles can be the same.
    Discard should only iterate over unique underlying handles.
---
 .../runtime/checkpoint/OperatorSubtaskState.java   |  4 +-
 .../runtime/state/AbstractChannelStateHandle.java  | 13 ++++
 .../CheckpointCoordinatorFailureTest.java          | 12 ++--
 .../checkpoint/OperatorSubtaskStateTest.java       | 74 ++++++++++++++++++++++
 4 files changed, 97 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index 4099ec1..c6cb69f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static 
org.apache.flink.runtime.checkpoint.StateObjectCollection.emptyIfNull;
+import static 
org.apache.flink.runtime.state.AbstractChannelStateHandle.collectUniqueDelegates;
 
 /**
  * This class encapsulates the state for one parallel instance of an operator. 
The complete state of a (logical)
@@ -231,8 +232,7 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
                        toDispose.addAll(rawOperatorState);
                        toDispose.addAll(managedKeyedState);
                        toDispose.addAll(rawKeyedState);
-                       toDispose.addAll(inputChannelState);
-                       toDispose.addAll(resultSubpartitionState);
+                       
toDispose.addAll(collectUniqueDelegates(inputChannelState, 
resultSubpartitionState));
                        StateUtil.bestEffortDiscardAllStateObjects(toDispose);
                } catch (Exception e) {
                        LOG.warn("Error while discarding operator states.", e);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
index c359d9d..98132b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.annotation.Internal;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -50,6 +53,16 @@ public abstract class AbstractChannelStateHandle<Info> 
implements StateObject {
                this.size = size;
        }
 
+       public static Set<StreamStateHandle> 
collectUniqueDelegates(Collection<? extends AbstractChannelStateHandle<?>>... 
collections) {
+               Set<StreamStateHandle> result = new HashSet<>();
+               for (Collection<? extends AbstractChannelStateHandle<?>> 
collection : collections) {
+                       for (AbstractChannelStateHandle<?> handle : collection) 
{
+                               result.add(handle.getDelegate());
+                       }
+               }
+               return result;
+       }
+
        @Override
        public void discardState() throws Exception {
                delegate.discardState();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index da1fe16..3514f5c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -31,10 +33,12 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -90,8 +94,8 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
                KeyedStateHandle rawKeyedHandle = mock(KeyedStateHandle.class);
                OperatorStateHandle managedOpHandle = 
mock(OperatorStreamStateHandle.class);
                OperatorStateHandle rawOpHandle = 
mock(OperatorStreamStateHandle.class);
-               InputChannelStateHandle inputChannelStateHandle = 
mock(InputChannelStateHandle.class);
-               ResultSubpartitionStateHandle resultSubpartitionStateHandle = 
mock(ResultSubpartitionStateHandle.class);
+               InputChannelStateHandle inputChannelStateHandle = new 
InputChannelStateHandle(new InputChannelInfo(0, 1), 
mock(StreamStateHandle.class), Collections.singletonList(1L));
+               ResultSubpartitionStateHandle resultSubpartitionStateHandle = 
new ResultSubpartitionStateHandle(new ResultSubpartitionInfo(0, 1), 
mock(StreamStateHandle.class), Collections.singletonList(1L));
 
                final OperatorSubtaskState operatorSubtaskState = spy(new 
OperatorSubtaskState(
                        managedOpHandle,
@@ -125,8 +129,8 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
                
verify(operatorSubtaskState.getRawOperatorState().iterator().next()).discardState();
                
verify(operatorSubtaskState.getManagedKeyedState().iterator().next()).discardState();
                
verify(operatorSubtaskState.getRawKeyedState().iterator().next()).discardState();
-               
verify(operatorSubtaskState.getInputChannelState().iterator().next()).discardState();
-               
verify(operatorSubtaskState.getResultSubpartitionState().iterator().next()).discardState();
+               
verify(operatorSubtaskState.getInputChannelState().iterator().next().getDelegate()).discardState();
+               
verify(operatorSubtaskState.getResultSubpartitionState().iterator().next().getDelegate()).discardState();
        }
 
        private static final class FailingCompletedCheckpointStore implements 
CompletedCheckpointStore {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
new file mode 100644
index 0000000..55360d5
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
+import org.apache.flink.runtime.state.InputChannelStateHandle;
+import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * {@link OperatorSubtaskState} test.
+ */
+public class OperatorSubtaskStateTest {
+       @Test
+       public void testDiscardDuplicatedDelegatesOnce() {
+               StreamStateHandle delegate = new DiscardOnceStreamStateHandle();
+               new OperatorSubtaskState(
+                       StateObjectCollection.empty(),
+                       StateObjectCollection.empty(),
+                       StateObjectCollection.empty(),
+                       StateObjectCollection.empty(),
+                       new 
StateObjectCollection<>(asList(buildInputChannelHandle(delegate, 1), 
buildInputChannelHandle(delegate, 2))),
+                       new 
StateObjectCollection<>(asList(buildSubpartitionHandle(delegate, 4), 
buildSubpartitionHandle(delegate, 3)))
+               ).discardState();
+       }
+
+       private ResultSubpartitionStateHandle 
buildSubpartitionHandle(StreamStateHandle delegate, int subPartitionIdx1) {
+               return new ResultSubpartitionStateHandle(new 
ResultSubpartitionInfo(0, subPartitionIdx1), delegate, singletonList(0L));
+       }
+
+       private InputChannelStateHandle 
buildInputChannelHandle(StreamStateHandle delegate, int inputChannelIdx) {
+               return new InputChannelStateHandle(new InputChannelInfo(0, 
inputChannelIdx), delegate, singletonList(0L));
+       }
+
+       private static class DiscardOnceStreamStateHandle extends 
ByteStreamStateHandle {
+               private static final long serialVersionUID = 1L;
+
+               private boolean discarded = false;
+
+               DiscardOnceStreamStateHandle() {
+                       super("test", new byte[0]);
+               }
+
+               @Override
+               public void discardState() {
+                       super.discardState();
+                       assertFalse("state was discarded twice", discarded);
+                       discarded = true;
+               }
+       }
+}

Reply via email to