[ https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349905#comment-16349905 ]
ASF GitHub Bot commented on FLINK-8421:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5362#discussion_r165573730

--- Diff: flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.InputStreamViewWrapper;
+
+import java.io.IOException;
+import java.io.PushbackInputStream;
+import java.util.Arrays;
+
+/**
+ * A {@link VersionedIOReadableWritable} which allows to differentiate whether the previous
+ * data was versioned with a {@link VersionedIOReadableWritable}. This can be used if previously
+ * written data was not versioned, and is to be migrated to a versioned format.
+ */
+@Internal
+public abstract class PostVersionedIOReadableWritable extends VersionedIOReadableWritable {
+
+    /** NOTE: CANNOT CHANGE! */
+    private static final byte[] VERSIONED_IDENTIFIER = new byte[] {-15, -51, -123, -97};
+
+    /**
+     * Read from the provided {@link DataInputView in}. A flag {@code wasVersioned} can be
+     * used to determine whether or not the data to read was previously written
+     * by a {@link VersionedIOReadableWritable}.
+     */
+    protected abstract void read(DataInputView in, boolean wasVersioned) throws IOException;
+
+    @Override
+    public void write(DataOutputView out) throws IOException {
+        out.write(VERSIONED_IDENTIFIER);
+        super.write(out);
+    }
+
+    /**
+     * This read attempts to first identify if the input view contains the special
+     * {@link #VERSIONED_IDENTIFIER} by reading and buffering the first few bytes.
+     * If identified to be versioned, the usual version resolution read path
+     * in {@link VersionedIOReadableWritable#read(DataInputView)} is invoked.
+     * Otherwise, we "reset" the input view by pushing back the read buffered bytes
+     * into the stream.
+     */
+    @Override
+    public final void read(DataInputView in) throws IOException {
+        PushbackInputStream stream = new PushbackInputStream(new InputStreamViewWrapper(in), VERSIONED_IDENTIFIER.length);
+
+        byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
+        stream.read(tmp);
+
+        if (Arrays.equals(tmp, VERSIONED_IDENTIFIER)) {
+            super.read(in);
+            read(in, true);
+        } else {
+            stream.unread(tmp);
+            read(new DataInputViewStreamWrapper(stream), false);
+        }
--- End diff --

Not-so-nice things about the current implementation are: 1) it requires several layers of transforming back and forth between a `DataInputView` and an `InputStream`, and 2) it uses a separate `read(DataInputView, boolean)` method in order to hand a "reset" `DataInputView` to the remaining reads. I think the implementation would have been much more elegant if `DataInputView` had an `unread(byte[])` method, though I'm not sure how non-trivial it would be to support that across all subclasses. Maybe food for thought for the future.
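To make the `unread(byte[])` idea concrete, here is a minimal, hypothetical sketch; it is not part of the PR, and the class name `PushbackDataInputView` is invented for illustration. It leans on the fact that `DataInputViewStreamWrapper` already implements `DataInputView` on top of `DataInputStream`, so the pushback buffer can be stacked in once rather than converting back and forth:

```java
import org.apache.flink.core.memory.DataInputViewStreamWrapper;

import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;

/**
 * Hypothetical sketch only: a DataInputView (via DataInputViewStreamWrapper)
 * that additionally supports pushing read bytes back into the stream.
 */
public class PushbackDataInputView extends DataInputViewStreamWrapper {

    private final PushbackInputStream pushback;

    public PushbackDataInputView(InputStream in, int pushbackSize) {
        this(new PushbackInputStream(in, pushbackSize));
    }

    private PushbackDataInputView(PushbackInputStream pushback) {
        super(pushback);
        this.pushback = pushback;
    }

    /** Pushes the given bytes back; the next read returns them again. */
    public void unread(byte[] b) throws IOException {
        pushback.unread(b);
    }
}
```

With such a view, the identifier probe in `read(DataInputView)` could stay on a single object for both branches. A sketch of that method body, under the same assumptions (note also that `readFully` is used here because the diff's `stream.read(tmp)` may read fewer bytes than requested and ignores the return value):

```java
// Hypothetical body of read(DataInputView in), reusing the PR's
// InputStreamViewWrapper to expose the view as an InputStream once.
PushbackDataInputView view =
        new PushbackDataInputView(new InputStreamViewWrapper(in), VERSIONED_IDENTIFIER.length);

byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
view.readFully(tmp);

if (Arrays.equals(tmp, VERSIONED_IDENTIFIER)) {
    super.read(view);
    read(view, true);
} else {
    view.unread(tmp);
    read(view, false);
}
```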
> HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8421
>                 URL: https://issues.apache.org/jira/browse/FLINK-8421
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.3, 1.5.0, 1.4.1
>
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on restored / newly provided serializers to determine compatibility. These should be replaced with {{TypeSerializer::ensureCompatibility}} checks, so that new serializers can be reconfigured (see the sketch after this quoted description).
>
> This would entail that the {{TypeSerializerConfiguration}} of the key and namespace serializers in the {{HeapInternalTimerService}} also needs to be written to the raw state.
>
> For the Flink 1.4.0 release and current master, this is a critical bug, since the {{KryoSerializer}} has different default base registrations than before due to FLINK-7420; i.e., if the key of a window was serialized using the {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0.
>
> For 1.3.x, this fix would be an improvement, such that the {{HeapInternalTimerService}} restore will make use of serializer reconfiguration.
>
> Other remarks:
> * We need to double check all operators that checkpoint / restore from **raw** state. Apparently, the serializer compatibility checks were only implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test job that uses a key type which requires the {{KryoSerializer}}, and uses windows, would have caught this issue.
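For readers unfamiliar with the reconfiguration API the ticket refers to, below is a minimal, hedged sketch of the check pattern, assuming the Flink 1.3/1.4 API where {{TypeSerializer#ensureCompatibility}} takes a {{TypeSerializerConfigSnapshot}} and returns a {{CompatibilityResult}}. The class, method, and variable names are illustrative, not the actual patch:

```java
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;

import java.io.IOException;

/** Illustrative sketch only; the real fix lives in HeapInternalTimerService. */
final class SerializerRestoreSketch {

    /**
     * Instead of {@code restoredSerializer.equals(newSerializer)}, let the new
     * serializer reconfigure itself against the restored config snapshot.
     */
    static <T> TypeSerializer<T> resolveOrFail(
            TypeSerializer<T> newSerializer,
            TypeSerializerConfigSnapshot restoredConfigSnapshot) throws IOException {

        CompatibilityResult<T> result = newSerializer.ensureCompatibility(restoredConfigSnapshot);

        if (result.isRequiresMigration()) {
            // Timers live in raw state, which cannot be lazily migrated, so an
            // incompatible serializer must fail the restore.
            throw new IOException("The provided serializer is incompatible with the restored state.");
        }

        // ensureCompatibility() may have reconfigured the new serializer so
        // that it can read data written by the restored serializer.
        return newSerializer;
    }
}
```

Per the ticket, this check would run for both the key serializer and the namespace serializer, after reading their config snapshots back from the raw operator state.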