http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java index d571dcc..91d7aab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java @@ -18,15 +18,10 @@ package org.apache.flink.runtime.state; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; -import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; -import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.core.io.VersionMismatchException; import org.apache.flink.core.io.VersionedIOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import java.io.IOException; @@ -34,23 +29,29 @@ import java.util.ArrayList; import java.util.List; /** - * Serialization proxy for all meta data in operator state backends. In the future we might also migrate the actual state + * Serialization proxy for all meta data in operator state backends. In the future we might also migrate the actual state * serialization logic here. 
*/ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritable { - private static final int VERSION = 1; + public static final int VERSION = 2; - private List<StateMetaInfo<?>> namedStateSerializationProxies; + private List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> stateMetaInfoSnapshots; private ClassLoader userCodeClassLoader; + private int restoredVersion; + public OperatorBackendSerializationProxy(ClassLoader userCodeClassLoader) { this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader); } - public OperatorBackendSerializationProxy(List<StateMetaInfo<?>> namedStateSerializationProxies) { - this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies); - Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE); + public OperatorBackendSerializationProxy( + List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> stateMetaInfoSnapshots) { + + this.stateMetaInfoSnapshots = Preconditions.checkNotNull(stateMetaInfoSnapshots); + Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE); + + this.restoredVersion = VERSION; } @Override @@ -59,129 +60,44 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab } @Override - public void write(DataOutputView out) throws IOException { - super.write(out); - - out.writeShort(namedStateSerializationProxies.size()); - - for (StateMetaInfo<?> kvState : namedStateSerializationProxies) { - kvState.write(out); - } + protected void resolveVersionRead(int foundVersion) throws VersionMismatchException { + super.resolveVersionRead(foundVersion); + this.restoredVersion = foundVersion; } @Override - public void read(DataInputView out) throws IOException { - super.read(out); - - int numKvStates = out.readShort(); - namedStateSerializationProxies = new ArrayList<>(numKvStates); - for (int i = 0; i < numKvStates; ++i) { - StateMetaInfo<?> stateSerializationProxy = new 
StateMetaInfo<>(userCodeClassLoader); - stateSerializationProxy.read(out); - namedStateSerializationProxies.add(stateSerializationProxy); - } + public boolean isCompatibleVersion(int version) { + // we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x) + return super.isCompatibleVersion(version) || version == 1; } - public List<StateMetaInfo<?>> getNamedStateSerializationProxies() { - return namedStateSerializationProxies; - } - - //---------------------------------------------------------------------------------------------------------------------- - - public static class StateMetaInfo<S> implements IOReadableWritable { - - private String name; - private TypeSerializer<S> stateSerializer; - private OperatorStateHandle.Mode mode; - - private ClassLoader userClassLoader; - - @VisibleForTesting - public StateMetaInfo(ClassLoader userClassLoader) { - this.userClassLoader = Preconditions.checkNotNull(userClassLoader); - } - - public StateMetaInfo(String name, TypeSerializer<S> stateSerializer, OperatorStateHandle.Mode mode) { - this.name = Preconditions.checkNotNull(name); - this.stateSerializer = Preconditions.checkNotNull(stateSerializer); - this.mode = Preconditions.checkNotNull(mode); - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public TypeSerializer<S> getStateSerializer() { - return stateSerializer; - } - - public void setStateSerializer(TypeSerializer<S> stateSerializer) { - this.stateSerializer = stateSerializer; - } - - public OperatorStateHandle.Mode getMode() { - return mode; - } - - public void setMode(OperatorStateHandle.Mode mode) { - this.mode = mode; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeUTF(getName()); - out.writeByte(getMode().ordinal()); - DataOutputViewStream dos = new DataOutputViewStream(out); - InstantiationUtil.serializeObject(dos, getStateSerializer()); - } + @Override + public void 
write(DataOutputView out) throws IOException { + super.write(out); - @Override - public void read(DataInputView in) throws IOException { - setName(in.readUTF()); - setMode(OperatorStateHandle.Mode.values()[in.readByte()]); - DataInputViewStream dis = new DataInputViewStream(in); - try { - TypeSerializer<S> stateSerializer = InstantiationUtil.deserializeObject(dis, userClassLoader); - setStateSerializer(stateSerializer); - } catch (ClassNotFoundException exception) { - throw new IOException(exception); - } + out.writeShort(stateMetaInfoSnapshots.size()); + for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> kvState : stateMetaInfoSnapshots) { + OperatorBackendStateMetaInfoSnapshotReaderWriters + .getWriterForVersion(VERSION, kvState) + .writeStateMetaInfo(out); } + } - @Override - public boolean equals(Object o) { - - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - StateMetaInfo<?> metaInfo = (StateMetaInfo<?>) o; - - if (!getName().equals(metaInfo.getName())) { - return false; - } - - if (!getStateSerializer().equals(metaInfo.getStateSerializer())) { - return false; - } - - return getMode() == metaInfo.getMode(); + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + + int numKvStates = in.readShort(); + stateMetaInfoSnapshots = new ArrayList<>(numKvStates); + for (int i = 0; i < numKvStates; i++) { + stateMetaInfoSnapshots.add( + OperatorBackendStateMetaInfoSnapshotReaderWriters + .getReaderForVersion(restoredVersion, userCodeClassLoader) + .readStateMetaInfo(in)); } + } - @Override - public int hashCode() { - int result = getName().hashCode(); - result = 31 * result + getStateSerializer().hashCode(); - result = 31 * result + getMode().hashCode(); - return result; - } + public List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> getStateMetaInfoSnapshots() { + return stateMetaInfoSnapshots; } }
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java new file mode 100644 index 0000000..9ab106b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; +import org.apache.flink.api.common.typeutils.TypeSerializerUtil; +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * Readers and writers for different versions of the {@link RegisteredOperatorBackendStateMetaInfo.Snapshot}. + * Outdated formats are also kept here to document the history of the serialization format. 
+ */ +public class OperatorBackendStateMetaInfoSnapshotReaderWriters { + + // ------------------------------------------------------------------------------- + // Writers + // - v1: Flink 1.2.x + // - v2: Flink 1.3.x + // ------------------------------------------------------------------------------- + + public static <S> OperatorBackendStateMetaInfoWriter getWriterForVersion( + int version, RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) { + + switch (version) { + case 1: + return new OperatorBackendStateMetaInfoWriterV1<>(stateMetaInfo); + + // current version + case OperatorBackendSerializationProxy.VERSION: + return new OperatorBackendStateMetaInfoWriterV2<>(stateMetaInfo); + + default: + // guard for future + throw new IllegalStateException( + "Unrecognized operator backend state meta info writer version: " + version); + } + } + + public interface OperatorBackendStateMetaInfoWriter { + void writeStateMetaInfo(DataOutputView out) throws IOException; + } + + public static abstract class AbstractOperatorBackendStateMetaInfoWriter<S> + implements OperatorBackendStateMetaInfoWriter { + + protected final RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo; + + public AbstractOperatorBackendStateMetaInfoWriter(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) { + this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); + } + } + + public static class OperatorBackendStateMetaInfoWriterV1<S> extends AbstractOperatorBackendStateMetaInfoWriter<S> { + + public OperatorBackendStateMetaInfoWriterV1(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) { + super(stateMetaInfo); + } + + @Override + public void writeStateMetaInfo(DataOutputView out) throws IOException { + out.writeUTF(stateMetaInfo.getName()); + out.writeByte(stateMetaInfo.getAssignmentMode().ordinal()); + new TypeSerializerSerializationProxy<>(stateMetaInfo.getPartitionStateSerializer()).write(out); + } + } + + public static class 
OperatorBackendStateMetaInfoWriterV2<S> extends AbstractOperatorBackendStateMetaInfoWriter<S> { + + public OperatorBackendStateMetaInfoWriterV2(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) { + super(stateMetaInfo); + } + + @Override + public void writeStateMetaInfo(DataOutputView out) throws IOException { + out.writeUTF(stateMetaInfo.getName()); + out.writeByte(stateMetaInfo.getAssignmentMode().ordinal()); + + // write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures + try ( + ByteArrayOutputStreamWithPos outWithPos = new ByteArrayOutputStreamWithPos(); + DataOutputViewStreamWrapper outViewWrapper = new DataOutputViewStreamWrapper(outWithPos)) { + + new TypeSerializerSerializationProxy<>(stateMetaInfo.getPartitionStateSerializer()).write(outViewWrapper); + + // write the start offset of the config snapshot + out.writeInt(outWithPos.getPosition()); + TypeSerializerUtil.writeSerializerConfigSnapshot( + outViewWrapper, + stateMetaInfo.getPartitionStateSerializerConfigSnapshot()); + + // write the total number of bytes and flush + out.writeInt(outWithPos.getPosition()); + out.write(outWithPos.getBuf(), 0, outWithPos.getPosition()); + } + } + } + + // ------------------------------------------------------------------------------- + // Readers + // - v1: Flink 1.2.x + // - v2: Flink 1.3.x + // ------------------------------------------------------------------------------- + + public static <S> OperatorBackendStateMetaInfoReader<S> getReaderForVersion( + int version, ClassLoader userCodeClassLoader) { + + switch (version) { + case 1: + return new OperatorBackendStateMetaInfoReaderV1<>(userCodeClassLoader); + + // current version + case OperatorBackendSerializationProxy.VERSION: + return new OperatorBackendStateMetaInfoReaderV2<>(userCodeClassLoader); + + default: + // guard for future + throw new IllegalStateException( + "Unrecognized operator backend state meta info reader version: " + version); 
+ } + } + + public interface OperatorBackendStateMetaInfoReader<S> { + RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException; + } + + public static abstract class AbstractOperatorBackendStateMetaInfoReader<S> + implements OperatorBackendStateMetaInfoReader<S> { + + protected final ClassLoader userCodeClassLoader; + + public AbstractOperatorBackendStateMetaInfoReader(ClassLoader userCodeClassLoader) { + this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader); + } + } + + public static class OperatorBackendStateMetaInfoReaderV1<S> extends AbstractOperatorBackendStateMetaInfoReader<S> { + + public OperatorBackendStateMetaInfoReaderV1(ClassLoader userCodeClassLoader) { + super(userCodeClassLoader); + } + + @Override + public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException { + RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo = + new RegisteredOperatorBackendStateMetaInfo.Snapshot<>(); + + stateMetaInfo.setName(in.readUTF()); + stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]); + DataInputViewStream dis = new DataInputViewStream(in); + try { + TypeSerializer<S> stateSerializer = InstantiationUtil.deserializeObject(dis, userCodeClassLoader); + stateMetaInfo.setPartitionStateSerializer(stateSerializer); + } catch (ClassNotFoundException exception) { + throw new IOException(exception); + } + + // old versions do not contain the partition state serializer's configuration snapshot + stateMetaInfo.setPartitionStateSerializerConfigSnapshot(null); + + return stateMetaInfo; + } + } + + public static class OperatorBackendStateMetaInfoReaderV2<S> extends AbstractOperatorBackendStateMetaInfoReader<S> { + + public OperatorBackendStateMetaInfoReaderV2(ClassLoader userCodeClassLoader) { + super(userCodeClassLoader); + } + + @Override + public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
readStateMetaInfo(DataInputView in) throws IOException { + RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo = + new RegisteredOperatorBackendStateMetaInfo.Snapshot<>(); + + stateMetaInfo.setName(in.readUTF()); + stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]); + + // read start offset of configuration snapshot + int configSnapshotStartOffset = in.readInt(); + + int totalBytes = in.readInt(); + + byte[] buffer = new byte[totalBytes]; + in.readFully(buffer); + + ByteArrayInputStreamWithPos inWithPos = new ByteArrayInputStreamWithPos(buffer); + DataInputViewStreamWrapper inViewWrapper = new DataInputViewStreamWrapper(inWithPos); + + try { + final TypeSerializerSerializationProxy<S> partitionStateSerializerProxy = + new TypeSerializerSerializationProxy<>(userCodeClassLoader); + partitionStateSerializerProxy.read(inViewWrapper); + stateMetaInfo.setPartitionStateSerializer(partitionStateSerializerProxy.getTypeSerializer()); + } catch (IOException e) { + stateMetaInfo.setPartitionStateSerializer(null); + } + + // make sure we start from the serializer config snapshot's bytes position, even if reading the serializer above failed + inWithPos.setPosition(configSnapshotStartOffset); + stateMetaInfo.setPartitionStateSerializerConfigSnapshot( + TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, userCodeClassLoader)); + + return stateMetaInfo; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java deleted file mode 100644 index 0d4b3c8..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.migration.MigrationNamespaceSerializerProxy; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the - * state name. 
- * - * @param <N> Type of namespace - * @param <S> Type of state value - */ -public class RegisteredBackendStateMetaInfo<N, S> { - - private final StateDescriptor.Type stateType; - private final String name; - private final TypeSerializer<N> namespaceSerializer; - private final TypeSerializer<S> stateSerializer; - - public RegisteredBackendStateMetaInfo(KeyedBackendSerializationProxy.StateMetaInfo<N, S> metaInfoProxy) { - this( - metaInfoProxy.getStateType(), - metaInfoProxy.getStateName(), - metaInfoProxy.getNamespaceSerializerSerializationProxy().getTypeSerializer(), - metaInfoProxy.getStateSerializerSerializationProxy().getTypeSerializer()); - } - - public RegisteredBackendStateMetaInfo( - StateDescriptor.Type stateType, - String name, - TypeSerializer<N> namespaceSerializer, - TypeSerializer<S> stateSerializer) { - - this.stateType = checkNotNull(stateType); - this.name = checkNotNull(name); - this.namespaceSerializer = checkNotNull(namespaceSerializer); - this.stateSerializer = checkNotNull(stateSerializer); - } - - public StateDescriptor.Type getStateType() { - return stateType; - } - - public String getName() { - return name; - } - - public TypeSerializer<N> getNamespaceSerializer() { - return namespaceSerializer; - } - - public TypeSerializer<S> getStateSerializer() { - return stateSerializer; - } - - public boolean canRestoreFrom(RegisteredBackendStateMetaInfo<?, ?> other) { - - if (this == other) { - return true; - } - - if (null == other) { - return false; - } - - if (!stateType.equals(StateDescriptor.Type.UNKNOWN) - && !other.stateType.equals(StateDescriptor.Type.UNKNOWN) - && !stateType.equals(other.stateType)) { - return false; - } - - if (!name.equals(other.getName())) { - return false; - } - - return (stateSerializer.canRestoreFrom(other.stateSerializer)) && - (namespaceSerializer.canRestoreFrom(other.namespaceSerializer) - // we also check if there is just a migration proxy that should be replaced by any real serializer - || 
other.namespaceSerializer instanceof MigrationNamespaceSerializerProxy); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - RegisteredBackendStateMetaInfo<?, ?> that = (RegisteredBackendStateMetaInfo<?, ?>) o; - - if (!stateType.equals(that.stateType)) { - return false; - } - - if (!getName().equals(that.getName())) { - return false; - } - - return getStateSerializer().equals(that.getStateSerializer()) - && getNamespaceSerializer().equals(that.getNamespaceSerializer()); - } - - @Override - public String toString() { - return "RegisteredBackendStateMetaInfo{" + - "stateType=" + stateType + - ", name='" + name + '\'' + - ", namespaceSerializer=" + namespaceSerializer + - ", stateSerializer=" + stateSerializer + - '}'; - } - - @Override - public int hashCode() { - int result = getName().hashCode(); - result = 31 * result + getStateType().hashCode(); - result = 31 * result + (getNamespaceSerializer() != null ? getNamespaceSerializer().hashCode() : 0); - result = 31 * result + (getStateSerializer() != null ? getStateSerializer().hashCode() : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java new file mode 100644 index 0000000..e1a7e06 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the + * state name. 
+ * + * @param <N> Type of namespace + * @param <S> Type of state value + */ +public class RegisteredKeyedBackendStateMetaInfo<N, S> { + + private final StateDescriptor.Type stateType; + private final String name; + private final TypeSerializer<N> namespaceSerializer; + private final TypeSerializer<S> stateSerializer; + + public RegisteredKeyedBackendStateMetaInfo( + StateDescriptor.Type stateType, + String name, + TypeSerializer<N> namespaceSerializer, + TypeSerializer<S> stateSerializer) { + + this.stateType = checkNotNull(stateType); + this.name = checkNotNull(name); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.stateSerializer = checkNotNull(stateSerializer); + } + + public StateDescriptor.Type getStateType() { + return stateType; + } + + public String getName() { + return name; + } + + public TypeSerializer<N> getNamespaceSerializer() { + return namespaceSerializer; + } + + public TypeSerializer<S> getStateSerializer() { + return stateSerializer; + } + + public Snapshot<N, S> snapshot() { + return new Snapshot<>( + stateType, + name, + namespaceSerializer.duplicate(), + stateSerializer.duplicate(), + namespaceSerializer.snapshotConfiguration(), + stateSerializer.snapshotConfiguration()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + RegisteredKeyedBackendStateMetaInfo<?, ?> that = (RegisteredKeyedBackendStateMetaInfo<?, ?>) o; + + if (!stateType.equals(that.stateType)) { + return false; + } + + if (!getName().equals(that.getName())) { + return false; + } + + return getStateSerializer().equals(that.getStateSerializer()) + && getNamespaceSerializer().equals(that.getNamespaceSerializer()); + } + + @Override + public String toString() { + return "RegisteredKeyedBackendStateMetaInfo{" + + "stateType=" + stateType + + ", name='" + name + '\'' + + ", namespaceSerializer=" + namespaceSerializer + + ", stateSerializer=" + 
stateSerializer + + '}'; + } + + @Override + public int hashCode() { + int result = getName().hashCode(); + result = 31 * result + getStateType().hashCode(); + result = 31 * result + getNamespaceSerializer().hashCode(); + result = 31 * result + getStateSerializer().hashCode(); + return result; + } + + /** + * A consistent snapshot of a {@link RegisteredKeyedBackendStateMetaInfo}. + */ + public static class Snapshot<N, S> { + + private StateDescriptor.Type stateType; + private String name; + private TypeSerializer<N> namespaceSerializer; + private TypeSerializer<S> stateSerializer; + private TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot; + private TypeSerializerConfigSnapshot stateSerializerConfigSnapshot; + + /** Empty constructor used when restoring the state meta info snapshot. */ + Snapshot() {} + + private Snapshot( + StateDescriptor.Type stateType, + String name, + TypeSerializer<N> namespaceSerializer, + TypeSerializer<S> stateSerializer, + TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot, + TypeSerializerConfigSnapshot stateSerializerConfigSnapshot) { + + this.stateType = Preconditions.checkNotNull(stateType); + this.name = Preconditions.checkNotNull(name); + this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer); + this.stateSerializer = Preconditions.checkNotNull(stateSerializer); + this.namespaceSerializerConfigSnapshot = Preconditions.checkNotNull(namespaceSerializerConfigSnapshot); + this.stateSerializerConfigSnapshot = Preconditions.checkNotNull(stateSerializerConfigSnapshot); + } + + public StateDescriptor.Type getStateType() { + return stateType; + } + + void setStateType(StateDescriptor.Type stateType) { + this.stateType = stateType; + } + + public String getName() { + return name; + } + + void setName(String name) { + this.name = name; + } + + public TypeSerializer<N> getNamespaceSerializer() { + return namespaceSerializer; + } + + void setNamespaceSerializer(TypeSerializer<N> namespaceSerializer) 
{ + this.namespaceSerializer = namespaceSerializer; + } + + public TypeSerializer<S> getStateSerializer() { + return stateSerializer; + } + + void setStateSerializer(TypeSerializer<S> stateSerializer) { + this.stateSerializer = stateSerializer; + } + + public TypeSerializerConfigSnapshot getNamespaceSerializerConfigSnapshot() { + return namespaceSerializerConfigSnapshot; + } + + void setNamespaceSerializerConfigSnapshot(TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot) { + this.namespaceSerializerConfigSnapshot = namespaceSerializerConfigSnapshot; + } + + public TypeSerializerConfigSnapshot getStateSerializerConfigSnapshot() { + return stateSerializerConfigSnapshot; + } + + void setStateSerializerConfigSnapshot(TypeSerializerConfigSnapshot stateSerializerConfigSnapshot) { + this.stateSerializerConfigSnapshot = stateSerializerConfigSnapshot; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + Snapshot<?, ?> that = (Snapshot<?, ?>) o; + + if (!stateType.equals(that.stateType)) { + return false; + } + + if (!getName().equals(that.getName())) { + return false; + } + + // need to check for nulls because serializer and config snapshots may be null on restore + return + ((getStateSerializer() == null && that.getStateSerializer() == null) + || getStateSerializer().equals(that.getStateSerializer())) + && ((getNamespaceSerializer() == null && that.getNamespaceSerializer() == null) + || getNamespaceSerializer().equals(that.getNamespaceSerializer())) + && ((getNamespaceSerializerConfigSnapshot() == null && that.getNamespaceSerializerConfigSnapshot() == null) + || getNamespaceSerializerConfigSnapshot().equals(that.getNamespaceSerializerConfigSnapshot())) + && ((getStateSerializerConfigSnapshot() == null && that.getStateSerializerConfigSnapshot() == null) + || getStateSerializerConfigSnapshot().equals(that.getStateSerializerConfigSnapshot())); + } + + 
@Override + public int hashCode() { + // need to check for nulls because serializer and config snapshots may be null on restore + int result = getName().hashCode(); + result = 31 * result + getStateType().hashCode(); + result = 31 * result + (getNamespaceSerializer() != null ? getNamespaceSerializer().hashCode() : 0); + result = 31 * result + (getStateSerializer() != null ? getStateSerializer().hashCode() : 0); + result = 31 * result + (getNamespaceSerializerConfigSnapshot() != null ? getNamespaceSerializerConfigSnapshot().hashCode() : 0); + result = 31 * result + (getStateSerializerConfigSnapshot() != null ? getStateSerializerConfigSnapshot().hashCode() : 0); + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java new file mode 100644 index 0000000..b43fc9c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.util.Preconditions; + +/** + * Compound meta information for a registered state in an operator state backend. + * This contains the state name, assignment mode, and state partition serializer. + * + * @param <S> Type of the state. + */ +public class RegisteredOperatorBackendStateMetaInfo<S> { + + /** + * The name of the state, as registered by the user + */ + private final String name; + + /** + * The mode how elements in this state are assigned to tasks during restore + */ + private final OperatorStateHandle.Mode assignmentMode; + + /** + * The type serializer for the elements in the state list + */ + private final TypeSerializer<S> partitionStateSerializer; + + public RegisteredOperatorBackendStateMetaInfo( + String name, + TypeSerializer<S> partitionStateSerializer, + OperatorStateHandle.Mode assignmentMode) { + + this.name = Preconditions.checkNotNull(name); + this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer); + this.assignmentMode = Preconditions.checkNotNull(assignmentMode); + } + + public String getName() { + return name; + } + + public OperatorStateHandle.Mode getAssignmentMode() { + return assignmentMode; + } + + public TypeSerializer<S> getPartitionStateSerializer() { + return partitionStateSerializer; + } + + public Snapshot<S> snapshot() { + return new Snapshot<>( + name, + assignmentMode, + 
partitionStateSerializer.duplicate(), + partitionStateSerializer.snapshotConfiguration()); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null) { + return false; + } + + return (obj instanceof RegisteredOperatorBackendStateMetaInfo) + && name.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getName()) + && assignmentMode.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getAssignmentMode()) + && partitionStateSerializer.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getPartitionStateSerializer()); + } + + @Override + public int hashCode() { + int result = getName().hashCode(); + result = 31 * result + getAssignmentMode().hashCode(); + result = 31 * result + getPartitionStateSerializer().hashCode(); + return result; + } + + @Override + public String toString() { + return "RegisteredOperatorBackendStateMetaInfo{" + + "name='" + name + "\'" + + ", assignmentMode=" + assignmentMode + + ", partitionStateSerializer=" + partitionStateSerializer + + '}'; + } + + /** + * A consistent snapshot of a {@link RegisteredOperatorBackendStateMetaInfo}. + */ + public static class Snapshot<S> { + + private String name; + private OperatorStateHandle.Mode assignmentMode; + private TypeSerializer<S> partitionStateSerializer; + private TypeSerializerConfigSnapshot partitionStateSerializerConfigSnapshot; + + /** Empty constructor used when restoring the state meta info snapshot. 
*/ + Snapshot() {} + + private Snapshot( + String name, + OperatorStateHandle.Mode assignmentMode, + TypeSerializer<S> partitionStateSerializer, + TypeSerializerConfigSnapshot partitionStateSerializerConfigSnapshot) { + + this.name = Preconditions.checkNotNull(name); + this.assignmentMode = Preconditions.checkNotNull(assignmentMode); + this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer); + this.partitionStateSerializerConfigSnapshot = Preconditions.checkNotNull(partitionStateSerializerConfigSnapshot); + } + + public String getName() { + return name; + } + + void setName(String name) { + this.name = name; + } + + public OperatorStateHandle.Mode getAssignmentMode() { + return assignmentMode; + } + + void setAssignmentMode(OperatorStateHandle.Mode assignmentMode) { + this.assignmentMode = assignmentMode; + } + + public TypeSerializer<S> getPartitionStateSerializer() { + return partitionStateSerializer; + } + + void setPartitionStateSerializer(TypeSerializer<S> partitionStateSerializer) { + this.partitionStateSerializer = partitionStateSerializer; + } + + public TypeSerializerConfigSnapshot getPartitionStateSerializerConfigSnapshot() { + return partitionStateSerializerConfigSnapshot; + } + + void setPartitionStateSerializerConfigSnapshot(TypeSerializerConfigSnapshot partitionStateSerializerConfigSnapshot) { + this.partitionStateSerializerConfigSnapshot = partitionStateSerializerConfigSnapshot; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null) { + return false; + } + + // need to check for nulls because serializer and config snapshots may be null on restore + return (obj instanceof Snapshot) + && name.equals(((Snapshot) obj).getName()) + && assignmentMode.equals(((Snapshot) obj).getAssignmentMode()) + && ((partitionStateSerializer == null && ((Snapshot) obj).getPartitionStateSerializer() == null) + || partitionStateSerializer.equals(((Snapshot) 
obj).getPartitionStateSerializer())) + && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) obj).getPartitionStateSerializerConfigSnapshot() == null) + || partitionStateSerializerConfigSnapshot.equals(((Snapshot) obj).getPartitionStateSerializerConfigSnapshot())); + } + + @Override + public int hashCode() { + // need to check for nulls because serializer and config snapshots may be null on restore + int result = getName().hashCode(); + result = 31 * result + getAssignmentMode().hashCode(); + result = 31 * result + (getPartitionStateSerializer() != null ? getPartitionStateSerializer().hashCode() : 0); + result = 31 * result + (getPartitionStateSerializerConfigSnapshot() != null ? getPartitionStateSerializerConfigSnapshot().hashCode() : 0); + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java new file mode 100644 index 0000000..978f28d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; + +/** + * Utilities related to state migration, commonly used in the state backends. + */ +public class StateMigrationUtil { + + /** + * Resolves the final compatibility result of two serializers by taking into account compound information, + * including the preceding serializer, the preceding serializer's configuration snapshot, and the new serializer. + * + * The final result is determined as follows: + * 1. If there is no configuration snapshot of the preceding serializer, + * assumes the new serializer to be compatible. + * 2. Confront the configuration snapshot with the new serializer. + * 3. If the result is compatible, just return that as the result. + * 4. If not compatible and requires migration, check if the preceding serializer is valid. + * If yes, use that as the convert deserializer for state migration. + * 5. If the preceding serializer is not valid, check if the result came with a convert deserializer. + * If yes, use that for state migration and simply return the result. + * 6. If all of above fails, state migration is required but could not be performed; throw exception. 
+ * + * @param precedingSerializer the preceding serializer used to write the data + * @param dummySerializerClassTag any class tag that identifies the preceding serializer as a dummy placeholder + * @param precedingSerializerConfigSnapshot configuration snapshot of the preceding serializer + * @param newSerializer the new serializer to ensure compatibility with + * + * @param <T> Type of the data handled by the serializers + * + * @return the final resolved compatibility result + */ + public static <T> CompatibilityResult<T> resolveCompatibilityResult( + TypeSerializer<T> precedingSerializer, + Class<?> dummySerializerClassTag, + TypeSerializerConfigSnapshot precedingSerializerConfigSnapshot, + TypeSerializer<T> newSerializer) { + + if (precedingSerializerConfigSnapshot != null) { + CompatibilityResult<T> initialResult = newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot); + + if (!initialResult.requiresMigration()) { + return initialResult; + } else { + if (precedingSerializer != null && !(precedingSerializer.getClass().equals(dummySerializerClassTag))) { + // if the preceding serializer exists and is not a dummy, use + // that for converting instead of the provided convert deserializer + return CompatibilityResult.requiresMigration(precedingSerializer); + } else if (initialResult.getConvertDeserializer() != null) { + return initialResult; + } else { + throw new RuntimeException( + "State migration required, but there is no available serializer capable of reading previous data."); + } + } + } else { + // if the configuration snapshot of the preceding serializer cannot be provided, + // we can only simply assume that the new serializer is compatible + return CompatibilityResult.compatible(); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java index 8b58891..2800899 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.migration.MigrationNamespaceSerializerProxy; import java.io.IOException; @@ -89,4 +90,11 @@ public final class VoidNamespaceSerializer extends TypeSerializerSingleton<VoidN public boolean canEqual(Object obj) { return obj instanceof VoidNamespaceSerializer; } + + @Override + protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { + // we might be replacing a migration namespace serializer, in which case we just assume compatibility + return super.isCompatibleSerializationFormatIdentifier(identifier) + || identifier.equals(MigrationNamespaceSerializerProxy.class.getCanonicalName()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java index d63b6d3..7b61da1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.state.heap; import 
org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.StateTransformationFunction; import org.apache.flink.util.MathUtils; import org.apache.flink.util.Preconditions; @@ -196,7 +196,7 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen * @param keyContext the key context. * @param metaInfo the meta information, including the type serializer for state copy-on-write. */ - CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) { + CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) { this(keyContext, metaInfo, 1024); } @@ -209,7 +209,7 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen * @throws IllegalArgumentException when the capacity is less than zero. */ @SuppressWarnings("unchecked") - private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo, int capacity) { + private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo, int capacity) { super(keyContext, metaInfo); // initialized tables to EMPTY_TABLE. 
@@ -532,12 +532,12 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen } @Override - public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() { + public RegisteredKeyedBackendStateMetaInfo<N, S> getMetaInfo() { return metaInfo; } @Override - public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) { + public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) { this.metaInfo = metaInfo; } @@ -1063,4 +1063,4 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen throw new UnsupportedOperationException("Read-only iterator"); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index aecc72e..866ed28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -52,7 +52,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.apache.flink.runtime.state.internal.InternalFoldingState; @@ -132,8 +132,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> { TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) { - final RegisteredBackendStateMetaInfo<N, V> newMetaInfo = - new RegisteredBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer); + final RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer); @SuppressWarnings("unchecked") StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName); @@ -142,12 +142,27 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { stateTable = newStateTable(newMetaInfo); stateTables.put(stateName, stateTable); } else { - if (!newMetaInfo.canRestoreFrom(stateTable.getMetaInfo())) { - throw new RuntimeException("Trying to access state using incompatible meta info, was " + - stateTable.getMetaInfo() + " trying access with " + newMetaInfo); + // TODO with eager registration in place, these checks should be moved to restorePartitionedState() + + Preconditions.checkState( + stateName.equals(stateTable.getMetaInfo().getName()), + "Incompatible state names. " + + "Was [" + stateTable.getMetaInfo().getName() + "], " + + "registered with [" + newMetaInfo.getName() + "]."); + + if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN) + && !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()), + "Incompatible state types. " + + "Was [" + stateTable.getMetaInfo().getStateType() + "], " + + "registered with [" + newMetaInfo.getStateType() + "]."); } + stateTable.setMetaInfo(newMetaInfo); } + return stateTable; } @@ -240,21 +255,14 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { "Too many KV-States: " + stateTables.size() + ". 
Currently at most " + Short.MAX_VALUE + " states are supported"); - List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size()); + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots = new ArrayList<>(stateTables.size()); final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size()); final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots = new HashedMap(stateTables.size()); for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) { - RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo(); - KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo( - metaInfo.getStateType(), - metaInfo.getName(), - metaInfo.getNamespaceSerializer(), - metaInfo.getStateSerializer()); - - metaInfoProxyList.add(metaInfoProxy); + metaInfoSnapshots.add(kvState.getValue().getMetaInfo().snapshot()); kVStateToId.put(kvState.getKey(), kVStateToId.size()); StateTable<K, ?, ?> stateTable = kvState.getValue(); if (null != stateTable) { @@ -263,7 +271,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } final KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList); + new KeyedBackendSerializationProxy(keySerializer, metaInfoSnapshots); //--------------------------------------------------- this becomes the end of sync part @@ -376,23 +384,29 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { serializationProxy.read(inView); - List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList = - serializationProxy.getNamedStateSerializationProxies(); + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = + serializationProxy.getStateMetaInfoSnapshots(); - for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> 
metaInfoSerializationProxy : metaInfoList) { + for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) { - StateTable<K, ?, ?> stateTable = stateTables.get(metaInfoSerializationProxy.getStateName()); + StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName()); //important: only create a new table we did not already create it previously if (null == stateTable) { - RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo = - new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy); + RegisteredKeyedBackendStateMetaInfo<?, ?> registeredKeyedBackendStateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + restoredMetaInfo.getStateType(), + restoredMetaInfo.getName(), + restoredMetaInfo.getNamespaceSerializer(), + restoredMetaInfo.getStateSerializer()); - stateTable = newStateTable(registeredBackendStateMetaInfo); - stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable); - kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName()); + stateTable = newStateTable(registeredKeyedBackendStateMetaInfo); + stateTables.put(restoredMetaInfo.getName(), stateTable); + kvStatesById.put(numRegisteredKvStates, restoredMetaInfo.getName()); ++numRegisteredKvStates; + } else { + // TODO with eager state registration in place, check here for serializer migration strategies } } @@ -410,7 +424,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex, "Unexpected key-group in restore."); - for (int i = 0; i < metaInfoList.size(); i++) { + for (int i = 0; i < restoredMetaInfos.size(); i++) { int kvStateId = inView.readShort(); StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId)); @@ -509,7 +523,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return sum; } - public <N, V> StateTable<K, N, V> 
newStateTable(RegisteredBackendStateMetaInfo<N, V> newMetaInfo) { + public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) { return asynchronousSnapshots ? new CopyOnWriteStateTable<>(this, newMetaInfo) : new NestedMapsStateTable<>(this, newMetaInfo); http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java index 22f344d..75c31db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java @@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.StateTransformationFunction; import org.apache.flink.util.Preconditions; @@ -63,7 +63,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> { * @param keyContext the key context. * @param metaInfo the meta information for this state table. 
*/ - public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) { + public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) { super(keyContext, metaInfo); this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup(); http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java index 62fc869..c1cdcc3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.StateTransformationFunction; import org.apache.flink.util.Preconditions; @@ -42,14 +42,14 @@ public abstract class StateTable<K, N, S> { /** * Combined meta information such as name and serializers for this state */ - protected RegisteredBackendStateMetaInfo<N, S> metaInfo; + protected RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo; /** * * @param keyContext the key context provides the key scope for all put/get/delete operations. * @param metaInfo the meta information, including the type serializer for state copy-on-write. 
*/ - public StateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) { + public StateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) { this.keyContext = Preconditions.checkNotNull(keyContext); this.metaInfo = Preconditions.checkNotNull(metaInfo); } @@ -168,11 +168,11 @@ public abstract class StateTable<K, N, S> { return metaInfo.getNamespaceSerializer(); } - public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() { + public RegisteredKeyedBackendStateMetaInfo<N, S> getMetaInfo() { return metaInfo; } - public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) { + public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) { this.metaInfo = metaInfo; } @@ -186,4 +186,4 @@ public abstract class StateTable<K, N, S> { @VisibleForTesting public abstract int sizeOfNamespace(Object namespace); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java index 53ec349..d7bc94e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java @@ -47,7 +47,8 @@ class StateTableByKeyGroupReaders { case 1: return new StateTableByKeyGroupReaderV1<>(table); case 2: - return new StateTableByKeyGroupReaderV2<>(table); + case 3: + return new StateTableByKeyGroupReaderV2V3<>(table); default: throw new IllegalArgumentException("Unknown version: " + version); } @@ -110,10 +111,10 @@ class StateTableByKeyGroupReaders { } } - private 
static final class StateTableByKeyGroupReaderV2<K, N, S> + private static final class StateTableByKeyGroupReaderV2V3<K, N, S> extends AbstractStateTableByKeyGroupReader<K, N, S> { - StateTableByKeyGroupReaderV2(StateTable<K, N, S> stateTable) { + StateTableByKeyGroupReaderV2V3(StateTable<K, N, S> stateTable) { super(stateTable); } @@ -133,4 +134,4 @@ class StateTableByKeyGroupReaders { } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java index e872526..4bdc5e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java @@ -21,7 +21,9 @@ package org.apache.flink.runtime.operators.testutils.types; import java.io.IOException; import java.util.Arrays; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -127,4 +129,14 @@ public class IntListSerializer extends TypeSerializer<IntList> { public int hashCode() { return IntListSerializer.class.hashCode(); } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + throw new UnsupportedOperationException(); + } + + @Override + public CompatibilityResult<IntList> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + throw new 
UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java index e4a9264..0ae5e71 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java @@ -21,7 +21,9 @@ package org.apache.flink.runtime.operators.testutils.types; import java.io.IOException; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataInputView; @@ -136,4 +138,14 @@ public class IntPairSerializer extends TypeSerializer<IntPair> { return obj.getClass() == IntPairSerializerFactory.class; }; } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + throw new UnsupportedOperationException(); + } + + @Override + public CompatibilityResult<IntPair> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java index b62b097..17ee5f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java @@ -20,7 +20,9 @@ package org.apache.flink.runtime.operators.testutils.types; import java.io.IOException; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.StringValue; @@ -104,4 +106,14 @@ public class StringPairSerializer extends TypeSerializer<StringPair> { public int hashCode() { return StringPairSerializer.class.hashCode(); } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + throw new UnsupportedOperationException(); + } + + @Override + public CompatibilityResult<StringPair> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java index b4d6eb7..8c4e049 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.query.netty.UnknownKvStateID; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.heap.HeapValueState; @@ -270,7 +270,7 @@ public class QueryableStateClientTest { ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - RegisteredBackendStateMetaInfo<VoidNamespace, Integer> registeredBackendStateMetaInfo = new RegisteredBackendStateMetaInfo<>( + RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( descriptor.getType(), descriptor.getName(), VoidNamespaceSerializer.INSTANCE, @@ -279,7 +279,7 @@ public class QueryableStateClientTest { // Register state HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>( descriptor, - new NestedMapsStateTable<Integer, VoidNamespace, Integer>(keyedStateBackend, registeredBackendStateMetaInfo), + new NestedMapsStateTable<Integer, VoidNamespace, Integer>(keyedStateBackend, registeredKeyedBackendStateMetaInfo), IntSerializer.INSTANCE, VoidNamespaceSerializer.INSTANCE); http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index c04ed8c..50ca159 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -97,7 +97,7 @@ public class OperatorStateBackendTest { assertEquals(2, operatorStateBackend.getRegisteredStateNames().size()); // make sure that type registrations are forwarded - TypeSerializer<?> serializer = ((PartitionableListState<?>) listState).getPartitionStateSerializer(); + TypeSerializer<?> serializer = ((PartitionableListState<?>) listState).getStateMetaInfo().getPartitionStateSerializer(); assertTrue(serializer instanceof KryoSerializer); assertTrue(((KryoSerializer<?>) serializer).getKryo().getSerializer(registeredType) instanceof com.esotericsoftware.kryo.serializers.JavaSerializer); http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java index 0dbe2eb..02b4d62 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; import org.apache.flink.api.common.typeutils.base.DoubleSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import 
org.apache.flink.api.common.typeutils.base.LongSerializer; @@ -29,10 +30,21 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, OperatorBackendStateMetaInfoSnapshotReaderWriters.class}) public class SerializationProxiesTest { @Test @@ -42,14 +54,14 @@ public class SerializationProxiesTest { TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE; TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE; - List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoList = new ArrayList<>(); + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoList = new ArrayList<>(); - stateMetaInfoList.add( - new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer)); - stateMetaInfoList.add( - new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer)); - stateMetaInfoList.add( - new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer)); + stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>( + StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer).snapshot()); + stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>( + StateDescriptor.Type.VALUE, "b", namespaceSerializer, 
stateSerializer).snapshot()); + stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>( + StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot()); KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList); @@ -67,8 +79,8 @@ public class SerializationProxiesTest { serializationProxy.read(new DataInputViewStreamWrapper(in)); } - Assert.assertEquals(keySerializer, serializationProxy.getKeySerializerProxy().getTypeSerializer()); - Assert.assertEquals(stateMetaInfoList, serializationProxy.getNamedStateSerializationProxies()); + Assert.assertEquals(keySerializer, serializationProxy.getKeySerializer()); + Assert.assertEquals(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots()); } @Test @@ -78,41 +90,79 @@ public class SerializationProxiesTest { TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE; TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE; - KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfo = - new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer); + RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer).snapshot(); byte[] serialized; try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { - metaInfo.write(new DataOutputViewStreamWrapper(out)); + KeyedBackendStateMetaInfoSnapshotReaderWriters + .getWriterForVersion(KeyedBackendSerializationProxy.VERSION, metaInfo) + .writeStateMetaInfo(new DataOutputViewStreamWrapper(out)); + serialized = out.toByteArray(); } - metaInfo = new KeyedBackendSerializationProxy.StateMetaInfo<>(Thread.currentThread().getContextClassLoader()); - try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { - metaInfo.read(new DataInputViewStreamWrapper(in)); + 
metaInfo = KeyedBackendStateMetaInfoSnapshotReaderWriters + .getReaderForVersion(KeyedBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader()) + .readStateMetaInfo(new DataInputViewStreamWrapper(in)); } - Assert.assertEquals(name, metaInfo.getStateName()); + Assert.assertEquals(name, metaInfo.getName()); } + @Test + public void testKeyedStateMetaInfoReadSerializerFailureResilience() throws Exception { + String name = "test"; + TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE; + TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE; + + RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer).snapshot(); + + byte[] serialized; + try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { + KeyedBackendStateMetaInfoSnapshotReaderWriters + .getWriterForVersion(KeyedBackendSerializationProxy.VERSION, metaInfo) + .writeStateMetaInfo(new DataOutputViewStreamWrapper(out)); + + serialized = out.toByteArray(); + } + + // mock failure when deserializing serializer + TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class); + doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); + PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + + try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { + metaInfo = KeyedBackendStateMetaInfoSnapshotReaderWriters + .getReaderForVersion(KeyedBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader()) + .readStateMetaInfo(new DataInputViewStreamWrapper(in)); + } + + Assert.assertEquals(name, metaInfo.getName()); + Assert.assertEquals(null, metaInfo.getNamespaceSerializer()); + Assert.assertEquals(null, metaInfo.getStateSerializer()); + 
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), metaInfo.getNamespaceSerializerConfigSnapshot()); + Assert.assertEquals(stateSerializer.snapshotConfiguration(), metaInfo.getStateSerializerConfigSnapshot()); + } @Test public void testOperatorBackendSerializationProxyRoundtrip() throws Exception { TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE; - List<OperatorBackendSerializationProxy.StateMetaInfo<?>> stateMetaInfoList = new ArrayList<>(); + List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> stateMetaInfoSnapshots = new ArrayList<>(); - stateMetaInfoList.add( - new OperatorBackendSerializationProxy.StateMetaInfo<>("a", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); - stateMetaInfoList.add( - new OperatorBackendSerializationProxy.StateMetaInfo<>("b", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); - stateMetaInfoList.add( - new OperatorBackendSerializationProxy.StateMetaInfo<>("c", stateSerializer, OperatorStateHandle.Mode.BROADCAST)); + stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>( + "a", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot()); + stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>( + "b", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot()); + stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>( + "c", stateSerializer, OperatorStateHandle.Mode.BROADCAST).snapshot()); OperatorBackendSerializationProxy serializationProxy = - new OperatorBackendSerializationProxy(stateMetaInfoList); + new OperatorBackendSerializationProxy(stateMetaInfoSnapshots); byte[] serialized; try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { @@ -127,7 +177,7 @@ public class SerializationProxiesTest { serializationProxy.read(new DataInputViewStreamWrapper(in)); } - Assert.assertEquals(stateMetaInfoList, serializationProxy.getNamedStateSerializationProxies()); + 
Assert.assertEquals(stateMetaInfoSnapshots, serializationProxy.getStateMetaInfoSnapshots()); } @Test @@ -136,22 +186,60 @@ public class SerializationProxiesTest { String name = "test"; TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE; - OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo = - new OperatorBackendSerializationProxy.StateMetaInfo<>(name, stateSerializer, OperatorStateHandle.Mode.BROADCAST); + RegisteredOperatorBackendStateMetaInfo.Snapshot<?> metaInfo = + new RegisteredOperatorBackendStateMetaInfo<>( + name, stateSerializer, OperatorStateHandle.Mode.BROADCAST).snapshot(); + + byte[] serialized; + try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { + OperatorBackendStateMetaInfoSnapshotReaderWriters + .getWriterForVersion(OperatorBackendSerializationProxy.VERSION, metaInfo) + .writeStateMetaInfo(new DataOutputViewStreamWrapper(out)); + + serialized = out.toByteArray(); + } + + try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { + metaInfo = OperatorBackendStateMetaInfoSnapshotReaderWriters + .getReaderForVersion(OperatorBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader()) + .readStateMetaInfo(new DataInputViewStreamWrapper(in)); + } + + Assert.assertEquals(name, metaInfo.getName()); + } + + @Test + public void testOperatorStateMetaInfoReadSerializerFailureResilience() throws Exception { + String name = "test"; + TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE; + + RegisteredOperatorBackendStateMetaInfo.Snapshot<?> metaInfo = + new RegisteredOperatorBackendStateMetaInfo<>( + name, stateSerializer, OperatorStateHandle.Mode.BROADCAST).snapshot(); byte[] serialized; try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { - metaInfo.write(new DataOutputViewStreamWrapper(out)); + OperatorBackendStateMetaInfoSnapshotReaderWriters + .getWriterForVersion(OperatorBackendSerializationProxy.VERSION, metaInfo) + 
.writeStateMetaInfo(new DataOutputViewStreamWrapper(out)); + serialized = out.toByteArray(); } - metaInfo = new OperatorBackendSerializationProxy.StateMetaInfo<>(Thread.currentThread().getContextClassLoader()); + // mock failure when deserializing serializer + TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class); + doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); + PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { - metaInfo.read(new DataInputViewStreamWrapper(in)); + metaInfo = OperatorBackendStateMetaInfoSnapshotReaderWriters + .getReaderForVersion(OperatorBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader()) + .readStateMetaInfo(new DataInputViewStreamWrapper(in)); } Assert.assertEquals(name, metaInfo.getName()); + Assert.assertEquals(null, metaInfo.getPartitionStateSerializer()); + Assert.assertEquals(stateSerializer.snapshotConfiguration(), metaInfo.getPartitionStateSerializerConfigSnapshot()); } /** @@ -171,4 +259,4 @@ public class SerializationProxiesTest { Assert.assertEquals(5, StateDescriptor.Type.AGGREGATING.ordinal()); Assert.assertEquals(6, StateDescriptor.Type.MAP.ordinal()); } -} \ No newline at end of file +}
