tzulitai commented on a change in pull request #12263:
URL: https://github.com/apache/flink/pull/12263#discussion_r427782378
##########
File path:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##########
@@ -367,36 +367,42 @@ public int getVersion() {
/**
* A {@link TypeSerializerSnapshot} for RowSerializer.
*/
- // TODO not fully functional yet due to FLINK-17520
public static final class RowSerializerSnapshot extends
CompositeTypeSerializerSnapshot<Row, RowSerializer> {
private static final int VERSION = 3;
- private static final int VERSION_WITHOUT_ROW_KIND = 2;
+ private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
- private boolean legacyModeEnabled = false;
+ private int version = VERSION;
public RowSerializerSnapshot() {
super(RowSerializer.class);
}
RowSerializerSnapshot(RowSerializer serializerInstance) {
super(serializerInstance);
+ this.version = translateVersion(serializerInstance);
}
@Override
protected int getCurrentOuterSnapshotVersion() {
- return VERSION;
+ return version;
Review comment:
This method is only relevant when writing snapshots; it is not used on
restore.
It should therefore always return the latest version, not the older version
that was read.
```suggestion
return VERSION;
```
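A minimal sketch of the intent, using a simplified stand-in class rather than
the actual Flink types (`VersionedSnapshotSketch` and `getReadVersion` are
hypothetical names): the write path always reports the latest version, while
the version seen on restore is tracked separately.

```java
// Hypothetical, simplified stand-in for a versioned snapshot; not Flink's API.
final class VersionedSnapshotSketch {

    // Latest format version; always the one used when writing a snapshot.
    static final int VERSION = 3;

    // Version found in a restored snapshot; defaults to the latest version.
    private int readVersion = VERSION;

    // Write path: always report the latest version, whatever was read earlier.
    int getCurrentOuterSnapshotVersion() {
        return VERSION;
    }

    // Restore path: remember which version the snapshot was written with.
    void readOuterSnapshot(int readOuterSnapshotVersion) {
        this.readVersion = readOuterSnapshotVersion;
    }

    // Hypothetical accessor, only here so the sketch can be inspected.
    int getReadVersion() {
        return readVersion;
    }
}
```

Keeping the two values apart means restoring an old snapshot cannot change
the version that the next written snapshot is stamped with.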
##########
File path:
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
##########
@@ -76,14 +84,16 @@ public RowSerializerUpgradeTest(TestSpecification<Row, Row>
testSpecification) {
public static final class RowSerializerSetup implements
TypeSerializerUpgradeTestBase.PreUpgradeSetup<Row> {
@Override
public TypeSerializer<Row> createPriorSerializer() {
- return stringLongRowSupplier();
+ return createRowSerializer(true);
Review comment:
To make this clear, I think the `RowSerializer` constructor that accepts the
`legacyModeEnabled` flag should be private, so that it is only usable by
`RowSerializerSnapshot#createOuterSerializer`. This concern should not leak
into tests.
The bottom line is that creating an old serializer that uses a previous
format should only be visible to the snapshots.
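A minimal sketch of that encapsulation, with hypothetical simplified classes
(`SerializerSketch` and `SnapshotSketch` are stand-ins, and the
`createOuterSerializer` signature here is an assumption, not the real one).
In Java a nested class can call a private constructor of its enclosing
class, so the legacy flag stays invisible to everything else:

```java
// Hypothetical stand-in for a serializer with a hidden legacy mode.
final class SerializerSketch {

    private final boolean legacyModeEnabled;

    // Constructor available to callers: always the current format.
    SerializerSketch() {
        this(false);
    }

    // Private: only code inside this top-level class, including nested
    // classes, can request the legacy format.
    private SerializerSketch(boolean legacyModeEnabled) {
        this.legacyModeEnabled = legacyModeEnabled;
    }

    boolean isLegacyModeEnabled() {
        return legacyModeEnabled;
    }

    // Hypothetical stand-in for the snapshot nested inside the serializer.
    static final class SnapshotSketch {

        static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;

        // The snapshot is the only place that may create a legacy serializer.
        SerializerSketch createOuterSerializer(int readVersion) {
            return new SerializerSketch(readVersion <= LAST_VERSION_WITHOUT_ROW_KIND);
        }
    }
}
```

With this shape, a test can only obtain a legacy-mode serializer by going
through the snapshot, which is the behavior being asked for.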
##########
File path:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##########
@@ -367,36 +367,42 @@ public int getVersion() {
/**
* A {@link TypeSerializerSnapshot} for RowSerializer.
*/
- // TODO not fully functional yet due to FLINK-17520
public static final class RowSerializerSnapshot extends
CompositeTypeSerializerSnapshot<Row, RowSerializer> {
private static final int VERSION = 3;
- private static final int VERSION_WITHOUT_ROW_KIND = 2;
+ private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
- private boolean legacyModeEnabled = false;
+ private int version = VERSION;
Review comment:
Maybe rename this to `readVersion`, to better convey how it differs from
the static `VERSION`.
##########
File path:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##########
@@ -367,36 +367,42 @@ public int getVersion() {
/**
* A {@link TypeSerializerSnapshot} for RowSerializer.
*/
- // TODO not fully functional yet due to FLINK-17520
public static final class RowSerializerSnapshot extends
CompositeTypeSerializerSnapshot<Row, RowSerializer> {
private static final int VERSION = 3;
- private static final int VERSION_WITHOUT_ROW_KIND = 2;
+ private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
- private boolean legacyModeEnabled = false;
+ private int version = VERSION;
public RowSerializerSnapshot() {
super(RowSerializer.class);
}
RowSerializerSnapshot(RowSerializer serializerInstance) {
super(serializerInstance);
+ this.version = translateVersion(serializerInstance);
}
@Override
protected int getCurrentOuterSnapshotVersion() {
- return VERSION;
+ return version;
}
@Override
protected void readOuterSnapshot(
int readOuterSnapshotVersion,
DataInputView in,
ClassLoader userCodeClassLoader) {
- if (readOuterSnapshotVersion ==
VERSION_WITHOUT_ROW_KIND) {
- legacyModeEnabled = true;
+ version = readOuterSnapshotVersion;
+ }
+
+ @Override
+ protected OuterSchemaCompatibility
resolveOuterSchemaCompatibility(RowSerializer newSerializer) {
+ if (version == translateVersion(newSerializer)) {
Review comment:
If I understand this correctly, this check doesn't need to be performed
against the new serializer.
Starting with 1.11, a newly created serializer always writes in the new
format (i.e. `legacyModeEnabled` is always `false`). Combined with the fact
that the `RowSerializerSnapshot` version was bumped in 1.11, whether or not
migration is needed is completely encoded ("piggy-backed") in the version
of the `RowSerializerSnapshot`.
That is, this can be simplified to:
```suggestion
if (readVersion <= LAST_VERSION_WITHOUT_ROW_KIND) {
return OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION;
}
return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
```
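For illustration, the same decision as a self-contained sketch. The enum
below is a local stand-in that only mirrors the two outcomes used in the
suggestion above, and `RowSnapshotCompatibilitySketch` and `resolve` are
hypothetical names, not Flink API:

```java
// Hypothetical, self-contained sketch; not the actual Flink classes.
final class RowSnapshotCompatibilitySketch {

    // Local stand-in for the two outcomes used in the suggestion.
    enum OuterSchemaCompatibility {
        COMPATIBLE_AS_IS,
        COMPATIBLE_AFTER_MIGRATION
    }

    static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;

    // The decision depends only on the version read from the snapshot,
    // never on the new serializer.
    static OuterSchemaCompatibility resolve(int readVersion) {
        if (readVersion <= LAST_VERSION_WITHOUT_ROW_KIND) {
            return OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION;
        }
        return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
    }
}
```

Under these assumptions a snapshot read with version 2 or lower asks for
migration, and one read with version 3 is compatible as is.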
##########
File path:
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
##########
@@ -76,14 +84,16 @@ public RowSerializerUpgradeTest(TestSpecification<Row, Row>
testSpecification) {
public static final class RowSerializerSetup implements
TypeSerializerUpgradeTestBase.PreUpgradeSetup<Row> {
@Override
public TypeSerializer<Row> createPriorSerializer() {
- return stringLongRowSupplier();
+ return createRowSerializer(true);
Review comment:
I think there's a misunderstanding about how this `createPriorSerializer`
method is used.
It is only ever used when writing the test snapshot files, on the target
branch. For example:
- if we check out the `release-1.10` branch and generate a snapshot, this
should create a serializer that writes in the old format.
- if we check out the `release-1.11` branch and generate a snapshot, this
should create a serializer that writes in the new format.
With the current changes in the PR, the generated snapshot files will always
be written in the old format, regardless of which branch you're on.
```suggestion
// in older branches, this writes in old format;
// in newer branches >= 1.11, this writes in new format
return new RowSerializer(fieldSerializers);
```