@dawidwys and I had a long joint review and discussion session. A lot in this PR
goes in a very good direction. We would like to suggest a few moderately sized
changes. Please let us know if you agree; we may also have overlooked or
misinterpreted some implications.
**Desired Goals**
- Once this PR is merged, Flink should be in a releasable state, meaning that
it should not matter whether we manage to upgrade all serializers to the new
model before the release. This is important given the time-based release
schedule and the uncertainty around Scala code generation in serializers, etc.
If we only manage to upgrade half of Flink's serializers, that should be
perfectly fine.
- There are also users who have written custom serializers and, with them,
custom config snapshots. We cannot expect them to update these strictly in
sync with our release. It would be good to give users an easy way to adjust
their custom serializers' config snapshots so that they keep working.
- The current code assumes that, in order to be upgraded, (1) serializers need
to bump their version by exactly one and (2) on their backwards compatibility
paths, they must first read the serializer (via Java serialization) and then
the config snapshot data. Any serializer not following this strict contract
will fail while reading the config snapshot data. That seems a hard and fragile
contract, especially since users also need to follow it for their own
upgrades.
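To make the fragility concrete, here is a toy model of the contract described in the last point. It is a hypothetical sketch, not the PR's code: the class `VersionCoupledSnapshotFormat`, the constant `LEGACY_VERSION`, and the byte layout are illustrative assumptions, and a plain `String` stands in for a Java-serialized serializer. The reader infers from the version number alone whether a serialized serializer precedes the config data, so a snapshot written with any other version is misread without an error.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical toy model of a version-coupled snapshot layout; not Flink's actual format.
final class VersionCoupledSnapshotFormat {

    // The reader treats exactly this version as "a Java-serialized serializer comes first".
    static final int LEGACY_VERSION = 1;

    // Writes: version, length-prefixed Java-serialized serializer, then the config data.
    static byte[] write(int version, Serializable serializer, int configData) {
        try {
            ByteArrayOutputStream serializerBytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(serializerBytes)) {
                oos.writeObject(serializer);
            }
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(version);
                out.writeInt(serializerBytes.size());
                out.write(serializerBytes.toByteArray());
                out.writeInt(configData);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the config data, inferring the presence of the serializer from the version alone.
    static int readConfigData(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version == LEGACY_VERSION) {
                int length = in.readInt();
                in.readFully(new byte[length]); // consume the serializer to reach the config data
            }
            // Any other version is assumed to start directly with the config data.
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, `write(LEGACY_VERSION, "StringSerializer", 42)` reads back as 42, while `write(0, "StringSerializer", 42)` (a snapshot whose version does not match the one the reader expects) returns the serializer section's length prefix instead of 42.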
**Suggested Change**
- We do not completely remove Java serialization for serializers, but make it
optional. Config snapshots can decide whether they want the prior serializer
to be serialized into the meta info or not. This is similar to the current
backwards compatibility path, but it does not put the serialized serializer
into the same byte stream and does not assume that the specific value of the
version tells you whether a serialized serializer is present.
- We extend the `TypeSerializerConfigSnapshot` class as suggested below. The
new `needsPriorSerializerPersisted()` method tells the config snapshot writer
whether to write the prior serializer. This also removes the need for the
"Backward Compatible Wrapper", because `TypeSerializerConfigSnapshot` handles
the bridging directly.
- We introduce (possibly in a later PR) the
`SerializerConfigSnapshotBackwardsAdapter` and the new
`SerializerConfigSnapshot`. All updated config snapshots should extend
`SerializerConfigSnapshot`, and those not yet updated should extend
`SerializerConfigSnapshotBackwardsAdapter`. We can then make the methods in
`TypeSerializerConfigSnapshot` abstract.
- Once we decide we have given users a long enough grace period to update
their serializers and config snapshots, we remove the optional serialization of
the prior serializer, remove the old `TypeSerializerConfigSnapshot` class,
and keep only the new `SerializerConfigSnapshot` class.
- This should follow the same spirit as this PR while introducing fewer subtle
contracts and decoupling versioning from the removal of Java serialization.
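A toy model of the decoupled layout from the suggestion above, again hypothetical: `DecoupledMetaInfo` and its two byte sections are assumptions for illustration, not Flink's meta-info format, and a plain `String` stands in for the prior serializer. The prior serializer lives in its own optional section, written only when the snapshot asks for it, while the config snapshot section carries its own version that says nothing about whether the other section exists.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical toy model of a meta info with an optional, separate prior-serializer section.
final class DecoupledMetaInfo {

    // Present only when the config snapshot asked for it; null otherwise.
    final byte[] priorSerializerBytes;
    // Always present; its version is independent of the section above.
    final byte[] configSnapshotBytes;

    private DecoupledMetaInfo(byte[] priorSerializerBytes, byte[] configSnapshotBytes) {
        this.priorSerializerBytes = priorSerializerBytes;
        this.configSnapshotBytes = configSnapshotBytes;
    }

    static DecoupledMetaInfo write(boolean needsPriorSerializerPersisted,
                                   Serializable priorSerializer, int configVersion, int configData) {
        try {
            byte[] serializerSection = null;
            if (needsPriorSerializerPersisted) {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                    out.writeObject(priorSerializer); // Java serialization, in its own section
                }
                serializerSection = bytes.toByteArray();
            }
            ByteArrayOutputStream config = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(config)) {
                out.writeInt(configVersion);
                out.writeInt(configData);
            }
            return new DecoupledMetaInfo(serializerSection, config.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the deserialized prior serializer, or null if the snapshot opted out.
    Object readPriorSerializer() {
        if (priorSerializerBytes == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(priorSerializerBytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Reads {version, data} from the config section without consulting the serializer section.
    int[] readConfigSnapshot() {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(configSnapshotBytes))) {
            return new int[] {in.readInt(), in.readInt()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the reader checks whether the optional section exists instead of interpreting the version, a snapshot can bump its version by any amount, or stop persisting the prior serializer, without breaking the read path.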
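```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.VersionedIOReadableWritable;

public abstract class TypeSerializerConfigSnapshot<T> extends VersionedIOReadableWritable {

    // Prior serializer, restored via Java serialization if this snapshot asks for it.
    private TypeSerializer<T> priorSerializer;

    // Tells the writer whether to persist the prior serializer alongside this snapshot.
    public boolean needsPriorSerializerPersisted() {
        return true;
    }

    public void setPriorSerializer(TypeSerializer<T> prior) {
        this.priorSerializer = prior;
    }

    public TypeSerializer<T> getPriorSerializer() {
        return this.priorSerializer;
    }

    // Default bridge for not-yet-updated snapshots: fall back to the persisted prior serializer.
    public TypeSerializer<T> restoreSerializer() {
        if (priorSerializer != null) {
            return priorSerializer;
        } else {
            throw new IllegalStateException(/* ... */);
        }
    }

    // ... the current methods
}

// Not-yet-updated config snapshots extend this; it keeps the default behavior above.
public abstract class SerializerConfigSnapshotBackwardsAdapter<T> extends TypeSerializerConfigSnapshot<T> {
    // same methods as above
}

// Updated config snapshots extend this; the prior serializer is never Java-serialized.
public abstract class SerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {

    @Override
    public final boolean needsPriorSerializerPersisted() {
        return false;
    }

    @Override
    public final void setPriorSerializer(TypeSerializer<T> prior) {
        // intentionally a no-op: nothing is persisted, so nothing needs to be set
    }

    @Override
    public final TypeSerializer<T> getPriorSerializer() {
        throw new UnsupportedOperationException();
    }

    // this strictly needs to be implemented
    @Override
    public abstract TypeSerializer<T> restoreSerializer();
}
```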
What do you think about that?
[ Full content available at: https://github.com/apache/flink/pull/6711 ]
This message was relayed via gitbox.apache.org.