apilloud commented on a change in pull request #15915:
URL: https://github.com/apache/beam/pull/15915#discussion_r746196223
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -192,9 +207,17 @@ public Calc copy(RelTraitSet traitSet, RelNode input, RexProgram program) {
outputSchema,
options.getVerifyRowValues(),
getJarPaths(program),
- inputGetter.getFieldAccess());
-
- return upstream.apply(ParDo.of(calcFn)).setRowSchema(outputSchema);
+ inputGetter.getFieldAccess(),
+ errorsTransformer != null);
+
+ PCollectionTuple tuple =
+ upstream.apply(ParDo.of(calcFn).withOutputTags(rows, TupleTagList.of(errors)));
+ PCollection<BeamCalcRelError> errorPCollection =
+ tuple.get(errors).setCoder(BeamCalcRelErrorCoder.of(RowCoder.of(upstream.getSchema())));
Review comment:
This won't work: there is no guarantee that your error row matches the
input row.
For example, if your input row is `1 as key, "a" as value` and your SqlTransform
is `SELECT CustomUdafWithError(key)`, you'd end up with a row of `1 as key` in
the error. Across the overall SQL transform you can end up with multiple
different coders, for example if you have a join.
I think you'll actually need a coder that encodes the coder (or at least the
schema), and it's going to get complex quickly. We might just want to return the
string (or possibly JSON) representation of the error row instead? Can you add
tests where errors from the same SqlTransform have different row schemas? We are
trying to move to Schema coder for everything, but I don't think this use case
(variable row structure) is something we support.
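To make the string/JSON option concrete, here is a rough sketch in plain Java
with no Beam types. Everything in it (`CalcErrorSketch`, `StringError`,
`toJson`) is a hypothetical name made up for illustration, rows are modelled as
simple maps, and the JSON rendering only handles flat rows of strings, numbers,
booleans, and nulls. The point is only that two failing rows with different
schemas end up in the same error type.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical sketch, not Beam API: an error record that does not depend on the row schema. */
class CalcErrorSketch {

  /** Error payload: the failing row rendered as a JSON string, plus the error message. */
  static final class StringError {
    final String rowJson;
    final String message;

    StringError(String rowJson, String message) {
      this.rowJson = rowJson;
      this.message = message;
    }
  }

  /** Renders a flat row (field name -> value) as a JSON object, keeping field order. */
  public static String toJson(Map<String, Object> row) {
    StringJoiner json = new StringJoiner(",", "{", "}");
    for (Map.Entry<String, Object> field : row.entrySet()) {
      json.add(quote(field.getKey()) + ":" + render(field.getValue()));
    }
    return json.toString();
  }

  /** Numbers, booleans and null are written bare; anything else is written as a quoted string. */
  private static String render(Object value) {
    if (value == null || value instanceof Number || value instanceof Boolean) {
      return String.valueOf(value);
    }
    return quote(value.toString());
  }

  /** Escapes backslashes and double quotes only; control characters are not handled in this sketch. */
  private static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  public static void main(String[] args) {
    // The full input row from the example above.
    Map<String, Object> inputRow = new LinkedHashMap<>();
    inputRow.put("key", 1);
    inputRow.put("value", "a");

    // The narrower row that the failing expression actually saw.
    Map<String, Object> projectedRow = new LinkedHashMap<>();
    projectedRow.put("key", 1);

    // Different schemas, same error type.
    StringError fromInput = new StringError(toJson(inputRow), "udf failed");
    StringError fromProjection = new StringError(toJson(projectedRow), "udf failed");

    System.out.println(fromInput.rowJson);      // {"key":1,"value":"a"}
    System.out.println(fromProjection.rowJson); // {"key":1}
  }
}
```

With a shape like this the error output would only carry strings, so presumably
a single fixed coder (something like `StringUtf8Coder` for each field) would be
enough regardless of which rel node produced the error. The cost is that
consumers get text rather than a typed `Row`.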
cc: @reuvenlax