brachipa commented on issue #30498:
URL: https://github.com/apache/beam/issues/30498#issuecomment-2016599662
Hi @Polber,
Thanks for checking it out.
I ran your example and it works fine for me, but once I added a new field to the schema it stopped working: the output field is named `event` instead of its alias `event_name`.
Added `id` field:
```
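import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Wrapper class and main method so the snippet compiles; the class name is arbitrary.
public class SqlAliasRepro {
  public static void main(String[] args) {
    Schema SCHEMA = Schema.of(
        Schema.Field.of("event", Schema.FieldType.STRING),
        Schema.Field.of("id", Schema.FieldType.INT32)); // new field added

    Pipeline pipeline = Pipeline.create();
    PCollection<Row> input = pipeline.apply(
        Create.of(
            Row.withSchema(SCHEMA)
                .withFieldValue("event", "abc")
                .withFieldValue("id", 222) // new field added
                .build()))
        .setRowSchema(SCHEMA);

    PCollection<Row> transformed = input.apply(
        SqlTransform.query(
            "select event as event_name, count(*) as c from PCOLLECTION group by event"));

    transformed.apply(ParDo.of(new DoFn<Row, Row>() {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        // Print each result row to check which field names it carries
        System.out.println(c.element());
        c.output(c.element());
      }
    })).setRowSchema(
        Schema.of(
            Schema.Field.of("id", Schema.FieldType.INT32), // new field added
            Schema.Field.of("event", Schema.FieldType.STRING),
            Schema.Field.of("c", Schema.FieldType.INT64)));

    pipeline.run();
  }
}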
```
As you can see, the Beam plan is slightly different (the SQL plan is the same as in the working example): the failing version is missing the `BeamCalcRel` step. I guess this is what causes the output to use the original field name rather than the alias, but I can't figure out what produces the different plan.
```
2024-03-23 22:43:51.345 INFO 71505 --- [ Test worker] o.a.b.s.e.sql.impl.CalciteQueryPlanner : SQLPlan>
LogicalAggregate(group=[{0}], c=[COUNT()])
  LogicalProject(event_name=[$0])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])
2024-03-23 22:43:51.478 INFO 71505 --- [ Test worker] o.a.b.s.e.sql.impl.CalciteQueryPlanner : BEAMPlan>
BeamAggregationRel(group=[{0}], c=[COUNT()])
  BeamIOSourceRel(table=[[beam, PCOLLECTION]])
Row:
event:abc
c:1
```
My original pipeline subscribes to Pub/Sub, performs a 1-minute window aggregation, runs the SQL, and sends the results back to Pub/Sub.
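
For context, the production pipeline has roughly the following shape. This is a simplified sketch, not the real code: the `PubsubSqlSketch` class, the subscription and topic paths, the two-field schema, and the JSON message encoding (parsed with `JsonToRow`, serialized with `ToJson`) are assumptions for illustration. It needs the `beam-sdks-java-extensions-sql` and `beam-sdks-java-io-google-cloud-platform` dependencies.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.transforms.ToJson;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

public class PubsubSqlSketch {
  // Hypothetical resource names, placeholders only.
  static final String SUBSCRIPTION = "projects/my-project/subscriptions/events-sub";
  static final String TOPIC = "projects/my-project/topics/event-counts";

  // Assumed message schema; the real one can be wider than what the query uses.
  static final Schema SCHEMA = Schema.of(
      Schema.Field.of("event", Schema.FieldType.STRING),
      Schema.Field.of("id", Schema.FieldType.INT32));

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<Row> rows = pipeline
        // Read messages as strings (assumes JSON-encoded payloads)
        .apply(PubsubIO.readStrings().fromSubscription(SUBSCRIPTION))
        // Parse each JSON message into a Row with SCHEMA
        .apply(JsonToRow.withSchema(SCHEMA))
        // Assign elements to 1-minute fixed windows
        .apply(Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))));

    rows.apply(SqlTransform.query(
            "select event as event_name, count(*) as c from PCOLLECTION group by event"))
        // Serialize result rows back to JSON and publish them
        .apply(ToJson.<Row>of())
        .apply(PubsubIO.writeStrings().to(TOPIC));

    pipeline.run();
  }
}
```

Because the rows are windowed before `SqlTransform`, the `group by` aggregation is computed per 1-minute window.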
The input schema can be wider than what the query needs, and the SQL may select only a subset of the schema fields.
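
A possible workaround, not verified here: since the single-field schema produces the working plan, narrowing the input with Beam's `Select.fieldNames` before `SqlTransform` may give the query the same input schema as the working example. The `PreProjectSketch` class and the `narrowToQueriedFields` helper are hypothetical names; whether the alias then survives is exactly what the printed row would show.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class PreProjectSketch {
  static final Schema SCHEMA = Schema.of(
      Schema.Field.of("event", Schema.FieldType.STRING),
      Schema.Field.of("id", Schema.FieldType.INT32));

  // Hypothetical helper: keep only the columns the query references (here "event"),
  // so SqlTransform sees the same single-field schema as the working example.
  static PCollection<Row> narrowToQueriedFields(PCollection<Row> input) {
    return input.apply(Select.<Row>fieldNames("event"));
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    PCollection<Row> input = pipeline.apply(
        Create.of(Row.withSchema(SCHEMA).addValues("abc", 222).build())
            .withRowSchema(SCHEMA));

    narrowToQueriedFields(input)
        .apply(SqlTransform.query(
            "select event as event_name, count(*) as c from PCOLLECTION group by event"))
        .apply(ParDo.of(new DoFn<Row, Void>() {
          @ProcessElement
          public void processElement(@Element Row row) {
            // Inspect whether the output row carries the alias "event_name"
            System.out.println(row);
          }
        }));

    pipeline.run().waitUntilFinish();
  }
}
```

In a real pipeline, the field list passed to `Select.fieldNames` would have to match the columns each query references.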
If my example works for you, could you please share your Java version and build dependencies?
Thanks again!