brachipa commented on issue #30498: URL: https://github.com/apache/beam/issues/30498#issuecomment-2016599662
Hi @Polber. Thanks for checking it out. I ran your example and it works fine for me. However, once I added a new field (`id`) to the schema, it stopped working: the grouped field is output under its original name, `event`, instead of its alias, `event_name`. Here is the example with the `id` field added:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class SqlAliasRepro {
  public static void main(String[] args) {
    Schema SCHEMA = Schema.of(
        Schema.Field.of("event", Schema.FieldType.STRING),
        Schema.Field.of("id", Schema.FieldType.INT32)); // new field added

    Pipeline pipeline = Pipeline.create();

    PCollection<Row> input = pipeline.apply(
        Create.of(
            Row.withSchema(SCHEMA)
                .withFieldValue("event", "abc")
                .withFieldValue("id", 222) // new field added
                .build()))
        .setRowSchema(SCHEMA);

    // Alias "event" as "event_name" and count rows per event.
    PCollection<Row> transformed = input.apply(
        SqlTransform.query(
            "select event as event_name, count(*) as c from PCOLLECTION group by event"));

    // Print each result row and pass it through unchanged.
    transformed.apply(ParDo.of(new DoFn<Row, Object>() {
          @ProcessElement
          public void processElement(ProcessContext c) throws Exception {
            System.out.println(c.element());
            c.output(c.element());
          }
        }))
        .setRowSchema(
            Schema.of(
                Schema.Field.of("id", Schema.FieldType.INT32), // new field added
                Schema.Field.of("event", Schema.FieldType.STRING),
                Schema.Field.of("c", Schema.FieldType.INT64)));

    pipeline.run();
  }
}
```

As you can see in the log output below, the Beam plan is slightly different, although the SQL plan is the same as in the working example. In the failing case the `BeamCalcRel` step is missing. I suspect this is what causes the output to take the original field name instead of the alias, but I can't figure out what causes the different plan.
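To make that hypothesis concrete, here is a toy model in plain Java. It is not Beam's planner code: `AliasPushdownModel`, `outputNames`, and `buggyPushdown` are hypothetical names. It assumes that a projection-pushdown step moves the selected columns into the source and drops the projection node (where the alias lives) only when it can prune an unused column. That assumption is a guess consistent with the plans logged in this comment, not something confirmed against the Beam source.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

// Toy model only: hypothetical names, NOT Beam's planner classes.
public class AliasPushdownModel {

    // Returns the output field names of a plan that reads `sourceFields` and
    // applies `projection` (source field -> alias, in output order).
    static List<String> outputNames(List<String> sourceFields,
                                    LinkedHashMap<String, String> projection,
                                    boolean buggyPushdown) {
        // Pushdown only has work to do if the projection drops a source column.
        boolean prunesColumns = projection.size() < sourceFields.size();
        List<String> names = new ArrayList<>();
        if (buggyPushdown && prunesColumns) {
            // Hypothesized bug: the column list is pushed into the source and the
            // projection node is dropped, so the source field names leak through.
            names.addAll(projection.keySet());
        } else {
            // Projection node kept: its aliases name the output fields.
            names.addAll(projection.values());
        }
        return names;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> projection = new LinkedHashMap<>();
        projection.put("event", "event_name");

        // Schema with only "event": nothing to prune, so the alias survives.
        System.out.println(outputNames(List.of("event"), projection, true));
        // Schema with "event" and "id": "id" is pruned and the alias is lost.
        System.out.println(outputNames(List.of("event", "id"), projection, true));
    }
}
```

Running it prints `[event_name]` and then `[event]`. That mirrors the behavior above, where only the schema with the extra, unused `id` column produces `event` instead of `event_name`.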
```
2024-03-23 22:43:51.345 INFO 71505 --- [ Test worker] o.a.b.s.e.sql.impl.CalciteQueryPlanner : SQLPlan>
LogicalAggregate(group=[{0}], c=[COUNT()])
  LogicalProject(event_name=[$0])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

2024-03-23 22:43:51.478 INFO 71505 --- [ Test worker] o.a.b.s.e.sql.impl.CalciteQueryPlanner : BEAMPlan>
BeamAggregationRel(group=[{0}], c=[COUNT()])
  BeamIOSourceRel(table=[[beam, PCOLLECTION]])

Row: event:abc c:1
```

My original pipeline subscribes to Pub/Sub, performs a 1-minute window aggregation, runs the SQL query, and publishes the results to Pub/Sub as well. The input schema can be wider than what the query uses, and the SQL may reference only a subset of its fields.

If my example works for you, could you please share your Java version and build dependencies?

Thanks again!