brachi-wernick commented on a change in pull request #15915:
URL: https://github.com/apache/beam/pull/15915#discussion_r746445391



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -192,9 +207,17 @@ public Calc copy(RelTraitSet traitSet, RelNode input, RexProgram program) {
               outputSchema,
               options.getVerifyRowValues(),
               getJarPaths(program),
-              inputGetter.getFieldAccess());
-
-      return upstream.apply(ParDo.of(calcFn)).setRowSchema(outputSchema);
+              inputGetter.getFieldAccess(),
+              errorsTransformer != null);
+
+      PCollectionTuple tuple =
+          upstream.apply(ParDo.of(calcFn).withOutputTags(rows, TupleTagList.of(errors)));
+      PCollection<BeamCalcRelError> errorPCollection =
+          tuple.get(errors).setCoder(BeamCalcRelErrorCoder.of(RowCoder.of(upstream.getSchema())));

Review comment:
       Sure, I can change it to a JSON string instead of a row, but before I 
do, I want to make sure we aren't missing something here, because I would 
prefer to send the row itself to the error transformer.
   
   The error row is not the row *after* running the expression and hitting the 
error; it is the original row, as it was *before* running the DoFn. That is why 
I set the schema to `upstream.getSchema()`.
   The row *after* the expression executes is a different one and has 
`outputSchema`.
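
   To illustrate the distinction, here is a minimal plain-Java sketch of the idea. It is not the PR code and not Beam API: `CalcSketch`, `Failure`, and `process` are hypothetical names, and the two lists only stand in for the `rows` and `errors` outputs. The point is that the error side receives the untouched input element, so its type (schema) is the input's, not the output's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for a two-output step; plain Java, not Beam API. */
final class CalcSketch<InT, OutT> {

  /** Pairs the original input element with the failure message. */
  static final class Failure<T> {
    final T originalInput;
    final String message;

    Failure(T originalInput, String message) {
      this.originalInput = originalInput;
      this.message = message;
    }
  }

  /** Main output: elements after the expression ran (output type). */
  final List<OutT> rows = new ArrayList<>();

  /** Error output: elements as they were before the expression ran (input type). */
  final List<Failure<InT>> errors = new ArrayList<>();

  void process(InT input, Function<InT, OutT> expression) {
    try {
      rows.add(expression.apply(input));
    } catch (RuntimeException e) {
      // The expression failed, so no output element exists; keep the input as-is.
      errors.add(new Failure<>(input, String.valueOf(e.getMessage())));
    }
  }
}
```

   With `Integer::parseInt` as the expression, the input `"oops"` ends up in `errors` still as a `String`, while `"12"` ends up in `rows` as an `Integer`.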
   
   Also, each node in the same SqlTransform creates this chaining and is 
assigned a different input and output schema; these schemas are per node, not 
for the whole SQL transform.
    
   This is how it looks when expanded in DF (ignore the failed icons; they 
appear because I aborted the execution, but it worked well):
   
![image](https://user-images.githubusercontent.com/3658085/141094730-77671b5b-1cc5-4839-8610-80eb6b379843.png)
   

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -112,11 +119,19 @@
   private static final long MILLIS_PER_DAY = 86400000L;
 
   private static final ParameterExpression rowParam = Expressions.parameter(Row.class, "row");
+  private PTransform<PCollection<BeamCalcRelError>, POutput> errorsTransformer;
+  private static final TupleTag<Row> rows = new TupleTag<Row>() {};
+  private static final TupleTag<BeamCalcRelError> errors = new TupleTag<BeamCalcRelError>() {};
 
   public BeamCalcRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RexProgram program) {
     super(cluster, traits, input, program);
   }
 
+  @Override
+  public void withErrorsTransformer(PTransform<PCollection<BeamCalcRelError>, POutput> ptransform) {

Review comment:
       Yes, I can add it to `buildPTransform`, but that will break the API for 
all `BeamRelNode` implementations.
   This way, when I add it as a default method, any implementation that 
doesn't need to handle errors continues to work as before, and one that wants 
error handling overrides this method and uses this variable.
   Does that make sense, or do you still prefer to break the API?
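
   A minimal sketch of the default-method approach, in plain Java with hypothetical names (`RelNodeSketch`, `ProjectSketch`, `CalcRelSketch`, and the `Consumer<String>` parameter are stand-ins, not the real Beam types or signatures):

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for the node interface; not the real BeamRelNode. */
interface RelNodeSketch {
  String buildTransform();

  /** Default no-op, so existing implementations compile and behave as before. */
  default void withErrorsTransformer(Consumer<String> errorsTransformer) {
    // Nodes that do not handle errors simply ignore the transformer.
  }
}

/** An implementation that does not care about errors; nothing to change here. */
final class ProjectSketch implements RelNodeSketch {
  @Override
  public String buildTransform() {
    return "project";
  }
}

/** An implementation that opts in by overriding the default method. */
final class CalcRelSketch implements RelNodeSketch {
  private Consumer<String> errorsTransformer;

  @Override
  public void withErrorsTransformer(Consumer<String> errorsTransformer) {
    this.errorsTransformer = errorsTransformer;
  }

  @Override
  public String buildTransform() {
    // Same kind of check as `errorsTransformer != null` in the diff above.
    return errorsTransformer != null ? "calc+errors" : "calc";
  }
}
```

   Changing the signature of `buildTransform` instead would force every implementation, including ones like `ProjectSketch`, to be updated.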




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


