[
https://issues.apache.org/jira/browse/BEAM-9992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157852#comment-17157852
]
Darshan Jani commented on BEAM-9992:
------------------------------------
In the following scenario we hit a problem caused by the coder's handling of null values.
Test:
{code:java}
@Test
public void testUnionAllQueryWithBytesAndNulls() {
  String sql =
      "SELECT DISTINCT val.BYTES "
          + "from (select b\"1\" BYTES union all "
          + "select cast(NULL as bytes) union all "
          + "select b\"-1\" union all "
          + "select b\"1\" union all "
          + "select cast(NULL as bytes)) val";

  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

  final Schema schema = Schema.builder().addNullableField("field1", FieldType.BYTES).build();
  byte[] nullValue = null;

  PAssert.that(stream)
      .containsInAnyOrder(
          Row.withSchema(schema).addValues(nullValue).build(),
          Row.withSchema(schema).addValues("-1".getBytes(StandardCharsets.UTF_8)).build(),
          Row.withSchema(schema).addValues("1".getBytes(StandardCharsets.UTF_8)).build());

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
{code}
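For reference, the result the test expects can be modelled in plain Java. This is only a sketch of the intended DISTINCT semantics (equal byte contents collapse to one value, and the two NULLs collapse to one), not how BeamSQL evaluates the query; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-alone model of the expected query result; not Beam code. */
public class DistinctBytesSketch {

  /** Returns distinct values, comparing byte arrays by content and keeping a single null. */
  public static List<byte[]> distinct(List<byte[]> values) {
    // byte[] uses identity equality, so wrap each array in a ByteBuffer,
    // whose equals/hashCode compare the remaining contents.
    Set<ByteBuffer> seen = new LinkedHashSet<>();
    for (byte[] value : values) {
      seen.add(value == null ? null : ByteBuffer.wrap(value));
    }
    List<byte[]> result = new ArrayList<>();
    for (ByteBuffer buffer : seen) {
      result.add(buffer == null ? null : buffer.array());
    }
    return result;
  }

  public static void main(String[] args) {
    // Same five inputs as the UNION ALL in the test: "1", NULL, "-1", "1", NULL.
    List<byte[]> input =
        Arrays.asList(
            "1".getBytes(StandardCharsets.UTF_8),
            null,
            "-1".getBytes(StandardCharsets.UTF_8),
            "1".getBytes(StandardCharsets.UTF_8),
            null);
    for (byte[] value : distinct(input)) {
      System.out.println(value == null ? "NULL" : new String(value, StandardCharsets.UTF_8));
    }
  }
}
```

The three surviving values correspond to the three rows passed to containsInAnyOrder in the test.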
Java Exception stack:
{code:java}
Forbidden IOException when reading from InputStream
java.lang.IllegalArgumentException: Forbidden IOException when reading from InputStream
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
    at org.apache.beam.runners.direct.CloningBundleFactory$CloningBundle.add(CloningBundleFactory.java:84)
    at org.apache.beam.runners.direct.FlattenEvaluatorFactory$FlattenEvaluator.processElement(FlattenEvaluatorFactory.java:75)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:73)
    at org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:56)
    at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:103)
    at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:95)
    at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41)
    at org.apache.beam.sdk.coders.RowCoderGenerator$DecodeInstruction.decodeDelegate(RowCoderGenerator.java:348)
    at org.apache.beam.sdk.coders.Coder$ByteBuddy$haO6hZhP.decode(Unknown Source)
    at org.apache.beam.sdk.coders.Coder$ByteBuddy$haO6hZhP.decode(Unknown Source)
    at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
    ... 12 more
{code}
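The trace ends with a length-prefixed read running off the end of the stream. The sketch below is a stand-alone illustration of that failure mode, not Beam's implementation: it assumes (this is a guess, not confirmed by the trace) that no bytes were available for the null field, and shows how a decoder that expects a length prefix then throws EOFException, while a wrapper that writes a presence byte first round-trips null safely. All class and method names here are hypothetical, and a fixed 4-byte length is used instead of a varint for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical stand-alone illustration; none of these helpers are Beam classes. */
public class NullableBytesSketch {

  /** Decodes a length-prefixed byte array; has no way to represent null. */
  public static byte[] decodePlain(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    int length = in.readInt(); // throws EOFException if the prefix is missing
    byte[] value = new byte[length];
    in.readFully(value);
    return value;
  }

  /** Writes a presence byte first, then the length-prefixed payload if non-null. */
  public static byte[] encodeNullable(byte[] value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeBoolean(value != null);
    if (value != null) {
      out.writeInt(value.length);
      out.write(value);
    }
    out.flush();
    return bytes.toByteArray();
  }

  /** Reads the presence byte and only then attempts to read a payload. */
  public static byte[] decodeNullable(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    if (!in.readBoolean()) {
      return null;
    }
    byte[] value = new byte[in.readInt()];
    in.readFully(value);
    return value;
  }

  /** Returns true if the plain decoder hits end of stream on the given input. */
  public static boolean plainDecodeHitsEof(byte[] encoded) {
    try {
      decodePlain(encoded);
      return false;
    } catch (EOFException e) {
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Encodes and decodes with the null-aware variant. */
  public static byte[] roundTrip(byte[] value) {
    try {
      return decodeNullable(encodeNullable(value));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("plain decode of empty input hits EOF: " + plainDecodeHitsEof(new byte[0]));
    System.out.println("nullable round trip of null: " + roundTrip(null));
    System.out.println(
        "nullable round trip of \"-1\": "
            + Arrays.toString(roundTrip("-1".getBytes(StandardCharsets.UTF_8))));
  }
}
```

In Beam itself the analogous question would be whether the nullable BYTES field ends up encoded and decoded through a null-aware coder on both sides of the Flatten; the sketch does not establish where the mismatch occurs.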
> Migrate BeamSQL's SET operators to Sets transforms
> --------------------------------------------------
>
> Key: BEAM-9992
> URL: https://issues.apache.org/jira/browse/BEAM-9992
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql, dsl-sql-zetasql
> Reporter: Darshan Jani
> Priority: P2
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> As part of [BEAM-9946|https://issues.apache.org/jira/browse/BEAM-9946] we have
> new Sets transforms for intersect, union and except.
> This Jira is to use them to replace the existing set operators in the BeamSQL code.
> Tasks:
> # Remove [BeamSetOperatorRelBase.java|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java]
> # Use SetFns transforms in:
> ## [BeamIntersectRel.java|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java]
> ## [BeamMinusRel.java|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java]
> ## [BeamUnionRel.java|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java]
> # Remove [BeamSetOperatorsTransforms.java|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)