Niels Basjes created BEAM-9267:
----------------------------------
Summary: A BeamSQL UDF that returns a Map always fails with a NullPointerException.
Key: BEAM-9267
URL: https://issues.apache.org/jira/browse/BEAM-9267
Project: Beam
Issue Type: Bug
Components: dsl-sql
Affects Versions: 2.19.0
Reporter: Niels Basjes
When I create a UDF that returns a Map<String, String> and call it from within a SQL statement, it consistently fails with a NullPointerException.
My UDF:
{code}
public class FooMap implements SerializableFunction<String, Map<String, String>> {
  @Override
  public Map<String, String> apply(String input) {
    final HashMap<String, String> hashMap = new HashMap<>();
    hashMap.put("Some", "Thing");
    return hashMap;
  }
}
{code}
and
{code}
public class BarString implements SerializableFunction<String, String> {
  @Override
  public String apply(String input) {
    return new StringBuilder(input).reverse().toString();
  }
}
{code}
My test:
{code}
@Category(ValidatesRunner.class)
public class TestFunctionReturnsMap implements Serializable {

  private static final Logger LOG = LoggerFactory.getLogger(TestFunctionReturnsMap.class);

  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  @Category(NeedsRunner.class)
  public void testUserAgentAnalysisSQL() {
    // ============================================================
    // Create input PCollection<Row>
    Schema inputSchema = Schema
        .builder()
        .addStringField("bar")
        .build();

    PCollection<Row> input = pipeline
        .apply(Create.of(Arrays.asList("One", "Two", "Three")))
        .setCoder(StringUtf8Coder.of())
        .apply(ParDo.of(new DoFn<String, Row>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(Row
                .withSchema(inputSchema)
                .addValues(c.element())
                .build());
          }
        })).setRowSchema(inputSchema);

    // ============================================================
    PCollection<Row> result =
        // This way we give a name to the input stream for use in the SQL
        PCollectionTuple.of("InputStream", input)
            // Apply the SQL with the UDFs we need.
            .apply("Execute SQL", SqlTransform
                .query(
                    "SELECT" +
                    " bar AS bar" +
                    " ,Bar(bar) AS barbar " +
                    " ,Foo(bar) AS foobar " +
                    "FROM InputStream")
                .registerUdf("Foo", new FooMap())
                .registerUdf("Bar", new BarString())
            );

    result.apply(ParDo.of(new RowPrinter()));

    pipeline.run().waitUntilFinish();
  }

  public static class RowPrinter extends DoFn<Row, Row> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      final Row row = c.element();
      LOG.info("ROW: {} --> {}", row, row.getSchema());
    }
  }
}
{code}
The exception I always get:
{code}
java.lang.NullPointerException: Null type
    at org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
    at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
    at nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
{code}
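Judging from the trace, CalciteUtils.toField receives a null Beam FieldType while converting the output row type back into a Beam Schema, presumably for the Foo(bar) column whose UDF returns a Map. I assume the crash would not happen if the UDF returned a supported scalar type; purely as an untested illustration of that (not a fix for this bug), here is a sketch of such a variant. The class name FooMapAsString and the "key=value;" encoding are made up for this example:
{code}
// Untested workaround sketch (not part of my real code): avoid the Map return
// type by flattening the map into a single String column.
// FooMapAsString and the "key=value;" encoding are illustrative only.
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.SerializableFunction;

public class FooMapAsString implements SerializableFunction<String, String> {
  @Override
  public String apply(String input) {
    final Map<String, String> map = new HashMap<>();
    map.put("Some", "Thing");
    // Flatten the map so the UDF result is a plain VARCHAR column.
    return map.entrySet().stream()
        .map(entry -> entry.getKey() + "=" + entry.getValue())
        .collect(Collectors.joining(";"));
  }
}
{code}
Registering this with .registerUdf("Foo", new FooMapAsString()) should let the query above run, but it obviously loses the structured Map result I actually need.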