[ https://issues.apache.org/jira/browse/BEAM-9992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157852#comment-17157852 ]

Darshan Jani commented on BEAM-9992:
------------------------------------

We have a problem in the following scenario because of how the coder handles nulls.
Test:

{code:java}
@Test
public void testUnionAllQueryWithBytesAndNulls() {
  // DISTINCT over a UNION ALL of five BYTES values, two of which are NULL.
  String sql = "SELECT DISTINCT val.BYTES "
      + "from (select b\"1\" BYTES union all "
      + "select cast(NULL as bytes) union all "
      + "select b\"-1\" union all "
      + "select b\"1\" union all "
      + "select cast(NULL as bytes)) val";

  // config, pipeline and PIPELINE_EXECUTION_WAITTIME_MINUTES come from the enclosing test class.
  ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
  BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
  PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

  final Schema schema = Schema.builder().addNullableField("field1", FieldType.BYTES).build();

  // Typed null, so addValues(Object...) receives one null element rather than a null array.
  byte[] nullValue = null;
  PAssert.that(stream)
      .containsInAnyOrder(
          Row.withSchema(schema).addValues(nullValue).build(),
          Row.withSchema(schema).addValues("-1".getBytes(StandardCharsets.UTF_8)).build(),
          Row.withSchema(schema).addValues("1".getBytes(StandardCharsets.UTF_8)).build());

  pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
{code}
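The test expects three rows from five inputs: the two NULLs collapse into one row and the two b"1" values into another, because SQL DISTINCT compares BYTES by content and treats NULLs as duplicates of each other. A minimal in-memory sketch of that expected result in plain Java follows; DistinctBytesSketch is a hypothetical helper for illustration, unrelated to how Beam evaluates the query. It wraps each array in a ByteBuffer because a Java byte[] uses identity equality.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: SQL-style DISTINCT over nullable byte arrays, done in memory. */
public class DistinctBytesSketch {
  // Keeps first occurrences; all nulls count as one value, arrays compare by content.
  static List<byte[]> distinct(List<byte[]> values) {
    // ByteBuffer.equals/hashCode use the remaining content; HashSet permits one null.
    Set<ByteBuffer> seen = new LinkedHashSet<>();
    for (byte[] v : values) {
      seen.add(v == null ? null : ByteBuffer.wrap(v));
    }
    List<byte[]> out = new ArrayList<>();
    for (ByteBuffer b : seen) {
      out.add(b == null ? null : b.array());
    }
    return out;
  }

  static byte[] utf8(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Same five inputs as the UNION ALL in the test: b"1", NULL, b"-1", b"1", NULL.
    List<byte[]> input = Arrays.asList(utf8("1"), null, utf8("-1"), utf8("1"), null);
    List<byte[]> result = distinct(input);
    System.out.println(result.size()); // prints 3
    for (byte[] r : result) {
      System.out.println(r == null ? "NULL" : new String(r, StandardCharsets.UTF_8));
    }
  }
}
```

The printed values are 1, NULL and -1 in first-seen order; the test itself uses containsInAnyOrder, so order does not matter there.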

Java exception stack trace:

{code:java}
Forbidden IOException when reading from InputStream
java.lang.IllegalArgumentException: Forbidden IOException when reading from InputStream
        at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
        at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
        at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
        at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
        at org.apache.beam.runners.direct.CloningBundleFactory$CloningBundle.add(CloningBundleFactory.java:84)
        at org.apache.beam.runners.direct.FlattenEvaluatorFactory$FlattenEvaluator.processElement(FlattenEvaluatorFactory.java:75)
        at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
        at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
        at org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:73)
        at org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:56)
        at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:103)
        at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:95)
        at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41)
        at org.apache.beam.sdk.coders.RowCoderGenerator$DecodeInstruction.decodeDelegate(RowCoderGenerator.java:348)
        at org.apache.beam.sdk.coders.Coder$ByteBuddy$haO6hZhP.decode(Unknown Source)
        at org.apache.beam.sdk.coders.Coder$ByteBuddy$haO6hZhP.decode(Unknown Source)
        at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
        at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
        ... 12 more
{code}
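The innermost frames show ByteArrayCoder.decode reaching end of stream while reading a varint (VarInt.decodeInt), i.e. the decoder expected a length-prefixed value that the encoded bytes did not contain. The plain-Java sketch below illustrates that failure mode with a simplified, hypothetical codec: NullMarkerSketch and its methods are invented for illustration, not Beam APIs, and a fixed 4-byte length stands in for Beam's varint. It also shows how a leading presence byte lets a null round-trip. Whether this is the exact mechanism inside the row coder in this case is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical codecs contrasting "null writes nothing" with an explicit null marker. */
public class NullMarkerSketch {
  // Naive codec: 4-byte length prefix, then the bytes. A null writes nothing at all.
  static void encodeNaive(byte[] value, DataOutputStream out) throws IOException {
    if (value == null) {
      return; // the decoder has no way to learn that a null was here
    }
    out.writeInt(value.length);
    out.write(value);
  }

  static byte[] decodeNaive(DataInputStream in) throws IOException {
    int length = in.readInt(); // throws EOFException if fewer than 4 bytes remain
    byte[] value = new byte[length];
    in.readFully(value);
    return value;
  }

  // Null-aware codec: a presence byte (0 = null, 1 = present) precedes the naive encoding.
  static void encodeWithMarker(byte[] value, DataOutputStream out) throws IOException {
    out.writeByte(value == null ? 0 : 1);
    if (value != null) {
      encodeNaive(value, out);
    }
  }

  static byte[] decodeWithMarker(DataInputStream in) throws IOException {
    return in.readByte() == 0 ? null : decodeNaive(in);
  }

  // Encodes one value into a fresh buffer and decodes it with the matching decoder,
  // similar in spirit to the encode/decode clone the direct runner performs.
  static byte[] roundTrip(byte[] value, boolean withMarker) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    if (withMarker) {
      encodeWithMarker(value, out);
    } else {
      encodeNaive(value, out);
    }
    out.flush();
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
    return withMarker ? decodeWithMarker(in) : decodeNaive(in);
  }

  public static void main(String[] args) throws IOException {
    byte[] one = "1".getBytes(StandardCharsets.UTF_8);
    System.out.println(new String(roundTrip(one, true), StandardCharsets.UTF_8)); // prints 1
    System.out.println(roundTrip(null, true) == null); // prints true
    try {
      roundTrip(null, false);
    } catch (EOFException e) {
      System.out.println("naive codec: EOFException on null");
    }
  }
}
```

The marker costs one byte per value but makes the encoding self-describing, so the decoder never has to guess whether a field was written.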



> Migrate BeamSQL's SET operators to Sets transforms
> --------------------------------------------------
>
>                 Key: BEAM-9992
>                 URL: https://issues.apache.org/jira/browse/BEAM-9992
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql, dsl-sql-zetasql
>            Reporter: Darshan Jani
>            Priority: P2
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> As part of [BEAM-9946|https://issues.apache.org/jira/browse/BEAM-9946], we have new Sets transforms for intersect, union and except.
> This Jira is to replace the existing set operators in the BeamSQL code with them.
> Tasks:
> # Remove: [BeamSetOperatorRelBase.java|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java]
> # Use SetFns transforms in:
> ## [BeamIntersectRel.java|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIntersectRel.java]
> ## [BeamMinusRel.java|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRel.java]
> ## [BeamUnionRel.java|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamUnionRel.java]
> # Remove: [BeamSetOperatorsTransforms.java|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java]
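The set operators this issue migrates follow SQL multiset semantics. One common way to express them, assumed here for illustration, is to count how often each element occurs on the left and right inputs and map that pair of counts to how many copies to emit. SetOpSketch and its constants are hypothetical plain-Java names, not the Beam Sets/SetFns API, and the sketch runs in memory rather than as a pipeline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical sketch: SQL set operators as functions of (left count, right count). */
public class SetOpSketch {
  // Each operator maps (occurrences in left, occurrences in right) to copies to emit.
  static final BiFunction<Long, Long, Long> UNION_ALL = (l, r) -> l + r;
  static final BiFunction<Long, Long, Long> UNION_DISTINCT = (l, r) -> (l + r > 0) ? 1L : 0L;
  static final BiFunction<Long, Long, Long> INTERSECT_ALL = (l, r) -> Math.min(l, r);
  static final BiFunction<Long, Long, Long> INTERSECT_DISTINCT = (l, r) -> (l > 0 && r > 0) ? 1L : 0L;
  static final BiFunction<Long, Long, Long> EXCEPT_ALL = (l, r) -> Math.max(l - r, 0L);
  static final BiFunction<Long, Long, Long> EXCEPT_DISTINCT = (l, r) -> (l > 0 && r == 0) ? 1L : 0L;

  static <T> List<T> apply(List<T> left, List<T> right, BiFunction<Long, Long, Long> op) {
    // Count occurrences of every element on each side; key order is first appearance.
    Map<T, long[]> counts = new LinkedHashMap<>();
    for (T x : left) {
      counts.computeIfAbsent(x, k -> new long[2])[0]++;
    }
    for (T x : right) {
      counts.computeIfAbsent(x, k -> new long[2])[1]++;
    }
    // Emit each element as many times as the operator's count function says.
    List<T> out = new ArrayList<>();
    for (Map.Entry<T, long[]> e : counts.entrySet()) {
      long n = op.apply(e.getValue()[0], e.getValue()[1]);
      for (long i = 0; i < n; i++) {
        out.add(e.getKey());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> left = Arrays.asList("a", "a", "a", "b", "c");
    List<String> right = Arrays.asList("a", "b", "b", "d");
    System.out.println(apply(left, right, INTERSECT_ALL)); // [a, b]
    System.out.println(apply(left, right, EXCEPT_ALL)); // [a, a, c]
    System.out.println(apply(left, right, UNION_DISTINCT)); // [a, b, c, d]
  }
}
```

With this formulation a single counting step serves all six operators; only the small count function changes between them.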



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
