[ 
https://issues.apache.org/jira/browse/BEAM-9361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108765#comment-17108765
 ] 

Brian Hulette commented on BEAM-9361:
-------------------------------------

Reuven did add an 
[EnumerationType|https://github.com/apache/beam/blob/d7df9ed14bca07d341bb689053e82674bf0e0243/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/EnumerationType.java]
 which would be appropriate here, but I'm not sure what will happen if you try 
to use it in SQL.

> NPE When putting Avro record with enum through SqlTransform
> -----------------------------------------------------------
>
>                 Key: BEAM-9361
>                 URL: https://issues.apache.org/jira/browse/BEAM-9361
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>    Affects Versions: 2.19.0
>            Reporter: Niels Basjes
>            Priority: Major
>
> I ran into this problem when trying to put my Avro records through the 
> SqlTransform.
> I was able to reduce the reproduction path to the code below.
> This code fails on my machine (using Beam 2.19.0) with the following 
> NullPointerException:
> {code:java}
>  org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable to parse 
> query SELECT name, direction FROM InputStream        at 
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:175)
>       at 
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:103)
>       at 
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
>       at 
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
>       at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>       at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>       at 
> org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
>       at com.bol.analytics.m2.TestAvro2SQL.testAvro2SQL(TestAvro2SQL.java:99)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>       at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>       at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>       at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Caused by: 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.ValidationException:
>  java.lang.NullPointerException
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:217)
>       at 
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:144)
>       ... 31 more
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeFactoryImpl.createSqlType(SqlTypeFactoryImpl.java:45)
>       at 
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toRelDataType(CalciteUtils.java:280)
>       at 
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toRelDataType(CalciteUtils.java:287)
>       at 
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.lambda$toCalciteRowType$0(CalciteUtils.java:261)
>       at 
> java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
>       at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:581)
>       at 
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toCalciteRowType(CalciteUtils.java:258)
>       at 
> org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable.getRowType(BeamCalciteTable.java:71)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:159)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
>       at 
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:215)
>       ... 32 more
> {code}
>  
> {code:java}
>     @Test
>     @Category(NeedsRunner.class)
>     public void testAvro2SQL() {
>         // ============================================================
>         // The base test input
>         Schema testSchema = (new Schema.Parser()).parse(
>                 "{\"type\":\"record\",\"name\":\"Transport\"," +
>                 "\"fields\":[" +
>                 
> "{\"name\":\"name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},"
>  +
>                 
> "{\"name\":\"direction\",\"type\":{\"type\":\"enum\",\"name\":\"DirectionType\",\"symbols\":[\"PULL\",\"PUSH\"]}}"
>  +
>                 "]}");
>         GenericRecord record =  new GenericRecordBuilder(testSchema)
>                 .set("name", "Test")
>                 .set("direction", new 
> GenericData.EnumSymbol(testSchema.getField("direction").schema(), "PULL"))
>                 .build();
>         // List of test Inputs
>         List<GenericRecord> testRecords = Collections.singletonList(record);
>         // ============================================================
>         // Convert into a PCollection<Row>
>         PCollection<Row> input = pipeline
>             
> .apply(Create.of(testRecords).withCoder(AvroGenericCoder.of(testSchema)))
>             .apply(ParDo.of(new DoFn<GenericRecord, Row>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     c.output(toBeamRowStrict(c.element(), null));
>                 }
>                 }))
>             .setRowSchema(toBeamSchema(testSchema));
>         // ============================================================
>         PCollection<Row> result =
>             // This way we give a name to the input stream for use in the SQL
>             PCollectionTuple
>                 .of("InputStream", input)
>                 // Apply the SQL.
>                 .apply("Execute SQL", SqlTransform
>                     .query("SELECT" +
>                             " name, direction " +
>                             "FROM InputStream"));
>         pipeline.run().waitUntilFinish();
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to