[
https://issues.apache.org/jira/browse/FLINK-9799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541657#comment-16541657
]
ASF GitHub Bot commented on FLINK-9799:
---------------------------------------
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6308#discussion_r202033136
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
---
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.metainfo;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static factory that gives out the write and readers for different
versions of {@link StateMetaInfoSnapshot}.
+ */
+public class StateMetaInfoSnapshotReadersWriters {
+
+ /**
+ * Current version for the serialization format of {@link
StateMetaInfoSnapshotReadersWriters}.
+ * - v5: Flink 1.6.x
+ */
+ public static final int CURRENT_STATE_META_INFO_SNAPSHOT_VERSION = 5;
+
+ /**
+ * Enum for backeards compatibility. This gives a hint about the
expected state type for which a
+ * {@link StateMetaInfoSnapshot} should be deserialized.
+ *
+ * TODO this can go away after we eventually drop backwards
compatibility with all versions < 5.
+ */
+ public enum StateTypeHint {
+ KEYED_STATE,
+ OPERATOR_STATE
+ }
+
+ /**
+ * Returns the writer for {@link StateMetaInfoSnapshot}.
+ */
+ @Nonnull
+ public static StateMetaInfoWriter getWriter() {
+ return CurrentWriterImpl.INSTANCE;
+ }
+
+ /**
+ * Returns a reader for {@link StateMetaInfoSnapshot} with the
requested state type and version number.
+ *
+ * @param readVersion the format version to read.
+ * @param stateTypeHint a hint about the expected type to read.
+ * @return the requested reader.
+ */
+ @Nonnull
+ public static StateMetaInfoReader getReader(int readVersion, @Nonnull
StateTypeHint stateTypeHint) {
+
+ if (readVersion == CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+ // latest version shortcut
+ return CurrentReaderImpl.INSTANCE;
+ }
+
+ if (readVersion > CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+ throw new IllegalArgumentException("Unsupported read
version for state meta info: " + readVersion);
+ }
+
+ switch (stateTypeHint) {
--- End diff --
One small suggestion here is to move all the legacy stuff into separate
package/class and leave here only call to legacy reader factory. That would
unload this class a bit and simplify cleaning of legacy stuff.
> Generalize/unify state meta info
> --------------------------------
>
> Key: FLINK-9799
> URL: https://issues.apache.org/jira/browse/FLINK-9799
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
>
> Flink currently has a couple of classes that describe the meta data of state
> (e.g. for keyed state, operator state, broadcast state, ...) and they
> typically come with their own serialization proxy and backwards compatibility
> story. However, the differences between those meta data classes are very
> small, like different option flags and a different set of serializers. Before
> introducing yet another meta data for timers, we should unify them in a
> general state meta data class.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)