[
https://issues.apache.org/jira/browse/FLINK-5041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15725666#comment-15725666
]
ASF GitHub Bot commented on FLINK-5041:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2781#discussion_r91071609
--- Diff:
flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
---
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.runtime.checkpoint.savepoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.migration.runtime.checkpoint.KeyGroupState;
+import org.apache.flink.migration.runtime.checkpoint.SubtaskState;
+import org.apache.flink.migration.runtime.checkpoint.TaskState;
+import org.apache.flink.migration.runtime.state.KvStateSnapshot;
+import org.apache.flink.migration.runtime.state.StateHandle;
+import
org.apache.flink.migration.runtime.state.filesystem.AbstractFileStateHandle;
+import
org.apache.flink.migration.runtime.state.memory.SerializedStateHandle;
+import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
+import org.apache.flink.migration.state.MigrationStreamStateHandle;
+import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
+import
org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
+import org.apache.flink.migration.util.SerializedValue;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.MultiStreamStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
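+/**
+ * Read-only serializer for legacy (version 0) savepoints. It reads the old savepoint
+ * format and converts it into a {@link SavepointV1}; it only exists for backwards
+ * compatibility and cannot write savepoints.
+ *
+ * <p>Note that, unlike the newer format, this serializer does rely on default Java
+ * serialization: the legacy state handles are stored as {@link SerializedValue}s and are
+ * deserialized with the user class loader, so the involved legacy classes must stay
+ * compatible.
+ */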
+public class SavepointV0Serializer implements
SavepointSerializer<SavepointV1> {
+
+	/** Shared instance of this serializer; the constructor is private. */
+	public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
+
+	/** Stream state handle named "SIGNAL_0" that holds the single byte 0. */
+	private static final StreamStateHandle SIGNAL_0 = new ByteStreamStateHandle("SIGNAL_0", new byte[]{0});
+
+	/** Stream state handle named "SIGNAL_1" that holds the single byte 1. */
+	private static final StreamStateHandle SIGNAL_1 = new ByteStreamStateHandle("SIGNAL_1", new byte[]{1});
+
+	/** 4 MiB (4 * 1024 * 1024 bytes); presumably a size bound used during conversion — confirm. */
+	private static final int MAX_SIZE = 4 * 1024 * 1024;
+
+	/**
+	 * Class loader passed to the current {@link #deserialize} call; used by the conversion helpers
+	 * to deserialize the legacy state handles.
+	 *
+	 * <p>NOTE(review): this is per-call state stored on the shared {@link #INSTANCE}; concurrent
+	 * {@code deserialize} calls would overwrite each other's value unless callers serialize access.
+	 */
+	private ClassLoader userClassLoader;
+
+	/**
+	 * Checkpoint id read at the start of the current {@link #deserialize} call.
+	 *
+	 * <p>NOTE(review): per-call state on the shared {@link #INSTANCE}, with the same concurrency
+	 * caveat as {@code userClassLoader}.
+	 */
+	private long checkpointID;
+
+	/** Not instantiable from outside; use {@link #INSTANCE}. */
+	private SavepointV0Serializer() {
+	}
+
+
+ @Override
+ public void serialize(SavepointV1 savepoint, DataOutputStream dos)
throws IOException {
+ throw new UnsupportedOperationException("This serializer is
read-only and only exists for backwards compatibility");
+ }
+
+	/**
+	 * Reads a legacy (version 0) savepoint from the given stream and converts it into a
+	 * {@link SavepointV1}.
+	 *
+	 * <p>Layout read by this method: the checkpoint id (long) and the number of task states (int),
+	 * then per task state its job vertex id (two longs), its parallelism (int), the subtask states
+	 * and the key-group states. Both state lists are a count (int) followed by entries of the form
+	 * index (int), length-prefixed serialized state handle (a length of -1 encodes a null value),
+	 * state size (long) and duration (long).
+	 *
+	 * <p>NOTE(review): the checkpoint id and class loader are stored in instance fields of the
+	 * shared {@link #INSTANCE} for use by the conversion helpers; concurrent calls would overwrite
+	 * each other's values.
+	 *
+	 * @param dis stream to read the savepoint from
+	 * @param userClassLoader class loader later used to deserialize the legacy state handles
+	 * @return the converted savepoint
+	 * @throws IOException if reading fails, the stream contains an invalid count or length, or the
+	 *                     conversion of the legacy state fails
+	 */
+	@Override
+	public SavepointV1 deserialize(DataInputStream dis, ClassLoader userClassLoader) throws IOException {
+
+		this.checkpointID = dis.readLong();
+		this.userClassLoader = userClassLoader;
+
+		// Task states
+		int numTaskStates = dis.readInt();
+		if (numTaskStates < 0) {
+			// Would otherwise escape as an unchecked IllegalArgumentException from ArrayList.
+			throw new IOException("Corrupt savepoint: negative number of task states " + numTaskStates);
+		}
+		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int i = 0; i < numTaskStates; i++) {
+			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
+			int parallelism = dis.readInt();
+
+			// Add task state
+			TaskState taskState = new TaskState(jobVertexId, parallelism);
+			taskStates.add(taskState);
+
+			// Sub task states
+			int numSubTaskStates = dis.readInt();
+			for (int j = 0; j < numSubTaskStates; j++) {
+				int subtaskIndex = dis.readInt();
+				SerializedValue<StateHandle<?>> serializedValue = readSerializedStateHandle(dis);
+				long stateSize = dis.readLong();
+				long duration = dis.readLong();
+
+				taskState.putState(subtaskIndex, new SubtaskState(serializedValue, stateSize, duration));
+			}
+
+			// Key group states
+			int numKvStates = dis.readInt();
+			for (int j = 0; j < numKvStates; j++) {
+				int keyGroupIndex = dis.readInt();
+				SerializedValue<StateHandle<?>> serializedValue = readSerializedStateHandle(dis);
+				long stateSize = dis.readLong();
+				long duration = dis.readLong();
+
+				taskState.putKvState(keyGroupIndex, new KeyGroupState(serializedValue, stateSize, duration));
+			}
+		}
+
+		try {
+			return convertSavepoint(taskStates);
+		} catch (Exception e) {
+			throw new IOException("Failed to convert legacy savepoint of checkpoint " + checkpointID, e);
+		}
+	}
+
+	/**
+	 * Reads one length-prefixed serialized state handle; a length of -1 encodes a null value.
+	 *
+	 * @throws IOException if the length is negative (other than -1) or the stream ends early
+	 */
+	private static SerializedValue<StateHandle<?>> readSerializedStateHandle(DataInputStream dis) throws IOException {
+		int length = dis.readInt();
+
+		SerializedValue<StateHandle<?>> serializedValue;
+		if (length == -1) {
+			serializedValue = new SerializedValue<>(null);
+		} else if (length < 0) {
+			// Would otherwise escape as an unchecked NegativeArraySizeException.
+			throw new IOException("Corrupt savepoint: invalid serialized state length " + length);
+		} else {
+			byte[] serializedData = new byte[length];
+			dis.readFully(serializedData, 0, length);
+			serializedValue = SerializedValue.fromBytes(serializedData);
+		}
+		return serializedValue;
+	}
+
+ private SavepointV1 convertSavepoint(List<TaskState> taskStates) throws
Exception {
+
+ List<org.apache.flink.runtime.checkpoint.TaskState>
newTaskStates = new ArrayList<>(taskStates.size());
+
+ for (TaskState taskState : taskStates) {
+ newTaskStates.add(convertTaskState(taskState));
+ }
+
+ return new SavepointV1(checkpointID, newTaskStates);
+ }
+
+	/**
+	 * Converts a legacy {@link TaskState} into a new runtime task state for the same job vertex.
+	 * The legacy parallelism is passed for both parallelism arguments of the new task state, and
+	 * subtask states are only converted when the operator chain length is positive.
+	 *
+	 * <p>NOTE(review): subtask states are numbered by their position in {@code getStates()}, not by
+	 * the subtask index they were stored under during deserialization. This assumes the iteration
+	 * order is ascending by subtask index with no gaps — confirm.
+	 */
+	private org.apache.flink.runtime.checkpoint.TaskState convertTaskState(TaskState taskState) throws Exception {
+		JobVertexID vertexId = taskState.getJobVertexID();
+		int taskParallelism = taskState.getParallelism();
+		int operatorChainLength = determineOperatorChainLength(taskState);
+
+		org.apache.flink.runtime.checkpoint.TaskState converted = new org.apache.flink.runtime.checkpoint.TaskState(
+				vertexId,
+				taskParallelism,
+				taskParallelism,
+				operatorChainLength);
+
+		// Subtask states are only carried over for a positive chain length.
+		if (operatorChainLength <= 0) {
+			return converted;
+		}
+
+		Collection<SubtaskState> legacyStates = taskState.getStates();
+		int subtaskIdx = 0;
+		for (SubtaskState legacyState : legacyStates) {
+			converted.putState(subtaskIdx, convertSubtaskState(legacyState, subtaskIdx));
+			subtaskIdx++;
+		}
+		return converted;
+	}
+
+ private org.apache.flink.runtime.checkpoint.SubtaskState
convertSubtaskState(
+ SubtaskState subtaskState, int parallelInstanceIdx)
throws Exception {
+
+ SerializedValue<StateHandle<?>> serializedValue =
subtaskState.getState();
+
+ StreamTaskStateList stateList = (StreamTaskStateList)
serializedValue.deserializeValue(userClassLoader);
+ StreamTaskState[] streamTaskStates =
stateList.getState(userClassLoader);
+
+ List<StreamStateHandle> newChainStateList = Arrays.asList(new
StreamStateHandle[streamTaskStates.length]);
+ KeyGroupsStateHandle newKeyedState = null;
+
+ for (int chainIdx = 0; chainIdx < streamTaskStates.length;
++chainIdx) {
+
+ StreamTaskState streamTaskState =
streamTaskStates[chainIdx];
+ if (streamTaskState == null) {
+ continue;
+ }
+
+ newChainStateList.set(chainIdx,
convertOperatorAndFunctionState(streamTaskState));
+
+ if (null == newKeyedState) {
+ newKeyedState =
convertKeyedBackendState(streamTaskState, parallelInstanceIdx);
+ }
+
+ }
+
+ ChainedStateHandle<StreamStateHandle> newChainedState = new
ChainedStateHandle<>(newChainStateList);
+ ChainedStateHandle<OperatorStateHandle> nopChain =
+ new ChainedStateHandle<>(Arrays.asList(new
OperatorStateHandle[newChainedState.getLength()]));
+
+ return new org.apache.flink.runtime.checkpoint.SubtaskState(
+ newChainedState,
--- End diff --
A suggestion for the tests: add an assertion that converted SubtaskState
objects only ever have state set for the "legacy operator state" and the
"managed keyed state".
> Implement savepoint backwards compatibility 1.1 -> 1.2
> ------------------------------------------------------
>
> Key: FLINK-5041
> URL: https://issues.apache.org/jira/browse/FLINK-5041
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.2.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> This issue tracks the implementation of backwards compatibility between Flink
> 1.1 and 1.2 releases.
> This task subsumes:
> - Converting old savepoints to new savepoints, including a conversion of
> state handles to their new replacement.
> - Converting keyed state from old backend implementations to their new
> counterparts.
> - Converting operator and function state for all changed operators.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)