damccorm opened a new issue, #20050:
URL: https://github.com/apache/beam/issues/20050
I ran into this problem when trying to put my Avro records through
SqlTransform.
I was able to reduce the reproduction to the test code shown after the stack trace.
This code fails on my machine (using Beam 2.19.0) with the following
NullPointerException:
```
org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable to parse query SELECT name, direction FROM InputStream
    at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:175)
    at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:103)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
    at com.bol.analytics.m2.TestAvro2SQL.testAvro2SQL(TestAvro2SQL.java:99)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.ValidationException: java.lang.NullPointerException
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:217)
    at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:144)
    ... 31 more
Caused by: java.lang.NullPointerException
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeFactoryImpl.createSqlType(SqlTypeFactoryImpl.java:45)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toRelDataType(CalciteUtils.java:280)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toRelDataType(CalciteUtils.java:287)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.lambda$toCalciteRowType$0(CalciteUtils.java:261)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:581)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toCalciteRowType(CalciteUtils.java:258)
    at org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable.getRowType(BeamCalciteTable.java:71)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:159)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
    at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:215)
    ... 32 more
```
```
// Assumption: toBeamRowStrict and toBeamSchema are static imports, presumably from
// org.apache.beam.sdk.schemas.utils.AvroUtils, and `pipeline` is a TestPipeline @Rule field.
@Test
@Category(NeedsRunner.class)
public void testAvro2SQL() {
  // ============================================================
  // The base test input: a record with a nullable string and an enum field
  Schema testSchema = (new Schema.Parser()).parse(
      "{\"type\":\"record\",\"name\":\"Transport\","
          + "\"fields\":["
          + "{\"name\":\"name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},"
          + "{\"name\":\"direction\",\"type\":{\"type\":\"enum\",\"name\":\"DirectionType\",\"symbols\":[\"PULL\",\"PUSH\"]}}"
          + "]}");

  GenericRecord record = new GenericRecordBuilder(testSchema)
      .set("name", "Test")
      .set("direction",
          new GenericData.EnumSymbol(testSchema.getField("direction").schema(), "PULL"))
      .build();

  // List of test inputs
  List<GenericRecord> testRecords = Collections.singletonList(record);

  // ============================================================
  // Convert into a PCollection<Row>
  PCollection<Row> input = pipeline
      .apply(Create.of(testRecords).withCoder(AvroGenericCoder.of(testSchema)))
      .apply(ParDo.of(new DoFn<GenericRecord, Row>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(toBeamRowStrict(c.element(), null));
        }
      }))
      .setRowSchema(toBeamSchema(testSchema));

  // ============================================================
  PCollection<Row> result =
      // This way we give a name to the input stream for use in the SQL
      PCollectionTuple
          .of("InputStream", input)
          // Apply the SQL.
          .apply("Execute SQL", SqlTransform
              .query("SELECT"
                  + " name, direction "
                  + "FROM InputStream"));

  pipeline.run().waitUntilFinish();
}
```
Imported from Jira
[BEAM-9361](https://issues.apache.org/jira/browse/BEAM-9361). Original Jira may
contain additional context.
Reported by: nielsbasjes.