[
https://issues.apache.org/jira/browse/FLINK-6482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019409#comment-16019409
]
ASF GitHub Bot commented on FLINK-6482:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3937#discussion_r117710523
--- Diff:
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
---
@@ -19,47 +19,65 @@
package org.apache.flink.api.common.typeutils;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
/**
* A {@link TypeSerializerConfigSnapshot} for serializers that has
multiple nested serializers.
- * The configuration snapshot consists of the configuration snapshots of
all nested serializers.
+ * The configuration snapshot consists of the configuration snapshots of
all nested serializers, and
+ * also the nested serializers themselves.
+ *
+ * <p>Both the nested serializers and the configuration snapshots are
written as configuration of
+ * composite serializers, so that on restore, the previous serializer may
be used in case migration
+ * is required.
*/
@Internal
public abstract class CompositeTypeSerializerConfigSnapshot extends
TypeSerializerConfigSnapshot {
- private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots;
+ private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
nestedSerializersWithConfigs;
/** This empty nullary constructor is required for deserializing the
configuration. */
public CompositeTypeSerializerConfigSnapshot() {}
- public
CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot...
nestedSerializerConfigSnapshots) {
- this.nestedSerializerConfigSnapshots =
Preconditions.checkNotNull(nestedSerializerConfigSnapshots);
+ public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>...
nestedSerializers) {
+ Preconditions.checkNotNull(nestedSerializers);
+
+ this.nestedSerializersWithConfigs = new
ArrayList<>(nestedSerializers.length);
+ TypeSerializerConfigSnapshot configSnapshot;
+ for (TypeSerializer<?> nestedSerializer : nestedSerializers) {
+ configSnapshot =
nestedSerializer.snapshotConfiguration();
+ this.nestedSerializersWithConfigs.add(
+ new Tuple2<TypeSerializer<?>,
TypeSerializerConfigSnapshot>(
+ nestedSerializer.duplicate(),
+
Preconditions.checkNotNull(configSnapshot)));
+ }
}
@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
- TypeSerializerUtil.writeSerializerConfigSnapshots(out,
nestedSerializerConfigSnapshots);
+
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(out,
nestedSerializersWithConfigs);
}
@Override
public void read(DataInputView in) throws IOException {
super.read(in);
- nestedSerializerConfigSnapshots =
TypeSerializerUtil.readSerializerConfigSnapshots(in, getUserCodeClassLoader());
+ this.nestedSerializersWithConfigs =
+
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in,
getUserCodeClassLoader());
}
- public TypeSerializerConfigSnapshot[]
getNestedSerializerConfigSnapshots() {
- return nestedSerializerConfigSnapshots;
+ public List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>
getNestedSerializersAndConfigs() {
+ return nestedSerializersWithConfigs;
}
- public TypeSerializerConfigSnapshot
getSingleNestedSerializerConfigSnapshot() {
- return nestedSerializerConfigSnapshots[0];
+ public Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>
getSingleNestedSerializerAndConfig() {
+ return nestedSerializersWithConfigs.get(0);
--- End diff --
Method name and variable name are not aligned.
> Add nested serializers into configuration snapshots of composite serializers
> ----------------------------------------------------------------------------
>
> Key: FLINK-6482
> URL: https://issues.apache.org/jira/browse/FLINK-6482
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing, Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.3.0
>
>
> Currently, the composite serializers' configuration snapshots only wrap the
> config snapshots of nested serializers.
> We should also consider adding serialization of the nested serializers into
> the config snapshot, so that in the case where only some nested serializer
> cannot be loaded (class missing / implementation changed), we can also
> provide a path for serializer upgrades.
> This applies for all composite serializers that have nested serializers.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)