http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java deleted file mode 100644 index 49d772e..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/SerializedStateHandle.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.migration.runtime.state.memory; - -import org.apache.flink.migration.runtime.state.AbstractCloseableHandle; -import org.apache.flink.migration.runtime.state.StateHandle; -import org.apache.flink.migration.util.MigrationInstantiationUtil; -import org.apache.flink.util.InstantiationUtil; - -import java.io.IOException; -import java.io.Serializable; - -/** - * A state handle that represents its state in serialized form as bytes. - * - * @param <T> The type of state represented by this state handle. - */ -@SuppressWarnings("deprecation") -public class SerializedStateHandle<T extends Serializable> extends AbstractCloseableHandle implements StateHandle<T> { - - private static final long serialVersionUID = 4145685722538475769L; - - /** The serialized data */ - private final byte[] serializedData; - - /** - * Creates a new serialized state handle, eagerly serializing the given state object. - * - * @param value The state object. - * @throws IOException Thrown, if the serialization fails. - */ - public SerializedStateHandle(T value) throws IOException { - this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value); - } - - /** - * Creates a new serialized state handle, based in the given already serialized data. - * - * @param serializedData The serialized data. - */ - public SerializedStateHandle(byte[] serializedData) { - this.serializedData = serializedData; - } - - @Override - public T getState(ClassLoader classLoader) throws Exception { - if (classLoader == null) { - throw new NullPointerException(); - } - - ensureNotClosed(); - return serializedData == null ? null : MigrationInstantiationUtil.<T>deserializeObject(serializedData, classLoader); - } - - /** - * Gets the size of the serialized state. - * @return The size of the serialized state. - */ - public int getSizeOfSerializedState() { - return serializedData.length; - } - - /** - * Discarding heap-memory backed state is a no-op, so this method does nothing. - */ - @Override - public void discardState() {} - - @Override - public long getStateSize() { - return serializedData.length; - } - - public byte[] getSerializedData() { - return serializedData; - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java deleted file mode 100644 index 3f1ff55..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.migration.state; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.state.KeyGroupRangeOffsets; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.util.Migration; - -/** - * This class is just a KeyGroupsStateHandle that is tagged as migration, to figure out which restore logic to apply, - * e.g. when restoring backend data from a state handle. - * - * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes. - */ -@Internal -@Deprecated -public class MigrationKeyGroupStateHandle extends KeyGroupsStateHandle implements Migration { - - private static final long serialVersionUID = -8554427169776881697L; - - /** - * @param groupRangeOffsets range of key-group ids that in the state of this handle - * @param streamStateHandle handle to the actual state of the key-groups - */ - public MigrationKeyGroupStateHandle(KeyGroupRangeOffsets groupRangeOffsets, StreamStateHandle streamStateHandle) { - super(groupRangeOffsets, streamStateHandle); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java deleted file mode 100644 index 2201916..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.migration.state; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataInputStreamWrapper; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.util.Migration; - -import java.io.IOException; - -/** - * This class is just a StreamStateHandle that is tagged as migration, to figure out which restore logic to apply, e.g. - * when restoring backend data from a state handle. - * - * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes. - */ -@Internal -@Deprecated -public class MigrationStreamStateHandle implements StreamStateHandle, Migration { - - private static final long serialVersionUID = -2332113722532150112L; - private final StreamStateHandle delegate; - - public MigrationStreamStateHandle(StreamStateHandle delegate) { - this.delegate = delegate; - } - - @Override - public FSDataInputStream openInputStream() throws IOException { - return new MigrationFSInputStream(delegate.openInputStream()); - } - - @Override - public void discardState() throws Exception { - delegate.discardState(); - } - - @Override - public long getStateSize() { - return delegate.getStateSize(); - } - - static class MigrationFSInputStream extends FSDataInputStreamWrapper implements Migration { - - public MigrationFSInputStream(FSDataInputStream inputStream) { - super(inputStream); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java deleted file mode 100644 index b044ffb..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskState.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.migration.streaming.runtime.tasks; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.migration.runtime.state.KvStateSnapshot; -import org.apache.flink.migration.runtime.state.StateHandle; - -import java.io.Closeable; -import java.io.IOException; -import java.io.Serializable; -import java.util.HashMap; - -/** - * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes. - */ -@Deprecated -@Internal -@SuppressWarnings("deprecation") -public class StreamTaskState implements Serializable, Closeable { - - private static final long serialVersionUID = 1L; - - private StateHandle<?> operatorState; - - private StateHandle<Serializable> functionState; - - private HashMap<String, KvStateSnapshot<?, ?, ?, ?>> kvStates; - - // ------------------------------------------------------------------------ - - public StateHandle<?> getOperatorState() { - return operatorState; - } - - public void setOperatorState(StateHandle<?> operatorState) { - this.operatorState = operatorState; - } - - public StateHandle<Serializable> getFunctionState() { - return functionState; - } - - public void setFunctionState(StateHandle<Serializable> functionState) { - this.functionState = functionState; - } - - public HashMap<String, KvStateSnapshot<?, ?, ?, ?>> getKvStates() { - return kvStates; - } - - public void setKvStates(HashMap<String, KvStateSnapshot<?, ?, ?, ?>> kvStates) { - this.kvStates = kvStates; - } - - // ------------------------------------------------------------------------ - - /** - * Checks if this state object actually contains any state, or if all of the state - * fields are null. - * - * @return True, if all state is null, false if at least one state is not null. - */ - public boolean isEmpty() { - return operatorState == null & functionState == null & kvStates == null; - } - - @Override - public void close() throws IOException { - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java b/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java deleted file mode 100644 index 7643039..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/migration/streaming/runtime/tasks/StreamTaskStateList.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.migration.streaming.runtime.tasks; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.migration.runtime.state.KvStateSnapshot; -import org.apache.flink.migration.runtime.state.StateHandle; - -import java.io.IOException; -import java.util.HashMap; - -/** - * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes. - */ -@Deprecated -@Internal -@SuppressWarnings("deprecation") -public class StreamTaskStateList implements StateHandle<StreamTaskState[]> { - - private static final long serialVersionUID = 1L; - - /** The states for all operator. */ - private final StreamTaskState[] states; - - public StreamTaskStateList(StreamTaskState[] states) throws Exception { - this.states = states; - } - - public boolean isEmpty() { - for (StreamTaskState state : states) { - if (state != null) { - return false; - } - } - return true; - } - - @Override - public StreamTaskState[] getState(ClassLoader userCodeClassLoader) { - return states; - } - - @Override - public void discardState() throws Exception { - } - - @Override - public long getStateSize() throws Exception { - long sumStateSize = 0; - - if (states != null) { - for (StreamTaskState state : states) { - if (state != null) { - StateHandle<?> operatorState = state.getOperatorState(); - StateHandle<?> functionState = state.getFunctionState(); - HashMap<String, KvStateSnapshot<?, ?, ?, ?>> kvStates = state.getKvStates(); - - if (operatorState != null) { - sumStateSize += operatorState.getStateSize(); - } - - if (functionState != null) { - sumStateSize += functionState.getStateSize(); - } - - if (kvStates != null) { - for (KvStateSnapshot<?, ?, ?, ?> kvState : kvStates.values()) { - if (kvState != null) { - sumStateSize += kvState.getStateSize(); - } - } - } - } - } - } - - // State size as sum of all state sizes - return sumStateSize; - } - - @Override - public void close() throws IOException { - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java index 145ff6a..a5f908d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java @@ -30,7 +30,7 @@ import java.util.Map; import java.util.Objects; /** - * Simple container class which contains the raw/managed/legacy operator state and key-group state handles from all sub + * Simple container class which contains the raw/managed operator state and key-group state handles from all sub * tasks of an operator and therefore represents the complete state of a logical operator. */ public class OperatorState implements CompositeStateHandle { @@ -102,15 +102,6 @@ public class OperatorState implements CompositeStateHandle { return maxParallelism; } - public boolean hasNonPartitionedState() { - for (OperatorSubtaskState sts : operatorSubtaskStates.values()) { - if (sts != null && sts.getLegacyOperatorState() != null) { - return true; - } - } - return false; - } - @Override public void discardState() throws Exception { for (OperatorSubtaskState operatorSubtaskState : operatorSubtaskStates.values()) { http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java index 296b5ab..3df9c4f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java @@ -18,21 +18,18 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.state.CompositeStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; @@ -63,16 +60,6 @@ public class OperatorSubtaskState implements CompositeStateHandle { private static final long serialVersionUID = -2394696997971923995L; /** - * Legacy (non-repartitionable) operator state. - * - * @deprecated Non-repartitionable operator state that has been deprecated. - * Can be removed when we remove the APIs for non-repartitionable operator state. - */ - @Deprecated - @Nullable - private final StreamStateHandle legacyOperatorState; - - /** * Snapshot from the {@link org.apache.flink.runtime.state.OperatorStateBackend}. */ @Nonnull @@ -103,39 +90,30 @@ public class OperatorSubtaskState implements CompositeStateHandle { */ private final long stateSize; - @VisibleForTesting - public OperatorSubtaskState(StreamStateHandle legacyOperatorState) { - - this(legacyOperatorState, - Collections.<OperatorStateHandle>emptyList(), - Collections.<OperatorStateHandle>emptyList(), - Collections.<KeyedStateHandle>emptyList(), - Collections.<KeyedStateHandle>emptyList()); - } - /** * Empty state. */ public OperatorSubtaskState() { - this(null); + this( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList()); } public OperatorSubtaskState( - StreamStateHandle legacyOperatorState, Collection<OperatorStateHandle> managedOperatorState, Collection<OperatorStateHandle> rawOperatorState, Collection<KeyedStateHandle> managedKeyedState, Collection<KeyedStateHandle> rawKeyedState) { - this.legacyOperatorState = legacyOperatorState; this.managedOperatorState = Preconditions.checkNotNull(managedOperatorState); this.rawOperatorState = Preconditions.checkNotNull(rawOperatorState); this.managedKeyedState = Preconditions.checkNotNull(managedKeyedState); this.rawKeyedState = Preconditions.checkNotNull(rawKeyedState); try { - long calculateStateSize = getSizeNullSafe(legacyOperatorState); - calculateStateSize += sumAllSizes(managedOperatorState); + long calculateStateSize = sumAllSizes(managedOperatorState); calculateStateSize += sumAllSizes(rawOperatorState); calculateStateSize += sumAllSizes(managedKeyedState); calculateStateSize += sumAllSizes(rawKeyedState); @@ -150,13 +128,12 @@ public class OperatorSubtaskState implements CompositeStateHandle { * Collections (except for legacy state). */ public OperatorSubtaskState( - StreamStateHandle legacyOperatorState, OperatorStateHandle managedOperatorState, OperatorStateHandle rawOperatorState, KeyedStateHandle managedKeyedState, KeyedStateHandle rawKeyedState) { - this(legacyOperatorState, + this( singletonOrEmptyOnNull(managedOperatorState), singletonOrEmptyOnNull(rawOperatorState), singletonOrEmptyOnNull(managedKeyedState), @@ -183,16 +160,6 @@ public class OperatorSubtaskState implements CompositeStateHandle { // -------------------------------------------------------------------------------------------- /** - * @deprecated Non-repartitionable operator state that has been deprecated. - * Can be removed when we remove the APIs for non-repartitionable operator state. - */ - @Deprecated - @Nullable - public StreamStateHandle getLegacyOperatorState() { - return legacyOperatorState; - } - - /** * Returns a handle to the managed operator state. */ @Nonnull @@ -228,12 +195,11 @@ public class OperatorSubtaskState implements CompositeStateHandle { public void discardState() { try { List<StateObject> toDispose = - new ArrayList<>(1 + - managedOperatorState.size() + - rawOperatorState.size() + - managedKeyedState.size() + - rawKeyedState.size()); - toDispose.add(legacyOperatorState); + new ArrayList<>( + managedOperatorState.size() + + rawOperatorState.size() + + managedKeyedState.size() + + rawKeyedState.size()); toDispose.addAll(managedOperatorState); toDispose.addAll(rawOperatorState); toDispose.addAll(managedKeyedState); @@ -281,9 +247,6 @@ public class OperatorSubtaskState implements CompositeStateHandle { if (getStateSize() != that.getStateSize()) { return false; } - if (getLegacyOperatorState() != null ? !getLegacyOperatorState().equals(that.getLegacyOperatorState()) : that.getLegacyOperatorState() != null) { - return false; - } if (!getManagedOperatorState().equals(that.getManagedOperatorState())) { return false; } @@ -298,8 +261,7 @@ public class OperatorSubtaskState implements CompositeStateHandle { @Override public int hashCode() { - int result = getLegacyOperatorState() != null ? getLegacyOperatorState().hashCode() : 0; - result = 31 * result + getManagedOperatorState().hashCode(); + int result = getManagedOperatorState().hashCode(); result = 31 * result + getRawOperatorState().hashCode(); result = 31 * result + getManagedKeyedState().hashCode(); result = 31 * result + getRawKeyedState().hashCode(); @@ -310,8 +272,7 @@ public class OperatorSubtaskState implements CompositeStateHandle { @Override public String toString() { return "SubtaskState{" + - "legacyState=" + legacyOperatorState + - ", operatorStateFromBackend=" + managedOperatorState + + "operatorStateFromBackend=" + managedOperatorState + ", operatorStateFromStream=" + rawOperatorState + ", keyedStateFromBackend=" + managedKeyedState + ", keyedStateFromStream=" + rawKeyedState + @@ -320,8 +281,7 @@ public class OperatorSubtaskState implements CompositeStateHandle { } public boolean hasState() { - return legacyOperatorState != null - || hasState(managedOperatorState) + return hasState(managedOperatorState) || hasState(rawOperatorState) || hasState(managedKeyedState) || hasState(rawKeyedState); http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index b69285e..cc9f9cd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -162,8 +161,6 @@ public class StateAssignmentOperation { Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[subTaskIndex] .getCurrentExecutionAttempt(); - List<StreamStateHandle> subNonPartitionableState = new ArrayList<>(); - Tuple2<Collection<KeyedStateHandle>, Collection<KeyedStateHandle>> subKeyedState = null; List<Collection<OperatorStateHandle>> subManagedOperatorState = new ArrayList<>(); @@ -174,15 +171,6 @@ public class StateAssignmentOperation { OperatorState operatorState = operatorStates.get(operatorIndex); int oldParallelism = operatorState.getParallelism(); - // NonPartitioned State - - reAssignSubNonPartitionedStates( - operatorState, - subTaskIndex, - newParallelism, - oldParallelism, - subNonPartitionableState); - // PartitionedState reAssignSubPartitionableState( newManagedOperatorStates, @@ -204,8 +192,7 @@ public class StateAssignmentOperation { } // check if a stateless task - if (!allElementsAreNull(subNonPartitionableState) || - !allElementsAreNull(subManagedOperatorState) || + if (!allElementsAreNull(subManagedOperatorState) || !allElementsAreNull(subRawOperatorState) || subKeyedState != null) { @@ -226,7 +213,6 @@ public class StateAssignmentOperation { OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( - subNonPartitionableState.get(i), subManagedOperatorState.get(i), subRawOperatorState.get(i), managedKeyed, @@ -314,24 +300,6 @@ public class StateAssignmentOperation { return true; } - - private void reAssignSubNonPartitionedStates( - OperatorState operatorState, - int subTaskIndex, - int newParallelism, - int oldParallelism, - List<StreamStateHandle> subNonPartitionableState) { - if (oldParallelism == newParallelism) { - if (operatorState.getState(subTaskIndex) != null) { - subNonPartitionableState.add(operatorState.getState(subTaskIndex).getLegacyOperatorState()); - } else { - subNonPartitionableState.add(null); - } - } else { - subNonPartitionableState.add(null); - } - } - private void reDistributePartitionableStates( List<OperatorState> operatorStates, int newParallelism, List<List<Collection<OperatorStateHandle>>> newManagedOperatorStates, @@ -524,19 +492,6 @@ public class StateAssignmentOperation { "is currently not supported."); } } - - //----------------------------------------parallelism preconditions----------------------------------------- - - final int oldParallelism = operatorState.getParallelism(); - final int newParallelism = executionJobVertex.getParallelism(); - - if (operatorState.hasNonPartitionedState() && (oldParallelism != newParallelism)) { - throw new IllegalStateException("Cannot restore the latest checkpoint because " + - "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " + - "state and its parallelism changed. The operator " + executionJobVertex.getJobVertexId() + - " has parallelism " + newParallelism + " whereas the corresponding " + - "state object has a parallelism of " + oldParallelism); - } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java index 20d675b..281693b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java @@ -25,14 +25,12 @@ import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; -import org.apache.flink.runtime.state.StreamStateHandle; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * Container for the chained state of one parallel subtask of an operator/task. This is part of the * {@link TaskState}. @@ -44,15 +42,6 @@ public class SubtaskState implements CompositeStateHandle { private static final long serialVersionUID = -2394696997971923995L; /** - * Legacy (non-repartitionable) operator state. - * - * @deprecated Non-repartitionable operator state that has been deprecated. - * Can be removed when we remove the APIs for non-repartitionable operator state. - */ - @Deprecated - private final ChainedStateHandle<StreamStateHandle> legacyOperatorState; - - /** * Snapshot from the {@link org.apache.flink.runtime.state.OperatorStateBackend}. */ private final ChainedStateHandle<OperatorStateHandle> managedOperatorState; @@ -80,21 +69,18 @@ public class SubtaskState implements CompositeStateHandle { private final long stateSize; public SubtaskState( - ChainedStateHandle<StreamStateHandle> legacyOperatorState, ChainedStateHandle<OperatorStateHandle> managedOperatorState, ChainedStateHandle<OperatorStateHandle> rawOperatorState, KeyedStateHandle managedKeyedState, KeyedStateHandle rawKeyedState) { - this.legacyOperatorState = checkNotNull(legacyOperatorState, "State"); this.managedOperatorState = managedOperatorState; this.rawOperatorState = rawOperatorState; this.managedKeyedState = managedKeyedState; this.rawKeyedState = rawKeyedState; try { - long calculateStateSize = getSizeNullSafe(legacyOperatorState); - calculateStateSize += getSizeNullSafe(managedOperatorState); + long calculateStateSize = getSizeNullSafe(managedOperatorState); calculateStateSize += getSizeNullSafe(rawOperatorState); calculateStateSize += getSizeNullSafe(managedKeyedState); calculateStateSize += getSizeNullSafe(rawKeyedState); @@ -110,15 +96,6 @@ public class SubtaskState implements CompositeStateHandle { // -------------------------------------------------------------------------------------------- - /** - * @deprecated Non-repartitionable operator state that has been deprecated. - * Can be removed when we remove the APIs for non-repartitionable operator state. - */ - @Deprecated - public ChainedStateHandle<StreamStateHandle> getLegacyOperatorState() { - return legacyOperatorState; - } - public ChainedStateHandle<OperatorStateHandle> getManagedOperatorState() { return managedOperatorState; } @@ -140,7 +117,6 @@ public class SubtaskState implements CompositeStateHandle { try { StateUtil.bestEffortDiscardAllStateObjects( Arrays.asList( - legacyOperatorState, managedOperatorState, rawOperatorState, managedKeyedState, @@ -183,11 +159,6 @@ public class SubtaskState implements CompositeStateHandle { return false; } - if (legacyOperatorState != null ? - !legacyOperatorState.equals(that.legacyOperatorState) - : that.legacyOperatorState != null) { - return false; - } if (managedOperatorState != null ? !managedOperatorState.equals(that.managedOperatorState) : that.managedOperatorState != null) { @@ -211,8 +182,7 @@ public class SubtaskState implements CompositeStateHandle { @Override public int hashCode() { - int result = legacyOperatorState != null ? legacyOperatorState.hashCode() : 0; - result = 31 * result + (managedOperatorState != null ? managedOperatorState.hashCode() : 0); + int result = (managedOperatorState != null ? managedOperatorState.hashCode() : 0); result = 31 * result + (rawOperatorState != null ? rawOperatorState.hashCode() : 0); result = 31 * result + (managedKeyedState != null ? managedKeyedState.hashCode() : 0); result = 31 * result + (rawKeyedState != null ? rawKeyedState.hashCode() : 0); @@ -223,8 +193,7 @@ public class SubtaskState implements CompositeStateHandle { @Override public String toString() { return "SubtaskState{" + - "chainedStateHandle=" + legacyOperatorState + - ", operatorStateFromBackend=" + managedOperatorState + + "operatorStateFromBackend=" + managedOperatorState + ", operatorStateFromStream=" + rawOperatorState + ", keyedStateFromBackend=" + managedKeyedState + ", keyedStateFromStream=" + rawKeyedState + http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java index ed847a4..0f3bedb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java @@ -48,7 +48,6 @@ public class TaskState implements CompositeStateHandle { /** handles to non-partitioned states, subtaskindex -> subtaskstate */ private final Map<Integer, SubtaskState> subtaskStates; - /** parallelism of the operator when it was checkpointed */ private final int parallelism; @@ -117,15 +116,6 @@ public class TaskState implements CompositeStateHandle { return chainLength; } - public boolean hasNonPartitionedState() { - for(SubtaskState sts : subtaskStates.values()) { - if (sts != null && !sts.getLegacyOperatorState().isEmpty()) { - return true; - } - } - return false; - } - @Override public void discardState() throws Exception { for (SubtaskState subtaskState : subtaskStates.values()) { http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java index c1fcf4f..12e9c5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java @@ -18,8 +18,7 @@ package org.apache.flink.runtime.checkpoint.savepoint; -import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0; -import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.Preconditions; import java.util.HashMap; @@ -30,15 +29,20 @@ import java.util.Map; */ public class SavepointSerializers { + /** If this flag is true, restoring a savepoint fails if it contains legacy state (<= Flink 1.1 format) */ + static boolean FAIL_WHEN_LEGACY_STATE_DETECTED = true; private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(2); static { - SERIALIZERS.put(SavepointV0.VERSION, SavepointV0Serializer.INSTANCE); SERIALIZERS.put(SavepointV1.VERSION, SavepointV1Serializer.INSTANCE); SERIALIZERS.put(SavepointV2.VERSION, SavepointV2Serializer.INSTANCE); } + private SavepointSerializers() { + throw new AssertionError(); + } + // ------------------------------------------------------------------------ /** @@ -77,4 +81,12 @@ public class SavepointSerializers { } } + /** + * This is only visible as a temporary solution to keep the stateful job migration it cases working from binary + * savepoints that still contain legacy state (<= Flink 1.1). + */ + @VisibleForTesting + public static void setFailWhenLegacyStateDetected(boolean fail) { + FAIL_WHEN_LEGACY_STATE_DETECTED = fail; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java index 7beb1b8..586df57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import javax.annotation.Nullable; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java index f67d54c..c26c983 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.util.Preconditions; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -59,7 +60,6 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { private static final byte KEY_GROUPS_HANDLE = 3; private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4; - public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer(); private SavepointV1Serializer() { @@ -130,20 +130,15 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException { - dos.writeLong(-1); - - ChainedStateHandle<StreamStateHandle> nonPartitionableState = subtaskState.getLegacyOperatorState(); + //backwards compatibility, do not remove + dos.writeLong(-1L); - int len = nonPartitionableState != null ? nonPartitionableState.getLength() : 0; - dos.writeInt(len); - for (int i = 0; i < len; ++i) { - StreamStateHandle stateHandle = nonPartitionableState.get(i); - serializeStreamStateHandle(stateHandle, dos); - } + //backwards compatibility (number of legacy state handles), do not remove + dos.writeInt(0); ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState(); - len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0; + int len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0; dos.writeInt(len); for (int i = 0; i < len; ++i) { OperatorStateHandle stateHandle = operatorStateBackend.get(i); @@ -171,12 +166,19 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { long ignoredDuration = dis.readLong(); int len = dis.readInt(); - List<StreamStateHandle> nonPartitionableState = new ArrayList<>(len); - for (int i = 0; i < len; ++i) { - StreamStateHandle streamStateHandle = deserializeStreamStateHandle(dis); - nonPartitionableState.add(streamStateHandle); - } + if (SavepointSerializers.FAIL_WHEN_LEGACY_STATE_DETECTED) { + Preconditions.checkState(len == 0, + "Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is " + + "no longer supported starting from Flink 1.4. Please rewrite your job to use " + + "'CheckpointedFunction' instead!"); + + } else { + for (int i = 0; i < len; ++i) { + // absorb bytes from stream and ignore result + deserializeStreamStateHandle(dis); + } + } len = dis.readInt(); List<OperatorStateHandle> operatorStateBackend = new ArrayList<>(len); @@ -196,9 +198,6 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis); - ChainedStateHandle<StreamStateHandle> nonPartitionableStateChain = - new ChainedStateHandle<>(nonPartitionableState); - ChainedStateHandle<OperatorStateHandle> operatorStateBackendChain = new ChainedStateHandle<>(operatorStateBackend); @@ -206,7 +205,6 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { new ChainedStateHandle<>(operatorStateStream); return new SubtaskState( - nonPartitionableStateChain, operatorStateBackendChain, operatorStateStreamChain, keyedStateBackend, http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java index bd364a2..9e406df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.Preconditions; import java.util.Collection; @@ -207,9 +206,6 @@ public class SavepointV2 implements Savepoint { continue; } - @SuppressWarnings("deprecation") - ChainedStateHandle<StreamStateHandle> nonPartitionedState = - subtaskState.getLegacyOperatorState(); ChainedStateHandle<OperatorStateHandle> partitioneableState = subtaskState.getManagedOperatorState(); ChainedStateHandle<OperatorStateHandle> rawOperatorState = @@ -240,7 +236,6 @@ public class SavepointV2 implements Savepoint { } OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( - nonPartitionedState != null ? nonPartitionedState.get(operatorIndex) : null, partitioneableState != null ? partitioneableState.get(operatorIndex) : null, rawOperatorState != null ? rawOperatorState.get(operatorIndex) : null, managedKeyedState, http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java index 15628a0..5636a52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.util.Preconditions; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -256,13 +257,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { dos.writeLong(-1); - StreamStateHandle nonPartitionableState = subtaskState.getLegacyOperatorState(); - - int len = nonPartitionableState != null ? 1 : 0; + int len = 0; dos.writeInt(len); - if (len == 1) { - serializeStreamStateHandle(nonPartitionableState, dos); - } OperatorStateHandle operatorStateBackend = extractSingleton(subtaskState.getManagedOperatorState()); @@ -288,11 +284,23 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { } private static OperatorSubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException { - // Duration field has been removed from SubtaskState + // Duration field has been removed from SubtaskState, do not remove long ignoredDuration = dis.readLong(); + // for compatibility, do not remove int len = dis.readInt(); - StreamStateHandle nonPartitionableState = len == 0 ? null : deserializeStreamStateHandle(dis); + + if (SavepointSerializers.FAIL_WHEN_LEGACY_STATE_DETECTED) { + Preconditions.checkState(len == 0, + "Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is " + + "no longer supported starting from Flink 1.4. Please rewrite your job to use " + + "'CheckpointedFunction' instead!"); + } else { + for (int i = 0; i < len; ++i) { + // absorb bytes from stream and ignore result + deserializeStreamStateHandle(dis); + } + } len = dis.readInt(); OperatorStateHandle operatorStateBackend = len == 0 ? null : deserializeOperatorStateHandle(dis); @@ -305,7 +313,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis); return new OperatorSubtaskState( - nonPartitionableState, operatorStateBackend, operatorStateStream, keyedStateBackend, http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java index 2800899..8b58891 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.migration.MigrationNamespaceSerializerProxy; import java.io.IOException; @@ -90,11 +89,4 @@ public final class VoidNamespaceSerializer extends TypeSerializerSingleton<VoidN public boolean canEqual(Object obj) { return obj instanceof VoidNamespaceSerializer; } - - @Override - protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { - // we might be replacing a migration namespace serializer, in which case we just assume compatibility - return super.isCompatibleSerializationFormatIdentifier(identifier) - || identifier.equals(MigrationNamespaceSerializerProxy.class.getCanonicalName()); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index d1c0466..e235b96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -35,11 +35,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.migration.MigrationNamespaceSerializerProxy; -import org.apache.flink.migration.MigrationUtil; -import org.apache.flink.migration.runtime.state.KvStateSnapshot; -import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot; -import org.apache.flink.migration.state.MigrationKeyGroupStateHandle; import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; @@ -65,7 +60,6 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; -import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; @@ -190,7 +184,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // check compatibility results to determine if state migration is required CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( restoredMetaInfo.getNamespaceSerializer(), - MigrationNamespaceSerializerProxy.class, + null, restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), newMetaInfo.getNamespaceSerializer()); @@ -405,11 +399,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { LOG.debug("Restoring snapshot from state handles: {}.", restoredState); } - if (MigrationUtil.isOldSavepointKeyedState(restoredState)) { - restoreOldSavepointKeyedState(restoredState); - } else { - restorePartitionedState(restoredState); - } + restorePartitionedState(restoredState); } @SuppressWarnings({"unchecked"}) @@ -560,55 +550,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } /** - * @deprecated Used for backwards compatibility with previous savepoint versions. - */ - @SuppressWarnings({"unchecked", "rawtypes", "DeprecatedIsStillUsed"}) - @Deprecated - private void restoreOldSavepointKeyedState( - Collection<KeyedStateHandle> stateHandles) throws IOException, ClassNotFoundException { - - if (stateHandles.isEmpty()) { - return; - } - - Preconditions.checkState(1 == stateHandles.size(), "Only one element expected here."); - - KeyedStateHandle keyedStateHandle = stateHandles.iterator().next(); - if (!(keyedStateHandle instanceof MigrationKeyGroupStateHandle)) { - throw new IllegalStateException("Unexpected state handle type, " + - "expected: " + MigrationKeyGroupStateHandle.class + - ", but found " + keyedStateHandle.getClass()); - } - - MigrationKeyGroupStateHandle keyGroupStateHandle = (MigrationKeyGroupStateHandle) keyedStateHandle; - - HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates; - try (FSDataInputStream inputStream = keyGroupStateHandle.openInputStream()) { - namedStates = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader); - } - - for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState : namedStates.entrySet()) { - - final String stateName = nameToState.getKey(); - final KvStateSnapshot<K, ?, ?, ?> genericSnapshot = nameToState.getValue(); - - if (genericSnapshot instanceof MigrationRestoreSnapshot) { - MigrationRestoreSnapshot<K, ?, ?> stateSnapshot = (MigrationRestoreSnapshot<K, ?, ?>) genericSnapshot; - final StateTable rawResultMap = - stateSnapshot.deserialize(stateName, this); - - // mimic a restored kv state meta info - restoredKvStateMetaInfos.put(stateName, rawResultMap.getMetaInfo().snapshot()); - - // add named state to the backend - stateTables.put(stateName, rawResultMap); - } else { - throw new IllegalStateException("Unknown state: " + genericSnapshot); - } - } - } - - /** * Returns the total number of state entries across all keys/namespaces. */ @VisibleForTesting http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 26db772..7c95a34 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -92,15 +91,12 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { final long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next(); - - StreamStateHandle legacyHandle = mock(StreamStateHandle.class); KeyedStateHandle managedKeyedHandle = mock(KeyedStateHandle.class); KeyedStateHandle rawKeyedHandle = mock(KeyedStateHandle.class); OperatorStateHandle managedOpHandle = mock(OperatorStateHandle.class); OperatorStateHandle rawOpHandle = mock(OperatorStateHandle.class); final OperatorSubtaskState operatorSubtaskState = spy(new OperatorSubtaskState( - legacyHandle, managedOpHandle, rawOpHandle, managedKeyedHandle, @@ -126,7 +122,6 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { // make sure that the subtask state has been discarded after we could not complete it. verify(operatorSubtaskState).discardState(); - verify(operatorSubtaskState.getLegacyOperatorState()).discardState(); verify(operatorSubtaskState.getManagedOperatorState().iterator().next()).discardState(); verify(operatorSubtaskState.getRawOperatorState().iterator().next()).discardState(); verify(operatorSubtaskState.getManagedKeyedState().iterator().next()).discardState(); http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 45cbbc3..4193c2c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -90,7 +90,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -2149,15 +2148,13 @@ public class CheckpointCoordinatorTest extends TestLogger { assertTrue(coord.getPendingCheckpoints().keySet().size() == 1); long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); - CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L); List<KeyGroupRange> keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1); List<KeyGroupRange> keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2); for (int index = 0; index < jobVertex1.getParallelism(); index++) { - StreamStateHandle valueSizeTuple = generateStateForVertex(jobVertexID1, index); KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false); - OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(valueSizeTuple, null, null, keyGroupState, null); + OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(null, null, keyGroupState, null); TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot(); taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID1), operatorSubtaskState); AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( @@ -2172,9 +2169,8 @@ public class CheckpointCoordinatorTest extends TestLogger { for (int index = 0; index < jobVertex2.getParallelism(); index++) { - StreamStateHandle valueSizeTuple = generateStateForVertex(jobVertexID2, index); KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false); - OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(valueSizeTuple, null, null, keyGroupState, null); + OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(null, null, keyGroupState, null); TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot(); taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID2), operatorSubtaskState); AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( @@ -2214,137 +2210,6 @@ public class CheckpointCoordinatorTest extends TestLogger { fail("The restoration should have failed because the max parallelism changed."); } - /** - * Tests that the checkpoint restoration fails if the parallelism of a job vertices with - * non-partitioned state has changed. - * - * @throws Exception - */ - @Test(expected=IllegalStateException.class) - public void testRestoreLatestCheckpointFailureWhenParallelismChanges() throws Exception { - final JobID jid = new JobID(); - final long timestamp = System.currentTimeMillis(); - - final JobVertexID jobVertexID1 = new JobVertexID(); - final JobVertexID jobVertexID2 = new JobVertexID(); - int parallelism1 = 3; - int parallelism2 = 2; - int maxParallelism1 = 42; - int maxParallelism2 = 13; - - final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex( - jobVertexID1, - parallelism1, - maxParallelism1); - final ExecutionJobVertex jobVertex2 = mockExecutionJobVertex( - jobVertexID2, - parallelism2, - maxParallelism2); - - List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1 + parallelism2); - - allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices())); - allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices())); - - ExecutionVertex[] arrayExecutionVertices = - allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); - - // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - arrayExecutionVertices, - arrayExecutionVertices, - arrayExecutionVertices, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); - - // trigger the checkpoint - coord.triggerCheckpoint(timestamp, false); - - assertTrue(coord.getPendingCheckpoints().keySet().size() == 1); - long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); - CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L); - - List<KeyGroupRange> keyGroupPartitions1 = - StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1); - List<KeyGroupRange> keyGroupPartitions2 = - StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2); - - for (int index = 0; index < jobVertex1.getParallelism(); index++) { - StreamStateHandle valueSizeTuple = generateStateForVertex(jobVertexID1, index); - KeyGroupsStateHandle keyGroupState = generateKeyGroupState( - jobVertexID1, keyGroupPartitions1.get(index), false); - - OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(valueSizeTuple, null, null, keyGroupState, null); - TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot(); - taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID1), operatorSubtaskState); - - AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jid, - jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, - new CheckpointMetrics(), - taskOperatorSubtaskStates); - - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); - } - - - for (int index = 0; index < jobVertex2.getParallelism(); index++) { - - StreamStateHandle state = generateStateForVertex(jobVertexID2, index); - KeyGroupsStateHandle keyGroupState = generateKeyGroupState( - jobVertexID2, keyGroupPartitions2.get(index), false); - - OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(state, null, null, keyGroupState, null); - TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot(); - taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID2), operatorSubtaskState); - AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jid, - jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, - new CheckpointMetrics(), - taskOperatorSubtaskStates); - - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); - } - - List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints(); - - assertEquals(1, completedCheckpoints.size()); - - Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>(); - - int newParallelism1 = 4; - int newParallelism2 = 3; - - final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex( - jobVertexID1, - newParallelism1, - maxParallelism1); - - final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex( - jobVertexID2, - newParallelism2, - maxParallelism2); - - tasks.put(jobVertexID1, newJobVertex1); - tasks.put(jobVertexID2, newJobVertex2); - - coord.restoreLatestCheckpointedState(tasks, true, false); - - fail("The restoration should have failed because the parallelism of an vertex with " + - "non-partitioned state changed."); - } - @Test public void testRestoreLatestCheckpointedStateScaleIn() throws Exception { testRestoreLatestCheckpointedStateWithChangingParallelism(false); @@ -2439,12 +2304,10 @@ public class CheckpointCoordinatorTest extends TestLogger { //vertex 1 for (int index = 0; index < jobVertex1.getParallelism(); index++) { - StreamStateHandle valueSizeTuple = generateStateForVertex(jobVertexID1, index); OperatorStateHandle opStateBackend = generatePartitionableStateHandle(jobVertexID1, index, 2, 8, false); KeyGroupsStateHandle keyedStateBackend = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false); KeyGroupsStateHandle keyedStateRaw = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), true); - - OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, keyedStateRaw); + OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(opStateBackend, null, keyedStateBackend, keyedStateRaw); TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot(); taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID1), operatorSubtaskState); @@ -2469,7 +2332,7 @@ public class CheckpointCoordinatorTest extends TestLogger { expectedOpStatesBackend.add(new ChainedStateHandle<>(Collections.singletonList(opStateBackend))); expectedOpStatesRaw.add(new ChainedStateHandle<>(Collections.singletonList(opStateRaw))); - OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(null, opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw); + OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw); TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot(); taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID2), operatorSubtaskState); @@ -2527,7 +2390,6 @@ public class CheckpointCoordinatorTest extends TestLogger { for (int idx = 0; idx < operatorIDs.size(); ++idx) { OperatorID operatorID = operatorIDs.get(idx); OperatorSubtaskState opState = taskStateHandles.getSubtaskStateByOperatorID(operatorID); - Assert.assertNull(opState.getLegacyOperatorState()); Collection<OperatorStateHandle> opStateBackend = opState.getManagedOperatorState(); Collection<OperatorStateHandle> opStateRaw = opState.getRawOperatorState(); allParallelManagedOpStates.add(opStateBackend); @@ -2593,14 +2455,11 @@ public class CheckpointCoordinatorTest extends TestLogger { OperatorState taskState = new OperatorState(id.f1, parallelism1, maxParallelism1); operatorStates.put(id.f1, taskState); for (int index = 0; index < taskState.getParallelism(); index++) { - StreamStateHandle subNonPartitionedState = - generateStateForVertex(id.f0, index); OperatorStateHandle subManagedOperatorState = generatePartitionableStateHandle(id.f0, index, 2, 8, false); OperatorStateHandle subRawOperatorState = generatePartitionableStateHandle(id.f0, index, 2, 8, true); - - OperatorSubtaskState subtaskState = new OperatorSubtaskState(subNonPartitionedState, + OperatorSubtaskState subtaskState = new OperatorSubtaskState( subManagedOperatorState, subRawOperatorState, null, @@ -2638,7 +2497,6 @@ public class CheckpointCoordinatorTest extends TestLogger { expectedRawOperatorState.add(ChainedStateHandle.wrapSingleHandle(subRawOperatorState)); OperatorSubtaskState subtaskState = new OperatorSubtaskState( - null, subManagedOperatorState, subRawOperatorState, subManagedKeyedState, @@ -2735,7 +2593,6 @@ public class CheckpointCoordinatorTest extends TestLogger { OperatorSubtaskState opState = stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain)); - assertNull(opState.getLegacyOperatorState()); assertTrue(opState.getManagedOperatorState().isEmpty()); assertTrue(opState.getRawOperatorState().isEmpty()); } @@ -2745,16 +2602,11 @@ public class CheckpointCoordinatorTest extends TestLogger { OperatorSubtaskState opState = stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain)); - StreamStateHandle expectSubNonPartitionedState = generateStateForVertex(id1.f0, i); OperatorStateHandle expectedManagedOpState = generatePartitionableStateHandle( id1.f0, i, 2, 8, false); OperatorStateHandle expectedRawOpState = generatePartitionableStateHandle( id1.f0, i, 2, 8, true); - assertTrue(CommonTestUtils.isSteamContentEqual( - expectSubNonPartitionedState.openInputStream(), - opState.getLegacyOperatorState().openInputStream())); - Collection<OperatorStateHandle> managedOperatorState = opState.getManagedOperatorState(); assertEquals(1, managedOperatorState.size()); assertTrue(CommonTestUtils.isSteamContentEqual(expectedManagedOpState.openInputStream(), @@ -2771,16 +2623,11 @@ public class CheckpointCoordinatorTest extends TestLogger { OperatorSubtaskState opState = stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain)); - StreamStateHandle expectSubNonPartitionedState = generateStateForVertex(id2.f0, i); OperatorStateHandle expectedManagedOpState = generatePartitionableStateHandle( id2.f0, i, 2, 8, false); OperatorStateHandle expectedRawOpState = generatePartitionableStateHandle( id2.f0, i, 2, 8, true); - assertTrue(CommonTestUtils.isSteamContentEqual( - expectSubNonPartitionedState.openInputStream(), - opState.getLegacyOperatorState().openInputStream())); - Collection<OperatorStateHandle> managedOperatorState = opState.getManagedOperatorState(); assertEquals(1, managedOperatorState.size()); assertTrue(CommonTestUtils.isSteamContentEqual(expectedManagedOpState.openInputStream(), @@ -2816,8 +2663,6 @@ public class CheckpointCoordinatorTest extends TestLogger { actualManagedOperatorStates.add(actualSubManagedOperatorState); actualRawOperatorStates.add(actualSubRawOperatorState); - - assertNull(opState.getLegacyOperatorState()); } // operator 6 @@ -2825,7 +2670,6 @@ public class CheckpointCoordinatorTest extends TestLogger { int operatorIndexInChain = 0; OperatorSubtaskState opState = stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain)); - assertNull(opState.getLegacyOperatorState()); assertTrue(opState.getManagedOperatorState().isEmpty()); assertTrue(opState.getRawOperatorState().isEmpty()); @@ -3216,13 +3060,12 @@ public class CheckpointCoordinatorTest extends TestLogger { int index, KeyGroupRange keyGroupRange) throws IOException { - StreamStateHandle nonPartitionedState = generateStateForVertex(jobVertexID, index); OperatorStateHandle partitionableState = generatePartitionableStateHandle(jobVertexID, index, 2, 8, false); KeyGroupsStateHandle partitionedKeyGroupState = generateKeyGroupState(jobVertexID, keyGroupRange, false); TaskStateSnapshot subtaskStates = spy(new TaskStateSnapshot()); OperatorSubtaskState subtaskState = spy(new OperatorSubtaskState( - nonPartitionedState, partitionableState, null, partitionedKeyGroupState, null) + partitionableState, null, partitionedKeyGroupState, null) ); subtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID), subtaskState); @@ -3236,17 +3079,10 @@ public class CheckpointCoordinatorTest extends TestLogger { for (int i = 0; i < executionJobVertex.getParallelism(); i++) { - final List<OperatorID> operatorIds = executionJobVertex.getOperatorIDs(); - TaskStateSnapshot stateSnapshot = executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateSnapshot(); OperatorSubtaskState operatorState = stateSnapshot.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID)); - StreamStateHandle expectNonPartitionedState = generateStateForVertex(jobVertexID, i); - assertTrue(CommonTestUtils.isSteamContentEqual( - expectNonPartitionedState.openInputStream(), - operatorState.getLegacyOperatorState().openInputStream())); - ChainedStateHandle<OperatorStateHandle> expectedOpStateBackend = generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false); @@ -3926,7 +3762,7 @@ public class CheckpointCoordinatorTest extends TestLogger { spy(new ByteStreamStateHandle("meta", new byte[]{'m'})))); OperatorSubtaskState operatorSubtaskState = - spy(new OperatorSubtaskState(null, + spy(new OperatorSubtaskState( Collections.<OperatorStateHandle>emptyList(), Collections.<OperatorStateHandle>emptyList(), Collections.<KeyedStateHandle>singletonList(managedState), http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 791bffa..1788434 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -29,12 +29,10 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.util.SerializableObject; import org.hamcrest.BaseMatcher; @@ -67,7 +65,6 @@ public class CheckpointStateRestoreTest { public void testSetState() { try { - final ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new SerializableObject()); KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0); List<SerializableObject> testStates = Collections.singletonList(new SerializableObject()); final KeyedStateHandle serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates); @@ -125,7 +122,6 @@ public class CheckpointStateRestoreTest { subtaskStates.putSubtaskStateByOperatorID( OperatorID.fromJobVertexID(statefulId), new OperatorSubtaskState( - serializedState.get(0), Collections.<OperatorStateHandle>emptyList(), Collections.<OperatorStateHandle>emptyList(), Collections.singletonList(serializedKeyGroupStates), @@ -249,17 +245,13 @@ public class CheckpointStateRestoreTest { Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY); - StreamStateHandle serializedState = CheckpointCoordinatorTest - .generateChainedStateHandle(new SerializableObject()) - .get(0); - // --- (2) Checkpoint misses state for a jobVertex (should work) --- Map<OperatorID, OperatorState> checkpointTaskStates = new HashMap<>(); { OperatorState taskState = new OperatorState(operatorId1, 3, 3); - taskState.putState(0, new OperatorSubtaskState(serializedState)); - taskState.putState(1, new OperatorSubtaskState(serializedState)); - taskState.putState(2, new OperatorSubtaskState(serializedState)); + taskState.putState(0, new OperatorSubtaskState()); + taskState.putState(1, new OperatorSubtaskState()); + taskState.putState(2, new OperatorSubtaskState()); checkpointTaskStates.put(operatorId1, taskState); } @@ -286,7 +278,7 @@ public class CheckpointStateRestoreTest { // There is no task for this { OperatorState taskState = new OperatorState(newOperatorID, 1, 1); - taskState.putState(0, new OperatorSubtaskState(serializedState)); + taskState.putState(0, new OperatorSubtaskState()); checkpointTaskStates.put(newOperatorID, taskState); } http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java index de1f599..acedb50 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java @@ -77,7 +77,6 @@ public class CheckpointTestUtils { OperatorState taskState = new OperatorState(new OperatorID(), numSubtasksPerTask, 128); - boolean hasNonPartitionableState = random.nextBoolean(); boolean hasOperatorStateBackend = random.nextBoolean(); boolean hasOperatorStateStream = random.nextBoolean(); @@ -87,7 +86,6 @@ public class CheckpointTestUtils { for (int subtaskIdx = 0; subtaskIdx < numSubtasksPerTask; subtaskIdx++) { - StreamStateHandle nonPartitionableState = null; StreamStateHandle operatorStateBackend = new TestByteStreamStateHandleDeepCompare("b", ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET)); StreamStateHandle operatorStateStream = @@ -101,11 +99,6 @@ public class CheckpointTestUtils { offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); offsetsMap.put("C", new OperatorStateHandle.StateMetaInfo(new long[]{60, 70, 80}, OperatorStateHandle.Mode.BROADCAST)); - if (hasNonPartitionableState) { - nonPartitionableState = - new TestByteStreamStateHandleDeepCompare("a", ("Hi").getBytes(ConfigConstants.DEFAULT_CHARSET)); - } - if (hasOperatorStateBackend) { operatorStateHandleBackend = new OperatorStateHandle(offsetsMap, operatorStateBackend); } @@ -130,7 +123,6 @@ public class CheckpointTestUtils { } taskState.putState(subtaskIdx, new OperatorSubtaskState( - nonPartitionableState, operatorStateHandleBackend, operatorStateHandleStream, keyedStateStream, @@ -175,15 +167,11 @@ public class CheckpointTestUtils { for (int subtaskIdx = 0; subtaskIdx < numSubtasksPerTask; subtaskIdx++) { - List<StreamStateHandle> nonPartitionableStates = new ArrayList<>(chainLength); List<OperatorStateHandle> operatorStatesBackend = new ArrayList<>(chainLength); List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(chainLength); for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) { - StreamStateHandle nonPartitionableState = - new TestByteStreamStateHandleDeepCompare("a-" + chainIdx, ("Hi-" + chainIdx).getBytes( - ConfigConstants.DEFAULT_CHARSET)); StreamStateHandle operatorStateBackend = new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET)); StreamStateHandle operatorStateStream = @@ -193,10 +181,6 @@ public class CheckpointTestUtils { offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); offsetsMap.put("C", new OperatorStateHandle.StateMetaInfo(new long[]{60, 70, 80}, OperatorStateHandle.Mode.BROADCAST)); - if (chainIdx != noNonPartitionableStateAtIndex) { - nonPartitionableStates.add(nonPartitionableState); - } - if (chainIdx != noOperatorStateBackendAtIndex) { OperatorStateHandle operatorStateHandleBackend = new OperatorStateHandle(offsetsMap, operatorStateBackend); @@ -222,7 +206,6 @@ public class CheckpointTestUtils { } taskState.putState(subtaskIdx, new SubtaskState( - new ChainedStateHandle<>(nonPartitionableStates), new ChainedStateHandle<>(operatorStatesBackend), new ChainedStateHandle<>(operatorStatesStream), keyedStateStream,