apilloud commented on a change in pull request #15915:
URL: https://github.com/apache/beam/pull/15915#discussion_r746191174
##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRelError.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.util.Objects;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class BeamCalcRelError {
+
+  private Row row;
+  private String error;
+
+  public BeamCalcRelError(Row row, String error) {
+    this.row = row;
+    this.error = error == null ? "empty error msg" : error;
+  }
+
+  public Row getRow() {
+    return row;
+  }
+
+  public void setRow(Row row) {
+    this.row = row;
+  }
+
+  public String getError() {
+    return error;
+  }
+
+  public void setError(String error) {
+    this.error = error;
+  }
+
+  @Override
+  public boolean equals(@Nullable Object o) {

Review comment:
   We prefer AutoValue to this boilerplate.
   For an example, see: https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/NodeStats.java

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
##########
@@ -71,15 +77,20 @@
    * A {@link BeamRelNode} is a recursive structure, the {@code BeamQueryPlanner} visits it with a
    * DFS(Depth-First-Search) algorithm.
    */
-  static PCollection<Row> toPCollection(
-      Pipeline pipeline, BeamRelNode node, Map<Integer, PCollection<Row>> cache) {
+  public static PCollection<Row> toPCollection(
+      Pipeline pipeline,
+      BeamRelNode node,
+      Map<Integer, PCollection<Row>> cache,
+      @Nullable PTransform<PCollection<BeamCalcRelError>, POutput> errorTransformer) {

Review comment:
   nit: this parameter should go before `cache`.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
##########
@@ -39,12 +41,15 @@
 public class BeamSqlRelUtils {

   public static PCollection<Row> toPCollection(Pipeline pipeline, BeamRelNode node) {
-    return toPCollection(pipeline, node, new HashMap());
+    return toPCollection(pipeline, node, new HashMap(), null);
   }

   /** Transforms the inputs into a PInput. */
   private static PCollectionList<Row> buildPCollectionList(
-      List<RelNode> inputRels, Pipeline pipeline, Map<Integer, PCollection<Row>> cache) {
+      List<RelNode> inputRels,
+      Pipeline pipeline,
+      Map<Integer, PCollection<Row>> cache,
+      @Nullable PTransform<PCollection<BeamCalcRelError>, POutput> errorTransformer) {

Review comment:
   nit: you should add this parameter before the `cache` parameter.
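The two parameter-order nits above amount to one convention: caller-facing arguments first, the nullable `errorTransformer` next, and the internal `cache` last, with public overloads hiding the cache from callers. The sketch below is a hypothetical, stdlib-only analogue of that layering, not Beam code: `Node` stands in for `BeamRelNode`, a nullable `Consumer<String>` for the error `PTransform`, and a `String` result for `PCollection<Row>`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical analogue of the BeamSqlRelUtils entry points under review.
// Node ~ BeamRelNode, Consumer<String> ~ error PTransform, String ~ PCollection<Row>.
// None of these names are Beam APIs.
final class RelUtilsSketch {

  private RelUtilsSketch() {}

  /** Hypothetical plan node: an id, whether it "fails", and its inputs. */
  static final class Node {
    final int id;
    final boolean failing;
    final List<Node> inputs;

    Node(int id, boolean failing, List<Node> inputs) {
      this.id = id;
      this.failing = failing;
      this.inputs = inputs;
    }
  }

  /** Public entry point without error handling: delegates with a null handler. */
  public static String translate(Node node) {
    return translate(node, null);
  }

  /** Public overload taking an optional error handler (may be null). */
  public static String translate(Node node, Consumer<String> errorHandler) {
    return translate(node, errorHandler, new HashMap<>());
  }

  /** Package-private recursive worker: the nullable handler comes before the internal cache. */
  static String translate(Node node, Consumer<String> errorHandler, Map<Integer, String> cache) {
    String cached = cache.get(node.id);
    if (cached != null) {
      return cached; // depth-first traversal translates each node id only once
    }
    List<String> children = new ArrayList<>();
    for (Node input : node.inputs) {
      children.add(translate(input, errorHandler, cache));
    }
    if (node.failing && errorHandler != null) {
      errorHandler.accept("node " + node.id + " failed"); // route the failure instead of throwing
    }
    String result = "n" + node.id + children;
    cache.put(node.id, result);
    return result;
  }
}
```

In this sketch only the two overloads are public, so a caller in another package (as `SqlTransform` is relative to `BeamSqlRelUtils`) has no way to pass its own cache.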
##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -192,9 +207,17 @@ public Calc copy(RelTraitSet traitSet, RelNode input, RexProgram program) {
             outputSchema,
             options.getVerifyRowValues(),
             getJarPaths(program),
-            inputGetter.getFieldAccess());
-
-    return upstream.apply(ParDo.of(calcFn)).setRowSchema(outputSchema);
+            inputGetter.getFieldAccess(),
+            errorsTransformer != null);
+
+    PCollectionTuple tuple =
+        upstream.apply(ParDo.of(calcFn).withOutputTags(rows, TupleTagList.of(errors)));
+    PCollection<BeamCalcRelError> errorPCollection =
+        tuple.get(errors).setCoder(BeamCalcRelErrorCoder.of(RowCoder.of(upstream.getSchema())));

Review comment:
   This won't work: there is no guarantee that the row in the error has the same schema as your input row. For example, if your input row is `1 as key, "a" as value` and your SqlTransform is `SELECT CustomUdafWithError(key)`, you'd end up with a row of just `1 as key` in the error. Across the whole SQL transform you can also end up with multiple different coders, for example if you have a join. I think you'd actually need a coder that encodes the coder, and that is going to get complex quickly. Should we just return the string (or possibly JSON) representation of the error row instead?

   Can you add tests where the same SqlTransform produces errors with different row schemas?

   We are trying to move to SchemaCoder for everything, but I don't think this use case (variable row structure) is something we support.
   cc: @reuvenlax

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
##########
@@ -39,12 +41,15 @@
 public class BeamSqlRelUtils {

   public static PCollection<Row> toPCollection(Pipeline pipeline, BeamRelNode node) {

Review comment:
   You should add a new `public static PCollection<Row> toPCollection(Pipeline pipeline, BeamRelNode node, @Nullable PTransform<PCollection<BeamCalcRelError>, POutput> errorTransformer)` right after this one.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
##########
@@ -164,7 +168,10 @@
     BeamSqlEnv sqlEnv = sqlEnvBuilder.build();
     ddlStrings().forEach(sqlEnv::executeDdl);
     return BeamSqlRelUtils.toPCollection(
-        input.getPipeline(), sqlEnv.parseQuery(queryString(), queryParameters()));
+        input.getPipeline(),
+        sqlEnv.parseQuery(queryString(), queryParameters()),
+        new HashMap<>(),

Review comment:
   This shouldn't be here.

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlErrorTest.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRelError;
+import org.apache.beam.sdk.extensions.sql.impl.udf.CustomUdafWithError;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+public class BeamSqlErrorTest extends org.apache.beam.sdk.extensions.sql.BeamSqlDslBase {

Review comment:
   nit: drop the `org.apache.beam.sdk.extensions.sql.` prefix; it isn't needed, since the test is already in that package.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
##########
@@ -71,15 +77,20 @@
    * A {@link BeamRelNode} is a recursive structure, the {@code BeamQueryPlanner} visits it with a
    * DFS(Depth-First-Search) algorithm.
    */
-  static PCollection<Row> toPCollection(
-      Pipeline pipeline, BeamRelNode node, Map<Integer, PCollection<Row>> cache) {
+  public static PCollection<Row> toPCollection(

Review comment:
   This should remain package-private.
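On the `BeamCalcRel.java` thread above, the reviewer floated returning a string (or JSON) representation of the error row instead of a coder built from `RowCoder.of(upstream.getSchema())`. The sketch below illustrates that idea under stated assumptions: a `Map<String, Object>` stands in for `Row`, and `ErrorRecord` is a hypothetical type, not a Beam API. Rows with different schemas, such as `1 as key, "a" as value` and just `1 as key`, both collapse to plain strings, so one element type (and one simple coder) can carry every error.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch: Map<String, Object> stands in for Beam's Row, and ErrorRecord
// is illustrative, not part of Beam. Flattening each failed row to a String means rows
// with different schemas share a single element type.
final class ErrorRecord {
  final String row; // JSON-like rendering of the failed row
  final String error; // error message, defaulted the same way as in the PR

  ErrorRecord(String row, String error) {
    this.row = row;
    this.error = error == null ? "empty error msg" : error;
  }

  /** Builds an error record from a row of any shape. */
  static ErrorRecord of(Map<String, Object> fields, String error) {
    return new ErrorRecord(render(fields), error);
  }

  /** Renders fields in iteration order as a flat JSON-like object (minimal escaping only). */
  static String render(Map<String, Object> fields) {
    StringJoiner json = new StringJoiner(",", "{", "}");
    for (Map.Entry<String, Object> field : fields.entrySet()) {
      Object value = field.getValue();
      String rendered = value instanceof String ? quote((String) value) : String.valueOf(value);
      json.add(quote(field.getKey()) + ":" + rendered);
    }
    return json.toString();
  }

  /** Quotes a string, escaping backslashes and double quotes; not a full JSON encoder. */
  private static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }
}
```

A real implementation would more likely reuse an existing JSON serializer for the row's own schema; the hand-rolled `render` here only keeps the sketch self-contained.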
