baeminbo commented on issue #36496: URL: https://github.com/apache/beam/issues/36496#issuecomment-3406994523
I checked the code for how SchemaTransform handles `Row` compatibility.

----

At serialization, `Row` is used as the serialization format for the Kafka configuration `KafkaReadSchemaTransformConfiguration` in `KafkaReadSchemaTransform`.

https://github.com/apache/beam/blob/1c6f779bdf2784860296bac2ad92c415e342d1f5/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java#L148-L160

**The `Row` is encoded along with its `Schema` in `PTransformTranslation` and `SchemaTransformTranslation`**. Thus, it is not affected by schema-incompatibility issues like #30276: the `Row` can always be deserialized successfully with the provided `Schema`.

https://github.com/apache/beam/blob/c703b7227de2835665b9ee63894a56e30a56c124/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java#L564-L574

https://github.com/apache/beam/blob/c703b7227de2835665b9ee63894a56e30a56c124/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformTranslation.java#L56-L71

----

At deserialization, the `Row` is converted to the configuration class (e.g., `KafkaReadSchemaTransformConfiguration`). `TypedSchemaTransformProvider#configFromRow` uses `SchemaRegistry.createDefault().getFromRowFunction(<configuration class>)` to obtain a configuration object from a `Row`.

https://github.com/apache/beam/blob/c703b7227de2835665b9ee63894a56e30a56c124/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java#L121-L136

`getFromRowFunction` eventually uses `FromRowUsingCreator` through `GetterBasedSchemaProvider`.

https://github.com/apache/beam/blob/c703b7227de2835665b9ee63894a56e30a56c124/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java#L180

`FromRowUsingCreator#fieldConverters` gets a `List<FieldValueTypeInformation>` from the `TypeDescriptor` and the `Schema`.
https://github.com/apache/beam/blob/c703b7227de2835665b9ee63894a56e30a56c124/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java#L117-L120

This list is created by `StaticSchemaInference#sortBySchema`, which sorts the `List<FieldValueTypeInformation>` to match the field order of the `Row`'s schema. This prevents issues caused by mismatched field orders or by new fields added to the type.

https://github.com/apache/beam/blob/99ee1738e2bf33add4487de2dad4c290f847b4f1/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java#L48-L53

Therefore,

1. The `Row` serialization/deserialization incompatibility was resolved by providing the `Schema` used for serialization in addition to the serialized `Row` itself.
2. The compatibility between a `Row` with an old `Schema` and a new `TypeDescriptor` was resolved by `StaticSchemaInference#sortBySchema`, which maps each `Row` field to the `TypeDescriptor` field by name.

If I understand the code correctly, `TypedSchemaTransformProvider` probably didn't have a compatibility issue even without `sorted()`, because the `Schema` used at serialization is also used at deserialization.
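The name-based matching in point 2 can be illustrated with a simplified, hypothetical model in plain Java. `SortBySchemaSketch`, `FieldInfo`, `fromRow`, and the field names `topic`, `bootstrapServers`, and `newOption` are illustrative assumptions. They are not Beam classes or the actual fields of `KafkaReadSchemaTransformConfiguration`. Throwing on a schema field that the type lacks is also a choice made only for this sketch. The sketch mirrors just the idea above: row values are positional in the order of the `Schema` they were written with, and that schema travels with the row as in point 1. The type's fields are then reordered by name to line up with that schema.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of name-based field matching in the spirit of
 * StaticSchemaInference#sortBySchema. None of these types are Beam classes.
 */
public class SortBySchemaSketch {

  /** Hypothetical stand-in for FieldValueTypeInformation: one field of the Java type. */
  public record FieldInfo(String name, Class<?> type) {}

  /**
   * Reorders the type's fields to follow the schema's field order, matching by name.
   * Type fields that the (older) schema does not contain are left out.
   */
  public static List<FieldInfo> sortBySchema(List<FieldInfo> typeFields, List<String> schemaFieldNames) {
    Map<String, FieldInfo> byName = new LinkedHashMap<>();
    for (FieldInfo f : typeFields) {
      byName.put(f.name(), f);
    }
    List<FieldInfo> sorted = new ArrayList<>();
    for (String name : schemaFieldNames) {
      FieldInfo f = byName.get(name);
      if (f == null) {
        // Assumption for this sketch only: a schema field unknown to the type is an error.
        throw new IllegalArgumentException("No field named " + name + " in type");
      }
      sorted.add(f);
    }
    return sorted;
  }

  /** Converts positional row values (in schema order) into a name-to-value map for the type. */
  public static Map<String, Object> fromRow(
      Object[] rowValues, List<String> schemaFieldNames, List<FieldInfo> typeFields) {
    List<FieldInfo> sorted = sortBySchema(typeFields, schemaFieldNames);
    Map<String, Object> config = new LinkedHashMap<>();
    for (int i = 0; i < sorted.size(); i++) {
      // Row position i holds schema field i, which sortBySchema matched to sorted.get(i) by name.
      config.put(sorted.get(i).name(), rowValues[i]);
    }
    return config;
  }

  public static void main(String[] args) {
    // Row written with an older schema whose field order is [topic, bootstrapServers].
    List<String> oldSchema = List.of("topic", "bootstrapServers");
    Object[] row = {"events", "localhost:9092"};

    // Newer type: different declaration order plus a field the old schema never had.
    List<FieldInfo> newType = List.of(
        new FieldInfo("bootstrapServers", String.class),
        new FieldInfo("newOption", Map.class),
        new FieldInfo("topic", String.class));

    System.out.println(fromRow(row, oldSchema, newType));
  }
}
```

With these inputs the program prints `{topic=events, bootstrapServers=localhost:9092}`. The fields are matched by name despite the reordered type, and `newOption`, which the old schema lacks, is simply not populated.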
