[ https://issues.apache.org/jira/browse/BEAM-9267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121692#comment-17121692 ]

Kenneth Knowles commented on BEAM-9267:
---------------------------------------

This issue is assigned but has not received an update in 30 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.

> A BeamSQL UDF that returns a Map fails always with NullPointerException.
> ------------------------------------------------------------------------
>
>                 Key: BEAM-9267
>                 URL: https://issues.apache.org/jira/browse/BEAM-9267
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>    Affects Versions: 2.19.0
>            Reporter: Niels Basjes
>            Assignee: Rui Wang
>            Priority: P2
>              Labels: stale-assigned
>
> When I create a UDF that returns a Map<String, String> and call it from 
> within a SQL statement, it consistently fails with a NullPointerException.
> My UDFs:
> {code}
> public class FooMap implements SerializableFunction<String, Map<String, String>> {
>     @Override
>     public Map<String, String> apply(String input) {
>         final HashMap<String, String> hashMap = new HashMap<>();
>         hashMap.put("Some", "Thing");
>         return hashMap;
>     }
> }
> {code}
> and
> {code}
> public class BarString implements SerializableFunction<String, String> {
>     @Override
>     public String apply(String input) {
>         return new StringBuilder(input).reverse().toString();
>     }
> }
> {code}
> My test:
> {code}
> @Category(ValidatesRunner.class)
> public class TestFunctionReturnsMap implements Serializable {
>     private static final Logger LOG = LoggerFactory.getLogger(TestFunctionReturnsMap.class);
>     @Rule
>     public final transient TestPipeline pipeline = TestPipeline.create();
>     @Test
>     @Category(NeedsRunner.class)
>     public void testUserAgentAnalysisSQL() {
>         // ============================================================
>         // Create input PCollection<Row>
>         Schema inputSchema = Schema
>             .builder()
>             .addStringField("bar")
>             .build();
>         PCollection<Row> input = pipeline
>             .apply(Create.of(Arrays.asList("One", "Two", "Three")))
>             .setCoder(StringUtf8Coder.of())
>             .apply(ParDo.of(new DoFn<String, Row>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     c.output(Row
>                         .withSchema(inputSchema)
>                         .addValues(c.element())
>                         .build());
>                 }
>             })).setRowSchema(inputSchema);
>         // ============================================================
>         PCollection<Row> result =
>             // This way we give a name to the input stream for use in the SQL
>             PCollectionTuple.of("InputStream", input)
>                 // Apply the SQL with the UDFs we need.
>                 .apply("Execute SQL", SqlTransform
>                     .query(
>                         "SELECT" +
>                         "   bar             AS bar" +
>                         "  ,Bar(bar)        AS barbar " +
>                         "  ,Foo(bar)        AS foobar " +
>                         "FROM InputStream")
>                     .registerUdf("Foo",     new FooMap())
>                     .registerUdf("Bar",     new BarString())
>                 );
>         result.apply(ParDo.of(new RowPrinter()));
>         pipeline.run().waitUntilFinish();
>     }
>     public static class RowPrinter extends DoFn<Row, Row> {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             final Row row = c.element();
>             LOG.info("ROW: {} --> {}", row, row.getSchema());
>         }
>     }
> }
> {code}
> The exception I always get:
> {code}
> java.lang.NullPointerException: Null type
>       at org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
>       at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
>       at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
>       at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
>       at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>       at java.util.Iterator.forEachRemaining(Iterator.java:116)
>       at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>       at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>       at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>       at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>       at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>       at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
>       at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
>       at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
>       at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>       at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>       at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
>       at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
>       at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
>       at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
>       at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>       at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>       at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
>       at nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}
>  
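A possible interim workaround, sketched below and not verified against Beam: the report shows that a String-returning UDF (BarString) works while the Map-returning one trips the schema conversion, so flattening the Map to a single String before it leaves the UDF sidesteps the failing type mapping. The class name FooMapAsString and the flattening format are illustrative; in the actual pipeline the class would implement Beam's SerializableFunction<String, String> so it can be passed to registerUdf (the interface is omitted here to keep the snippet dependency-free).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical workaround sketch: keep the Map inside the UDF and return a
// plain String, the one return type the report shows working. In the real
// pipeline this class would additionally implement
// org.apache.beam.sdk.transforms.SerializableFunction<String, String>.
class FooMapAsString {
    public String apply(String input) {
        final Map<String, String> map = new HashMap<>();
        map.put("Some", "Thing");
        // Flatten to a String; a single-entry HashMap renders as "{Some=Thing}".
        return map.toString();
    }
}
```

The caller would then parse the flattened value downstream, which is clearly a stopgap until Map return types are supported.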



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
