1996fanrui commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1422192659
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java:
##########
@@ -1244,8 +1244,8 @@ void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
testKeyedValueStateUpgrade(
initialAccessDescriptor,
newAccessDescriptorAfterRestore))
.satisfiesAnyOf(
- e -> assertThat(e).isInstanceOf(IllegalStateException.class),
- e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
+ e -> assertThat(e).isInstanceOf(StateMigrationException.class),
+ e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
}
Review Comment:
Would you mind adding a test that checks that all state backends call the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` instead of the old method?
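One possible shape for such a test is a "tripwire" snapshot whose deprecated overload throws, so any backend still using the old call direction fails. The sketch below is a self-contained, simplified model, not Flink code: `Serializer`, `Snapshot`, `Backend`, `TripwireSnapshot` and the two example backends are hypothetical stand-ins for `TypeSerializer`, `TypeSerializerSnapshot` and the real state backends, and compatibility results are plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model; these are not Flink's real interfaces. */
public class NewMethodTripwireTest {

    // Stand-in for TypeSerializer: only what the check needs.
    interface Serializer<T> {
        Snapshot<T> snapshotConfiguration();
    }

    // Stand-in for TypeSerializerSnapshot with both overloads.
    interface Snapshot<T> {
        // Old direction: the OLD snapshot is asked about the NEW serializer.
        String resolveSchemaCompatibility(Serializer<T> newSerializer);

        // New direction: the NEW snapshot is asked about the OLD snapshot.
        String resolveSchemaCompatibility(Snapshot<T> oldSnapshot);
    }

    // Fails loudly whenever the deprecated overload is used.
    static final class TripwireSnapshot implements Snapshot<String> {
        @Override
        public String resolveSchemaCompatibility(Serializer<String> newSerializer) {
            throw new AssertionError("deprecated overload was called");
        }

        @Override
        public String resolveSchemaCompatibility(Snapshot<String> oldSnapshot) {
            return "COMPATIBLE_AS_IS";
        }
    }

    // Hypothetical hook: how a backend checks compatibility on restore.
    interface Backend {
        String checkOnRestore(Snapshot<String> restored, Serializer<String> current);
    }

    static Map<String, Backend> exampleBackends() {
        Map<String, Backend> backends = new LinkedHashMap<>();
        // Migrated: snapshot the current serializer, then pass it the restored snapshot.
        backends.put(
                "migratedBackend",
                (restored, current) ->
                        current.snapshotConfiguration().resolveSchemaCompatibility(restored));
        // Not migrated: still asks the restored snapshot about the current serializer.
        backends.put(
                "legacyBackend",
                (restored, current) -> restored.resolveSchemaCompatibility(current));
        return backends;
    }

    /** Returns the names of backends that still call the deprecated overload. */
    static List<String> backendsStillUsingOldMethod(Map<String, Backend> backends) {
        List<String> offenders = new ArrayList<>();
        Serializer<String> currentSerializer = TripwireSnapshot::new;
        for (Map.Entry<String, Backend> entry : backends.entrySet()) {
            try {
                entry.getValue().checkOnRestore(new TripwireSnapshot(), currentSerializer);
            } catch (AssertionError tripped) {
                offenders.add(entry.getKey());
            }
        }
        return offenders;
    }

    public static void main(String[] args) {
        System.out.println("still on old method: " + backendsStillUsingOldMethod(exampleBackends()));
    }
}
```

In a real test the map would be replaced by restoring state through each actual backend (heap and RocksDB, for instance) with a tripwire-style snapshot; how to wire that into `StateBackendMigrationTestBase` is left open here.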
##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java:
##########
@@ -289,6 +292,38 @@ protected void readOuterSnapshot(
int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader)
throws IOException {}
+ /**
+ * Checks the schema compatibility of the given old serializer snapshot based on the outer
+ * snapshot.
+ *
+ * <p>The base implementation of this method assumes that the outer serializer only has nested
+ * serializers and no extra information, and therefore the result of the check is {@link
+ * OuterSchemaCompatibility#COMPATIBLE_AS_IS}. Otherwise, if the outer serializer contains some
+ * extra information that has been persisted as part of the serializer snapshot, this must be
+ * overridden. Note that this method and the corresponding methods {@link
+ * #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView,
+ * ClassLoader)} needs to be implemented.
+ *
+ * @param oldSerializerSnapshot the old serializer snapshot, which contains the old outer
+ * information to check against.
+ * @return a {@link OuterSchemaCompatibility} indicating whether or the new serializer's outer
Review Comment:
```suggestion
* @return a {@link OuterSchemaCompatibility} indicating whether the new serializer's outer
```
##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##########
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
* program's serializer re-serializes the data, thus converting the format during the restore
* operation.
*
+ * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+ * #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
* @param newSerializer the new serializer to check.
* @return the serializer compatibility result.
*/
- TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
- TypeSerializer<T> newSerializer);
+ @Deprecated
+ default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+ TypeSerializer<T> newSerializer) {
+ return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+ }
+
+ /**
+ * Checks current serializer's compatibility to read data written by the prior serializer.
+ *
+ * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
+ * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+ * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+ * serialization format is compatible, that the program's serializer needs to reconfigure itself
+ * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+ * that the format is outright incompatible, or that a migration needed. In the latter case, the
+ * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+ * program's serializer re-serializes the data, thus converting the format during the restore
+ * operation.
+ *
+ * <p>This method must be implemented to clarify the compatibility. See FLIP-263 for more
+ * details.
+ *
+ * @param oldSerializerSnapshot the old serializer snapshot to check.
+ * @return the serializer compatibility result.
+ */
+ default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+ TypeSerializerSnapshot<T> oldSerializerSnapshot) {
+ return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());
Review Comment:
I'm not sure whether we should add a default implementation for the new `resolveSchemaCompatibility` method. I'm worried that a default implementation may not work well in some scenarios. We now have two `resolveSchemaCompatibility` methods, and by default each one calls the other. This can lead to a bug, for example:
- The `TypeSerializerSnapshot` of `data1` implements only the old `resolveSchemaCompatibility(TypeSerializer)` method.
- The `TypeSerializerSnapshot` of `data2` implements only the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method.

When a Flink user upgrades the data from data2 to data1, Flink calls `resolveSchemaCompatibility(TypeSerializerSnapshot)` on data1's snapshot. That falls through to the default implementation, which calls data2's `resolveSchemaCompatibility(TypeSerializer)`. Since data2 doesn't override the old method, its default implementation runs and calls data1's `resolveSchemaCompatibility(TypeSerializerSnapshot)` again.

It's an infinite loop, which eventually ends in a `StackOverflowError`.
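The cycle can be reproduced with a simplified, self-contained model of the two defaults in this diff. The types below (`Serializer`, `Snapshot`, `Data1Snapshot`, `Data2Snapshot`) are hypothetical stand-ins, not Flink classes, and results are plain strings; only the delegation pattern of the two default methods mirrors the PR.

```java
/** Hypothetical, simplified model of the two mutually delegating defaults; not Flink code. */
public class DefaultMethodLoopDemo {

    interface Serializer<T> {
        Snapshot<T> snapshotConfiguration();
    }

    interface Snapshot<T> {
        Serializer<T> restoreSerializer();

        // Old overload: by default, forwards to the new overload of the NEW serializer's snapshot.
        default String resolveSchemaCompatibility(Serializer<T> newSerializer) {
            return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
        }

        // New overload: by default, forwards to the old overload of the OLD snapshot.
        default String resolveSchemaCompatibility(Snapshot<T> oldSnapshot) {
            return oldSnapshot.resolveSchemaCompatibility(restoreSerializer());
        }
    }

    // "data1": overrides only the old overload, so its new overload is the default.
    static final class Data1Snapshot implements Snapshot<String> {
        @Override
        public Serializer<String> restoreSerializer() {
            return Data1Snapshot::new;
        }

        @Override
        public String resolveSchemaCompatibility(Serializer<String> newSerializer) {
            return "data1: COMPATIBLE_AS_IS";
        }
    }

    // "data2": overrides only the new overload, so its old overload is the default.
    static final class Data2Snapshot implements Snapshot<String> {
        @Override
        public Serializer<String> restoreSerializer() {
            return Data2Snapshot::new;
        }

        @Override
        public String resolveSchemaCompatibility(Snapshot<String> oldSnapshot) {
            return "data2: COMPATIBLE_AS_IS";
        }
    }

    /** Upgrades from data2 (old) to data1 (new); returns true if the call overflows the stack. */
    static boolean upgradeRecursesForever() {
        Snapshot<String> oldSnapshot = new Data2Snapshot();
        Snapshot<String> newSnapshot = new Data1Snapshot();
        try {
            // data1.new (default) -> data2.old (default) -> data1.new (default) -> ...
            newSnapshot.resolveSchemaCompatibility(oldSnapshot);
            return false;
        } catch (StackOverflowError expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("stack overflow: " + upgradeRecursesForever());
    }
}
```

In this model, overriding the new overload in `Data1Snapshot`, or the old one in `Data2Snapshot`, is enough to break the cycle.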
Based on this, I suggest that all types inside the Flink project implement the new `resolveSchemaCompatibility(TypeSerializerSnapshot)`. Currently, `EitherSerializerSnapshot` and `GenericArraySerializerConfigSnapshot` don't implement it. WDYT?

Could we drop the default implementation of the new `resolveSchemaCompatibility(TypeSerializerSnapshot)`? The default implementation is provided because users may have custom serializers, right?
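A minimal sketch of what dropping the default could look like, again using hypothetical stand-ins (`Serializer`, `Snapshot`, `Data1Snapshot`, `Data2Snapshot`; not Flink's API): the new overload becomes abstract, while the deprecated overload keeps forwarding to it. Because every snapshot must then implement the new overload, forwarding only goes one way and cannot loop.

```java
/** Hypothetical sketch: new overload without a default; not Flink code. */
public class NoDefaultNewMethodDemo {

    interface Serializer<T> {
        Snapshot<T> snapshotConfiguration();
    }

    interface Snapshot<T> {
        // Deprecated overload keeps a default that forwards to the new overload.
        @Deprecated
        default String resolveSchemaCompatibility(Serializer<T> newSerializer) {
            return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
        }

        // New overload has no default, so every implementation must provide it.
        String resolveSchemaCompatibility(Snapshot<T> oldSnapshot);
    }

    static final class Data1Snapshot implements Snapshot<String> {
        @Override
        public String resolveSchemaCompatibility(Snapshot<String> oldSnapshot) {
            return "data1: COMPATIBLE_AS_IS";
        }
    }

    static final class Data2Snapshot implements Snapshot<String> {
        @Override
        public String resolveSchemaCompatibility(Snapshot<String> oldSnapshot) {
            return "data2: COMPATIBLE_AS_IS";
        }
    }

    // New call direction: the NEW snapshot (data1) checks the OLD one (data2).
    static String upgradeViaNewOverload() {
        return new Data1Snapshot().resolveSchemaCompatibility(new Data2Snapshot());
    }

    // Legacy call direction still works: it forwards once to data1's new overload and stops.
    static String upgradeViaDeprecatedOverload() {
        Serializer<String> newSerializer = Data1Snapshot::new;
        return new Data2Snapshot().resolveSchemaCompatibility(newSerializer);
    }

    public static void main(String[] args) {
        System.out.println(upgradeViaNewOverload());
        System.out.println(upgradeViaDeprecatedOverload());
    }
}
```

The trade-off, as the question above suggests, is source compatibility: a user-defined snapshot that overrides only the deprecated overload would no longer compile until it implements the new one.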
##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##########
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
* program's serializer re-serializes the data, thus converting the format during the restore
* operation.
*
+ * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+ * #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
* @param newSerializer the new serializer to check.
* @return the serializer compatibility result.
*/
- TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
- TypeSerializer<T> newSerializer);
+ @Deprecated
+ default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+ TypeSerializer<T> newSerializer) {
+ return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+ }
+
+ /**
+ * Checks current serializer's compatibility to read data written by the prior serializer.
+ *
+ * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
+ * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+ * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+ * serialization format is compatible, that the program's serializer needs to reconfigure itself
+ * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+ * that the format is outright incompatible, or that a migration needed. In the latter case, the
+ * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+ * program's serializer re-serializes the data, thus converting the format during the restore
+ * operation.
+ *
+ * <p>This method must be implemented to clarify the compatibility. See FLIP-263 for more
+ * details.
+ *
+ * @param oldSerializerSnapshot the old serializer snapshot to check.
+ * @return the serializer compatibility result.
+ */
+ default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
Review Comment:
After this change, should all production callers call the new `resolveSchemaCompatibility` instead of the old one?
For example (screenshot attached to the original comment): https://github.com/apache/flink/assets/38427477/9e21ad30-c2cc-4c0d-8a00-e5eaf082c011
--
This is an automated message from the Apache Git Service.