ahmedabu98 commented on code in PR #31362:
URL: https://github.com/apache/beam/pull/31362#discussion_r1628208472
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -232,6 +116,140 @@ public List<String> outputCollectionNames() {
return Arrays.asList("output", "errors");
}
+ static class KafkaReadSchemaTransform extends SchemaTransform {
+ private final KafkaReadSchemaTransformConfiguration configuration;
+
+ KafkaReadSchemaTransform(KafkaReadSchemaTransformConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ Row getConfigurationRow() {
+ try {
+ // To stay consistent with our SchemaTransform configuration naming conventions,
+ // we sort lexicographically
+ return SchemaRegistry.createDefault()
+ .getToRowFunction(KafkaReadSchemaTransformConfiguration.class)
+ .apply(configuration)
+ .sorted()
Review Comment:
This is just to keep in line with what TypedSchemaTransformProvider does
when producing a config schema:
https://github.com/apache/beam/blob/f67f95c1553e0b4447edbd63671408d45e0a35c3/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java#L96
The sort is needed because the SchemaProvider does not always produce a
consistent schema (#24361), so we sort the fields to keep it consistent.
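
To illustrate why sorting helps, here is a minimal sketch in plain Java. It does not use Beam's API: the class `SortedFieldsSketch`, the method `sortedFieldNames`, and the field names are all hypothetical, and a schema is modeled as nothing more than an ordered list of field names. The idea is that two field lists that differ only in order become equal once both are sorted lexicographically.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in, not Beam's API: a schema is modeled as an ordered list of field names.
class SortedFieldsSketch {

  // Returns a copy of the field names in lexicographic order; the input list is left untouched.
  static List<String> sortedFieldNames(List<String> fieldNames) {
    List<String> copy = new ArrayList<>(fieldNames);
    Collections.sort(copy);
    return copy;
  }

  public static void main(String[] args) {
    // Same fields, produced in two different orders (illustrative names only).
    List<String> first = Arrays.asList("topic", "bootstrap_servers", "format");
    List<String> second = Arrays.asList("format", "topic", "bootstrap_servers");

    // Unsorted, the two lists compare as different.
    System.out.println(first.equals(second));

    // Sorted, both collapse to the same canonical order.
    System.out.println(sortedFieldNames(first).equals(sortedFieldNames(second)));
  }
}
```

In the PR itself the equivalent step is the `.sorted()` call on the configuration Row shown in the diff above.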
> Do we need to do this for every implementation
Right now, unfortunately, yes. I'm working on adding some things to
SchemaTransform (#30943) so we don't have to copy this everywhere. My hope is
that this change will make SchemaTransformTranslation sufficient for all IOs
and remove the need for a separate <IO>SchemaTransformTranslation for each IO.