apilloud commented on a change in pull request #15915:
URL: https://github.com/apache/beam/pull/15915#discussion_r746191174



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRelError.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.Objects;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class BeamCalcRelError {
+
+  private Row row;
+  private String error;
+
+  public BeamCalcRelError(Row row, String error) {
+    this.row = row;
+    this.error = error == null ? "empty error msg" : error;
+  }
+
+  public Row getRow() {
+    return row;
+  }
+
+  public void setRow(Row row) {
+    this.row = row;
+  }
+
+  public String getError() {
+    return error;
+  }
+
+  public void setError(String error) {
+    this.error = error;
+  }
+
+  @Override
+  public boolean equals(@Nullable Object o) {

Review comment:
       We prefer AutoValue to this boilerplate. For an example, see:
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStats.java

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
##########
@@ -71,15 +77,20 @@
    * A {@link BeamRelNode} is a recursive structure, the {@code BeamQueryPlanner} visits it with a
    * DFS(Depth-First-Search) algorithm.
    */
-  static PCollection<Row> toPCollection(
-      Pipeline pipeline, BeamRelNode node, Map<Integer, PCollection<Row>> cache) {
+  public static PCollection<Row> toPCollection(
+      Pipeline pipeline,
+      BeamRelNode node,
+      Map<Integer, PCollection<Row>> cache,
+      @Nullable PTransform<PCollection<BeamCalcRelError>, POutput> errorTransformer) {

Review comment:
       nit: this parameter should go before `cache`.

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
##########
@@ -39,12 +41,15 @@
 public class BeamSqlRelUtils {
 
  public static PCollection<Row> toPCollection(Pipeline pipeline, BeamRelNode node) {
-    return toPCollection(pipeline, node, new HashMap());
+    return toPCollection(pipeline, node, new HashMap(), null);
   }
 
   /** Transforms the inputs into a PInput. */
   private static PCollectionList<Row> buildPCollectionList(
-      List<RelNode> inputRels, Pipeline pipeline, Map<Integer, PCollection<Row>> cache) {
+      List<RelNode> inputRels,
+      Pipeline pipeline,
+      Map<Integer, PCollection<Row>> cache,
+      @Nullable PTransform<PCollection<BeamCalcRelError>, POutput> errorTransformer) {

Review comment:
       nit: you should add this before the `cache` parameter.

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -192,9 +207,17 @@ public Calc copy(RelTraitSet traitSet, RelNode input, RexProgram program) {
               outputSchema,
               options.getVerifyRowValues(),
               getJarPaths(program),
-              inputGetter.getFieldAccess());
-
-      return upstream.apply(ParDo.of(calcFn)).setRowSchema(outputSchema);
+              inputGetter.getFieldAccess(),
+              errorsTransformer != null);
+
+      PCollectionTuple tuple =
+          upstream.apply(ParDo.of(calcFn).withOutputTags(rows, TupleTagList.of(errors)));
+      PCollection<BeamCalcRelError> errorPCollection =
+          tuple.get(errors).setCoder(BeamCalcRelErrorCoder.of(RowCoder.of(upstream.getSchema())));

Review comment:
       This won't work: there is no guarantee that the row in the error matches the schema of the input row.
   
   Suppose your input row is `1 as key, "a" as value` and your SqlTransform is `SELECT CustomUdafWithError(key)`. Then you'd end up with a row of `1 as key` in the error. Across the overall SQL transform you can end up with multiple different coders, for example if you have a join.
   
   I think you'd actually need a coder that also encodes each row's coder, and that's going to get complex quickly. We might just want to return the string (or possibly JSON) representation of the error row instead? Can you add tests where you have errors from the same SqlTransform with different row schemas? We are trying to move to SchemaCoder for everything, but I think this use case (variable row structure) isn't something we support.
   cc: @reuvenlax 
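   A minimal sketch of the string-representation idea, under stated assumptions: `ErrorRecordSketch` and `ErrorRecord` are hypothetical names (not Beam API), and a `LinkedHashMap<String, Object>` stands in for Beam's `Row` so the example runs with the JDK alone. The point it shows is that errors coming from projections with different shapes (the full `key, value` row vs. the projected `key` row) collapse into one type with a fixed, schema-independent encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class ErrorRecordSketch {

  /** Hypothetical error type: the failing row is flattened to a String, so one encoding fits any schema. */
  static final class ErrorRecord {
    final String row;
    final String error;

    ErrorRecord(String row, String error) {
      this.row = row;
      this.error = error;
    }
  }

  /**
   * Renders a field-name -> value map (a stand-in for Row) as a minimal JSON-like object.
   * No escaping is done; this is an illustration, not a real JSON serializer.
   */
  static String render(Map<String, Object> row) {
    StringJoiner joiner = new StringJoiner(", ", "{", "}");
    for (Map.Entry<String, Object> e : row.entrySet()) {
      Object v = e.getValue();
      String value = v instanceof String ? "\"" + v + "\"" : String.valueOf(v);
      joiner.add("\"" + e.getKey() + "\": " + value);
    }
    return joiner.toString();
  }

  /** Builds an error record, keeping the PR's "empty error msg" default for a null message. */
  static ErrorRecord toError(Map<String, Object> row, String message) {
    return new ErrorRecord(render(row), message == null ? "empty error msg" : message);
  }
}
```

   With this shape the error `PCollection` could use a single string-based coder no matter which projection produced the failing row; the trade-off is that consumers lose typed access to the row's fields.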

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
##########
@@ -39,12 +41,15 @@
 public class BeamSqlRelUtils {
 
  public static PCollection<Row> toPCollection(Pipeline pipeline, BeamRelNode node) {

Review comment:
       You should add a new `public static PCollection<Row> toPCollection(Pipeline pipeline, BeamRelNode node, @Nullable PTransform<PCollection<BeamCalcRelError>, POutput> errorTransformer)` right after this one.
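   A minimal sketch of the suggested overload chain, assuming the recursive variant stays package-private and takes `errorTransformer` before `cache`. To keep it runnable without Beam on the classpath, `Pipeline` and `BeamRelNode` are hypothetical nested stand-ins, a `Function<String, String>` stands in for the error `PTransform`, and a `String` stands in for `PCollection<Row>`; only the shape of the delegation is meant to carry over.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class BeamSqlRelUtilsSketch {

  /** Hypothetical stand-in for org.apache.beam.sdk.Pipeline. */
  static final class Pipeline {}

  /** Hypothetical stand-in for BeamRelNode; only an id is needed as the cache key. */
  static final class BeamRelNode {
    final int id;

    BeamRelNode(int id) {
      this.id = id;
    }
  }

  // Existing public entry point: callers that don't want error handling are unchanged.
  public static String toPCollection(Pipeline pipeline, BeamRelNode node) {
    return toPCollection(pipeline, node, null);
  }

  // Suggested new public overload, declared right after the existing one.
  public static String toPCollection(
      Pipeline pipeline, BeamRelNode node, Function<String, String> errorTransformer) {
    return toPCollection(pipeline, node, errorTransformer, new HashMap<>());
  }

  // Recursive variant stays package-private, with errorTransformer placed before cache.
  static String toPCollection(
      Pipeline pipeline,
      BeamRelNode node,
      Function<String, String> errorTransformer,
      Map<Integer, String> cache) {
    // Memoize one result per node id; the String stands in for PCollection<Row>.
    return cache.computeIfAbsent(
        node.id, id -> "node-" + id + (errorTransformer == null ? "" : "+errors"));
  }
}
```

   Keeping the four-argument variant package-private leaves `cache` as an implementation detail, so a caller such as `SqlTransform` would use the three-argument overload rather than passing a `new HashMap<>()` itself.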

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
##########
@@ -164,7 +168,10 @@
     BeamSqlEnv sqlEnv = sqlEnvBuilder.build();
     ddlStrings().forEach(sqlEnv::executeDdl);
     return BeamSqlRelUtils.toPCollection(
-        input.getPipeline(), sqlEnv.parseQuery(queryString(), queryParameters()));
+        input.getPipeline(),
+        sqlEnv.parseQuery(queryString(), queryParameters()),
+        new HashMap<>(),

Review comment:
       This shouldn't be here.

##########
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlErrorTest.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRelError;
+import org.apache.beam.sdk.extensions.sql.impl.udf.CustomUdafWithError;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+public class BeamSqlErrorTest extends org.apache.beam.sdk.extensions.sql.BeamSqlDslBase {

Review comment:
       nit: drop the `org.apache.beam.sdk.extensions.sql.` prefix; it isn't needed.

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
##########
@@ -71,15 +77,20 @@
    * A {@link BeamRelNode} is a recursive structure, the {@code BeamQueryPlanner} visits it with a
    * DFS(Depth-First-Search) algorithm.
    */
-  static PCollection<Row> toPCollection(
-      Pipeline pipeline, BeamRelNode node, Map<Integer, PCollection<Row>> cache) {
+  public static PCollection<Row> toPCollection(

Review comment:
       This should remain package private.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.