brachi-wernick commented on a change in pull request #15915:
URL: https://github.com/apache/beam/pull/15915#discussion_r748900363
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -192,9 +207,17 @@ public Calc copy(RelTraitSet traitSet, RelNode input,
RexProgram program) {
outputSchema,
options.getVerifyRowValues(),
getJarPaths(program),
- inputGetter.getFieldAccess());
-
- return upstream.apply(ParDo.of(calcFn)).setRowSchema(outputSchema);
+ inputGetter.getFieldAccess(),
+ errorsTransformer != null);
+
+ PCollectionTuple tuple =
+ upstream.apply(ParDo.of(calcFn).withOutputTags(rows,
TupleTagList.of(errors)));
+ PCollection<BeamCalcRelError> errorPCollection =
+
tuple.get(errors).setCoder(BeamCalcRelErrorCoder.of(RowCoder.of(upstream.getSchema())));
Review comment:
I added a complex test, with 2 udaf, each one produced a different error
on a different schema (one before group by and one after group by)
I assert in the tests that I get 2 error rows each one with its matching
schema.
from logs: (you can see that `CastUdf` and `CalculatePrice` exist in 2 diff
steps.
```
INFO: SQL:
SELECT `PCOLLECTION`.`id`, `PCOLLECTION`.`country_code`,
`CalculatePrice`(SUM(`CastUdf`(`PCOLLECTION`.`amount`)),
`PCOLLECTION`.`currency`) AS `sum_amount`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
GROUP BY `PCOLLECTION`.`id`, `PCOLLECTION`.`country_code`,
`PCOLLECTION`.`currency`
Nov 14, 2021 9:37:51 PM
org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
INFO: SQLPlan>
LogicalProject(id=[$0], country_code=[$1], sum_amount=[CalculatePrice($3,
$2)])
LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)])
LogicalProject(id=[$0], country_code=[$2], currency=[$3],
$f3=[CastUdf($1)])
BeamIOSourceRel(table=[[beam, PCOLLECTION]])
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]