dawidwys commented on code in PR #21201:
URL: https://github.com/apache/flink/pull/21201#discussion_r1012785767
##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java:
##########
@@ -36,8 +32,6 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InvalidClassException;
-import java.util.ArrayList;
-import java.util.List;
Review Comment:
+1. I believe we should remove the class; it is used only in tests.
##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtils.java:
##########
@@ -23,46 +23,18 @@
/** Utilities for dealing with the {@link TypeSerializer} and the {@link TypeSerializerSnapshot}. */
public final class TypeSerializerUtils {
- /**
- * Takes snapshots of the given serializers. In case where the snapshots are still extending the
- * old {@code TypeSerializerConfigSnapshot} class, the snapshots are set up properly (with their
- * originating serializer) such that the backwards compatible code paths work.
- */
- public static TypeSerializerSnapshot<?>[] snapshotBackwardsCompatible(
+ /** Takes snapshots of the given serializers. */
+ public static TypeSerializerSnapshot<?>[] snapshot(
TypeSerializer<?>... originatingSerializers) {
return Arrays.stream(originatingSerializers)
- .map(TypeSerializerUtils::snapshotBackwardsCompatible)
+ .map(TypeSerializerUtils::snapshot)
.toArray(TypeSerializerSnapshot[]::new);
}
- /**
- * Takes a snapshot of the given serializer. In case where the snapshot is still extending the
- * old {@code TypeSerializerConfigSnapshot} class, the snapshot is set up properly (with its
- * originating serializer) such that the backwards compatible code paths work.
- */
- public static <T> TypeSerializerSnapshot<T> snapshotBackwardsCompatible(
- TypeSerializer<T> originatingSerializer) {
- return configureForBackwardsCompatibility(
- originatingSerializer.snapshotConfiguration(), originatingSerializer);
- }
-
- /**
- * Utility method to bind the serializer and serializer snapshot to a common generic type
- * variable.
- */
- @SuppressWarnings({"unchecked", "deprecation"})
- private static <T> TypeSerializerSnapshot<T> configureForBackwardsCompatibility(
- TypeSerializerSnapshot<?> snapshot, TypeSerializer<?> serializer) {
-
- TypeSerializerSnapshot<T> typedSnapshot = (TypeSerializerSnapshot<T>) snapshot;
- TypeSerializer<T> typedSerializer = (TypeSerializer<T>) serializer;
-
- if (snapshot instanceof TypeSerializerConfigSnapshot) {
- ((TypeSerializerConfigSnapshot<T>) typedSnapshot).setPriorSerializer(typedSerializer);
- }
-
- return typedSnapshot;
+ /** Takes a snapshot of the given serializer. */
+ public static <T> TypeSerializerSnapshot<T> snapshot(TypeSerializer<T> originatingSerializer) {
+ return originatingSerializer.snapshotConfiguration();
Review Comment:
Is this function still helpful? Can't we just inline it?
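
For illustration, a minimal sketch of what inlining could look like. `Serializer` and `Snapshot` are hypothetical stand-ins for `TypeSerializer` and `TypeSerializerSnapshot` so the snippet compiles on its own; only the `snapshotConfiguration()` method name is taken from the diff above, and the rest is an assumption rather than the actual Flink code.

```java
import java.util.Arrays;

// Hypothetical stand-in for TypeSerializerSnapshot.
interface Snapshot<T> {}

// Hypothetical stand-in for TypeSerializer; only the method used in the diff is modelled.
interface Serializer<T> {
    Snapshot<T> snapshotConfiguration();
}

final class InlineSnapshotSketch {
    private InlineSnapshotSketch() {}

    /** Takes snapshots of the given serializers without a single-element helper. */
    static Snapshot<?>[] snapshotAll(Serializer<?>... serializers) {
        return Arrays.stream(serializers)
                // Call snapshotConfiguration() directly instead of a one-line wrapper method.
                .<Snapshot<?>>map(Serializer::snapshotConfiguration)
                .toArray(Snapshot[]::new);
    }
}
```

With this shape, call sites that need a single snapshot would call `snapshotConfiguration()` on the serializer directly.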
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java:
##########
@@ -65,12 +57,10 @@ public static <K, N> InternalTimersSnapshotWriter getWriterForVersion(
switch (version) {
case NO_VERSION:
- return new InternalTimersSnapshotWriterPreVersioned<>(
- timersSnapshot, keySerializer, namespaceSerializer);
-
case 1:
- return new InternalTimersSnapshotWriterV1<>(
- timersSnapshot, keySerializer, namespaceSerializer);
+ throw new UnsupportedOperationException(
Review Comment:
nit: I'd rather use `IllegalStateException`
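
A rough sketch of the suggested shape, assuming the removed versions fall through to a single throw. The class name, the `NO_VERSION` value, the string return type, and the message text are all hypothetical and chosen only so the snippet is self-contained; they are not the actual Flink code.

```java
final class WriterVersionDispatchSketch {
    // Hypothetical sentinel value; the real constant lives in the Flink class.
    static final int NO_VERSION = Integer.MIN_VALUE;

    private WriterVersionDispatchSketch() {}

    /** Returns a label for the writer to use; a string stands in for the real writer type. */
    static String writerForVersion(int version) {
        switch (version) {
            case NO_VERSION:
            case 1:
                // Removed versions: the caller should never request them, hence an illegal state.
                throw new IllegalStateException(
                        "Timer snapshot version " + version + " is no longer supported.");
            case 2:
                return "writer-v2";
            default:
                throw new IllegalStateException(
                        "Unknown timer snapshot version: " + version);
        }
    }
}
```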
##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java:
##########
@@ -145,8 +140,12 @@ public void read(DataInputView in) throws IOException {
serializerSnapshot = deserializeV2(in, userCodeClassLoader);
break;
case 1:
- serializerSnapshot = deserializeV1(in, userCodeClassLoader, serializer);
- break;
+ throw new IOException(
+ String.format(
+ "No longer supported version [%d] for TypeSerializerSnapshot. "
+ + "Please migrate away from the old TypeSerializerConfigSnapshot "
+ + "and use Flink 1.16 for the migration",
+ version));
Review Comment:
I find the way we check for incompatibility confusing at the moment.
There are at least two places that throw a similar exception:
1. `VersionedIOReadableWritable`, via `getIncompatibleVersionError`
2. Here, in the `read` method.
I think 1) is actually dead code now, as we declare in
`TypeSerializerSnapshotSerializationProxy` that we support version 1.
Moreover, I find the method
`KeyedBackendSerializationProxy#getIncompatibleVersionError` misleading. I
think at least the `Ops, this should not happen...` is unnecessary. You already
get a message from `VersionedIOReadableWritable` that it is an unsupported
version. That message does not give us any additional context.
Lastly, if you keep the `getIncompatibleVersionError` method, I'd rename it to
something along the lines of "extra information", e.g.
`getAdditionalDetailsForIncompatibleVersion`.
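
One way to picture the rename, as a sketch only: the base class owns the single "unsupported version" message, and subclasses contribute optional extra context through the hook. Apart from the suggested hook name `getAdditionalDetailsForIncompatibleVersion`, every class, method, and message below is hypothetical and does not mirror the real `VersionedIOReadableWritable` API.

```java
import java.io.IOException;
import java.util.Arrays;

// Hypothetical base class: the only place that reports a version mismatch.
abstract class VersionedReadableSketch {
    abstract int[] compatibleVersions();

    /** Hook for extra context only; the mismatch itself is reported by checkVersion. */
    String getAdditionalDetailsForIncompatibleVersion(int foundVersion) {
        return "";
    }

    final void checkVersion(int foundVersion) throws IOException {
        for (int v : compatibleVersions()) {
            if (v == foundVersion) {
                return;
            }
        }
        String details = getAdditionalDetailsForIncompatibleVersion(foundVersion);
        throw new IOException(
                "Incompatible version: found " + foundVersion
                        + ", compatible versions are " + Arrays.toString(compatibleVersions())
                        + (details.isEmpty() ? "" : ". " + details));
    }
}

// Hypothetical subclass: version 1 is no longer declared compatible, so the
// migration hint is attached as extra detail instead of a second throw site.
final class SnapshotProxySketch extends VersionedReadableSketch {
    @Override
    int[] compatibleVersions() {
        return new int[] {2};
    }

    @Override
    String getAdditionalDetailsForIncompatibleVersion(int foundVersion) {
        return foundVersion == 1
                ? "Please migrate away from the old TypeSerializerConfigSnapshot "
                        + "and use Flink 1.16 for the migration"
                : "";
    }
}
```

The design choice here is that only one code path throws, which avoids the dead-code situation described above; whether that fits the PR is for the author to judge.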
##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java:
##########
@@ -145,8 +140,12 @@ public void read(DataInputView in) throws IOException {
serializerSnapshot = deserializeV2(in, userCodeClassLoader);
break;
case 1:
- serializerSnapshot = deserializeV1(in, userCodeClassLoader, serializer);
- break;
+ throw new IOException(
+ String.format(
+ "No longer supported version [%d] for TypeSerializerSnapshot. "
+ + "Please migrate away from the old TypeSerializerConfigSnapshot "
+ + "and use Flink 1.16 for the migration",
+ version));
Review Comment:
Could we also clean up this class a bit? I believe
`org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.TypeSerializerSnapshotSerializationProxy#serializer`:
1. is never accessed
2. is always null (apart from tests)
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java:
##########
@@ -223,10 +167,10 @@ public static <K, N> InternalTimersSnapshotReader<K, N> getReaderForVersion(
switch (version) {
case NO_VERSION:
- return new InternalTimersSnapshotReaderPreVersioned<>(userCodeClassLoader);
-
case 1:
- return new InternalTimersSnapshotReaderV1<>(userCodeClassLoader);
+ throw new UnsupportedOperationException(
Review Comment:
`IllegalStateException`?