brachipa commented on issue #30498:
URL: https://github.com/apache/beam/issues/30498#issuecomment-2016599662

   Hi @Polber,
   Thanks for checking it out.
   I ran your example and it works fine for me, but once I added a new field to the
schema it stopped working: `event` is output as `event` and not under its alias
`event_name`.
   Here is the example with the added `id` field:
   
   ```
      Schema SCHEMA = Schema.of(Schema.Field.of("event", Schema.FieldType.STRING)
                   , Schema.Field.of("id", Schema.FieldType.INT32)); //new field added
           Pipeline pipeline = Pipeline.create();
           PCollection<Row> input = pipeline.apply(
                   Create.of(
                           Row.withSchema(SCHEMA).withFieldValue("event", "abc")
                                   .withFieldValue("id", 222) //new field added
                                   .build()
                   )
           ).setRowSchema(SCHEMA);
   
           PCollection<Row> transformed = input.apply(
                   SqlTransform.query("select event as event_name, count(*) as c from PCOLLECTION group by event"));
           transformed.apply(ParDo.of(new DoFn<Row, Object>() {
               @ProcessElement
               public void processElement(ProcessContext c) throws Exception {
                   System.out.println(c.element());
                   c.output(c.element());
               }
           })).setRowSchema(
                   Schema.of(
                           Schema.Field.of("id", Schema.FieldType.INT32), //new field added
                           Schema.Field.of("event", Schema.FieldType.STRING),
                           Schema.Field.of("c", Schema.FieldType.INT64)
                   )
           );
   
           pipeline.run();
       }
   ```
   
   As you can see, the Beam plan is slightly different (the SQL plan is the same as in
the working example). In the non-working case the "BeamCalcRel" step is missing. I guess
this is what causes the output to use the real field name instead of the alias, but I
can't figure out what causes the different plan.
   
   ```
   2024-03-23 22:43:51.345  INFO 71505 --- [    Test worker] o.a.b.s.e.sql.impl.CalciteQueryPlanner   : SQLPlan>
   LogicalAggregate(group=[{0}], c=[COUNT()])
     LogicalProject(event_name=[$0])
       BeamIOSourceRel(table=[[beam, PCOLLECTION]])
   
   2024-03-23 22:43:51.478  INFO 71505 --- [    Test worker] o.a.b.s.e.sql.impl.CalciteQueryPlanner   : BEAMPlan>
   BeamAggregationRel(group=[{0}], c=[COUNT()])
     BeamIOSourceRel(table=[[beam, PCOLLECTION]])
   
   Row: 
   event:abc
   c:1
   
   ```
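
To compare two plan dumps without reading the logs by eye, a small plain-Java sketch (no Beam dependency) can pull the node names out of the plan text and check for a given step. The class `PlanNodes` and the method `nodeNames` are hypothetical names chosen for illustration; the plan string is copied from the BEAMPlan log above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: lists the node names in a plan dump.
class PlanNodes {
    // Returns the token before the first '(' on each non-empty plan line,
    // e.g. "BeamAggregationRel" for "BeamAggregationRel(group=[{0}], ...)".
    static List<String> nodeNames(String plan) {
        List<String> names = new ArrayList<>();
        for (String line : plan.split("\n")) {
            String trimmed = line.trim();
            int paren = trimmed.indexOf('(');
            if (paren > 0) {
                names.add(trimmed.substring(0, paren));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // BEAMPlan text from the failing run shown in the log.
        String failingPlan =
                "BeamAggregationRel(group=[{0}], c=[COUNT()])\n"
                + "  BeamIOSourceRel(table=[[beam, PCOLLECTION]])\n";
        List<String> nodes = nodeNames(failingPlan);
        System.out.println(nodes);
        System.out.println("has BeamCalcRel: " + nodes.contains("BeamCalcRel"));
    }
}
```

Running the same check on the plan logged by the working example should make the missing step visible at a glance.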
   
   My original pipeline subscribes to Pub/Sub, applies a 1-minute window
aggregation, runs the SQL query, and sends the results back to Pub/Sub.
   The schema can be wider than the query: the SQL may select only a subset of the
schema fields.
   
   If the example I shared works for you, could you please share your Java version
and build dependencies?
   Thanks again!
   

