tzulitai commented on a change in pull request #12263:
URL: https://github.com/apache/flink/pull/12263#discussion_r427782378



##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##########
@@ -367,36 +367,42 @@ public int getVersion() {
        /**
         * A {@link TypeSerializerSnapshot} for RowSerializer.
         */
-       // TODO not fully functional yet due to FLINK-17520
        public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot<Row, RowSerializer> {
 
                private static final int VERSION = 3;
 
-               private static final int VERSION_WITHOUT_ROW_KIND = 2;
+               private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
 
-               private boolean legacyModeEnabled = false;
+               private int version = VERSION;
 
                public RowSerializerSnapshot() {
                        super(RowSerializer.class);
                }
 
                RowSerializerSnapshot(RowSerializer serializerInstance) {
                        super(serializerInstance);
+                       this.version = translateVersion(serializerInstance);
                }
 
                @Override
                protected int getCurrentOuterSnapshotVersion() {
-                       return VERSION;
+                       return version;

Review comment:
       This method is only relevant when writing snapshots; it is not used on restore.
   Therefore, it should always return the latest version, not the older version that was read.
   ```suggestion
                        return VERSION;
   ```
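   To make the two roles concrete, here is a minimal, Flink-free sketch. The class `SnapshotVersionSketch` and its methods are hypothetical stand-ins that only mirror the names in this diff; they are not the real `CompositeTypeSerializerSnapshot` API. The write path always reports the static `VERSION`, while the version read on restore lives in a separate field.

```java
// Hypothetical, Flink-free model of RowSerializerSnapshot's versioning.
// Names mirror the diff; this is NOT the actual Flink class.
final class SnapshotVersionSketch {
    static final int VERSION = 3;
    static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;

    // Version found in the snapshot on restore; defaults to the current one.
    private int readVersion = VERSION;

    // Consulted only when writing a snapshot: always the latest format.
    int getCurrentOuterSnapshotVersion() {
        return VERSION;
    }

    // Called only on restore: remember which format was read.
    void readOuterSnapshot(int readOuterSnapshotVersion) {
        this.readVersion = readOuterSnapshotVersion;
    }

    int getReadVersion() {
        return readVersion;
    }
}
```

   Keeping the two values apart means that restoring an old snapshot never causes a new snapshot to be written in the old format.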

##########
File path: flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
##########
@@ -76,14 +84,16 @@ public RowSerializerUpgradeTest(TestSpecification<Row, Row> testSpecification) {
        public static final class RowSerializerSetup implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<Row> {
                @Override
                public TypeSerializer<Row> createPriorSerializer() {
-                       return stringLongRowSupplier();
+                       return createRowSerializer(true);

Review comment:
       To make this clear, I think we should make the `RowSerializer` constructor that accepts the `legacyModeEnabled` flag private, so that it is only usable by `RowSerializerSnapshot#createOuterSerializer`. This concern should not leak into tests.
   
   The bottom line is that creating a serializer that writes in a previous format should only be visible to the snapshots.
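   A minimal sketch of the suggested visibility, assuming a Flink-free model: `RowSerializerSketch`, `SnapshotSketch`, and the `arity` parameter are hypothetical stand-ins, not Flink code. It relies on the Java rule that a nested class may call a private constructor of its enclosing class, so only the snapshot can ask for the legacy format.

```java
// Hypothetical, Flink-free model of the suggested constructor visibility.
final class RowSerializerSketch {
    private final int arity;
    private final boolean legacyModeEnabled;

    // Entry point for tests and user code: always the new format.
    RowSerializerSketch(int arity) {
        this(arity, false);
    }

    // Private: only code inside this class, including the nested
    // snapshot class below, can request the legacy format.
    private RowSerializerSketch(int arity, boolean legacyModeEnabled) {
        this.arity = arity;
        this.legacyModeEnabled = legacyModeEnabled;
    }

    int getArity() {
        return arity;
    }

    boolean isLegacyModeEnabled() {
        return legacyModeEnabled;
    }

    static final class SnapshotSketch {
        static final int VERSION = 3;
        static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;

        private int readVersion = VERSION;

        void readOuterSnapshot(int readOuterSnapshotVersion) {
            readVersion = readOuterSnapshotVersion;
        }

        // Plays the role of createOuterSerializer: the only place that
        // may create a serializer for the old format.
        RowSerializerSketch createOuterSerializer(int arity) {
            return new RowSerializerSketch(
                    arity, readVersion <= LAST_VERSION_WITHOUT_ROW_KIND);
        }
    }
}
```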

##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##########
@@ -367,36 +367,42 @@ public int getVersion() {
        /**
         * A {@link TypeSerializerSnapshot} for RowSerializer.
         */
-       // TODO not fully functional yet due to FLINK-17520
        public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot<Row, RowSerializer> {
 
                private static final int VERSION = 3;
 
-               private static final int VERSION_WITHOUT_ROW_KIND = 2;
+               private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
 
-               private boolean legacyModeEnabled = false;
+               private int version = VERSION;

Review comment:
       Maybe rename this to `readVersion`, to better convey how it differs from the static `VERSION`.

##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##########
@@ -367,36 +367,42 @@ public int getVersion() {
        /**
         * A {@link TypeSerializerSnapshot} for RowSerializer.
         */
-       // TODO not fully functional yet due to FLINK-17520
        public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot<Row, RowSerializer> {
 
                private static final int VERSION = 3;
 
-               private static final int VERSION_WITHOUT_ROW_KIND = 2;
+               private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
 
-               private boolean legacyModeEnabled = false;
+               private int version = VERSION;
 
                public RowSerializerSnapshot() {
                        super(RowSerializer.class);
                }
 
                RowSerializerSnapshot(RowSerializer serializerInstance) {
                        super(serializerInstance);
+                       this.version = translateVersion(serializerInstance);
                }
 
                @Override
                protected int getCurrentOuterSnapshotVersion() {
-                       return VERSION;
+                       return version;
                }
 
                @Override
                protected void readOuterSnapshot(
                                int readOuterSnapshotVersion,
                                DataInputView in,
                                ClassLoader userCodeClassLoader) {
-                       if (readOuterSnapshotVersion == VERSION_WITHOUT_ROW_KIND) {
-                               legacyModeEnabled = true;
+                       version = readOuterSnapshotVersion;
+               }
+
+               @Override
+               protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(RowSerializer newSerializer) {
+                       if (version == translateVersion(newSerializer)) {

Review comment:
       If I understand this correctly, this check doesn't need to be performed against the new serializer.
   
   Starting from 1.11, a newly created serializer always writes in the new format (i.e. `legacyModeEnabled` is always `false`), and the version of `RowSerializerSnapshot` was also bumped in 1.11.
   Whether or not migration is needed is therefore entirely encoded in ("piggy-backed" on) the version of the `RowSerializerSnapshot`.
   
   That is, this can be simplified to:
   ```suggestion
                        if (readVersion <= LAST_VERSION_WITHOUT_ROW_KIND) {
                            return OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION;
                        }
                        return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
   ```
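   As a sanity check of the simplified logic, here is a Flink-free sketch. `OuterCompatSketch` is a hypothetical local enum whose constants only mirror the names in the suggestion (it is not Flink's `OuterSchemaCompatibility` type), and `RowSnapshotCompatSketch` is likewise made up. The decision depends on the read snapshot version alone; no new serializer is consulted.

```java
// Hypothetical stand-in for the compatibility result; NOT Flink's type.
enum OuterCompatSketch {
    COMPATIBLE_AS_IS,
    COMPATIBLE_AFTER_MIGRATION
}

// Hypothetical, Flink-free model of the simplified compatibility check.
final class RowSnapshotCompatSketch {
    static final int VERSION = 3;
    static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;

    // Every serializer created from 1.11 on writes the new format, so
    // the version of the restored snapshot alone decides on migration.
    static OuterCompatSketch resolveOuterSchemaCompatibility(int readVersion) {
        if (readVersion <= LAST_VERSION_WITHOUT_ROW_KIND) {
            return OuterCompatSketch.COMPATIBLE_AFTER_MIGRATION;
        }
        return OuterCompatSketch.COMPATIBLE_AS_IS;
    }
}
```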

##########
File path: flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
##########
@@ -76,14 +84,16 @@ public RowSerializerUpgradeTest(TestSpecification<Row, Row> testSpecification) {
        public static final class RowSerializerSetup implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<Row> {
                @Override
                public TypeSerializer<Row> createPriorSerializer() {
-                       return stringLongRowSupplier();
+                       return createRowSerializer(true);

Review comment:
       I think there's a misunderstanding in how this `createPriorSerializer` method is used.
   This method is only ever used when writing the test snapshot files, on the target branch.
   
   For example:
   - if we check out the `release-1.10` branch and generate a snapshot, this should create a serializer that writes in the old format.
   - if we check out the `release-1.11` branch and generate a snapshot, this should create a serializer that writes in the new format.
   
   With the current changes in the PR, the generated snapshot files will always be written in the old format, regardless of which branch you're on.
   
   ```suggestion
                        // in older branches (< 1.11), this writes in the old format;
                        // in newer branches (>= 1.11), this writes in the new format
                        return new RowSerializer(fieldSerializers);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

