baeminbo commented on issue #36496:
URL: https://github.com/apache/beam/issues/36496#issuecomment-3406994523

   I checked the code for SchemaTransform's `Row` compatibility handling.
   
   ----
   
   At serialization time, the `Row` is the serialization format for the Kafka 
configuration `KafkaReadSchemaTransformConfiguration` in 
`KafkaReadSchemaTransform`.
   
https://github.com/apache/beam/blob/1c6f779bdf2784860296bac2ad92c415e342d1f5/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java#L148-L160
   
   
   **The `Row` is encoded together with its `Schema` in `PTransformTranslation` 
and `SchemaTransformTranslation`**. Thus, it does not hit the incompatible-schema 
problem seen in #30276: the `Row` can always be successfully deserialized with 
the provided `Schema`.
   
https://github.com/apache/beam/blob/c703b7227de2835665b9ee63894a56e30a56c124/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java#L564-L574
   
   
https://github.com/apache/beam/blob/c703b7227de2835665b9ee63894a56e30a56c124/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformTranslation.java#L56-L71
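The core idea — the writer's schema travels with the serialized row, so the reader never has to guess field order from its own class version — can be sketched in plain Java. This is a minimal illustration, not Beam's actual API; `SchemaAwarePayload` and its list-based schema/row stand-ins are hypothetical:

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Beam's API): the serialized payload carries
// both the schema (field names, in order) and the row values, mirroring
// how PTransformTranslation encodes the config Row alongside its Schema.
public class SchemaAwarePayload {
    // Encode: pair the writer's schema with the row values.
    public static Map.Entry<List<String>, List<Object>> encode(
            List<String> schema, List<Object> values) {
        return Map.entry(schema, values);
    }

    // Decode a field by name using the schema shipped in the payload,
    // not whatever schema the reader's current class would infer.
    public static Object decodeField(
            Map.Entry<List<String>, List<Object>> payload, String field) {
        int i = payload.getKey().indexOf(field);
        return payload.getValue().get(i);
    }
}
```

Because decoding consults the shipped schema, a reader compiled against a newer configuration class still resolves each field correctly.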
   
   ----
   
   At deserialization time, the `Row` is converted to the configuration class 
(e.g., `KafkaReadSchemaTransformConfiguration`): 
`TypedSchemaTransformProvider#configFromRow` uses 
`SchemaRegistry.createDefault().getFromRowFunction(<configuration class>)` to 
build a configuration object from a `Row`.
   
https://github.com/apache/beam/blob/c703b7227de2835665b9ee63894a56e30a56c124/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java#L121-L136
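Conceptually, the from-row function fills the typed configuration by looking up row fields by name. A minimal sketch of that mapping (plain Java; `ConfigFromRow` and the map-based row are hypothetical stand-ins, not Beam's `SerializableFunction<Row, ConfigT>`):

```java
import java.util.Map;

// Hypothetical sketch of what configFromRow achieves: build the typed
// configuration object from a row, resolving each field by name.
public class ConfigFromRow {
    // Stand-in for KafkaReadSchemaTransformConfiguration.
    public record KafkaConfig(String topic, String bootstrapServers) {}

    // Because fields are accessed by name, reordered or extra row
    // fields do not break the conversion.
    public static KafkaConfig fromRow(Map<String, Object> row) {
        return new KafkaConfig(
                (String) row.get("topic"),
                (String) row.get("bootstrapServers"));
    }
}
```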
   
   `getFromRowFunction` eventually delegates to `FromRowUsingCreator` via 
`GetterBasedSchemaProvider`.
   
https://github.com/apache/beam/blob/c703b7227de2835665b9ee63894a56e30a56c124/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java#L180
   
   `FromRowUsingCreator#fieldConverters` gets `List<FieldValueTypeInformation>` 
from the `TypeDescriptor` and `Schema`.
   
https://github.com/apache/beam/blob/c703b7227de2835665b9ee63894a56e30a56c124/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java#L117-L120
   
   The list is built by `StaticSchemaInference#sortBySchema`, which sorts the 
`List<FieldValueTypeInformation>` to match the field order of the `Row`'s 
schema. This prevents issues caused by mismatched field order or by new fields 
in the type.
   
https://github.com/apache/beam/blob/99ee1738e2bf33add4487de2dad4c290f847b4f1/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java#L48-L53
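The reordering step can be sketched as follows. This is a simplified stand-in for `sortBySchema` (using field names mapped to Java types instead of `FieldValueTypeInformation`), just to show the name-based alignment:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of StaticSchemaInference#sortBySchema: given the
// type's fields keyed by name, emit them in the Row schema's field
// order. Matching by name lets a Row written with an old schema line up
// with a newer TypeDescriptor whose fields were reordered.
public class SortBySchema {
    public static List<Class<?>> sortBySchema(
            Map<String, Class<?>> typeFields, List<String> schemaFieldOrder) {
        return schemaFieldOrder.stream()
                .map(typeFields::get) // resolve each schema field by name
                .collect(Collectors.toList());
    }
}
```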
   
   Therefore:
   1. The `Row` serialization/deserialization incompatibility was resolved by 
providing the `Schema` used at serialization alongside the serialized `Row` 
itself.
   2. The compatibility between a `Row` with an old `Schema` and a new 
`TypeDescriptor` was resolved by `StaticSchemaInference#sortBySchema`, which 
maps `Row` fields to `TypeDescriptor` fields by name.
   
   If I understand the code correctly, I guess `TypedSchemaTransformProvider` 
would not have had a compatibility issue even without `sorted()`, since the 
`Schema` used at serialization is also the one used at deserialization.

