Alireza Samadianzakaria created BEAM-7511:
---------------------------------------------

             Summary: KafkaTable Initialization
                 Key: BEAM-7511
                 URL: https://issues.apache.org/jira/browse/BEAM-7511
             Project: Beam
          Issue Type: Bug
          Components: dsl-sql
            Reporter: Alireza Samadianzakaria
            Assignee: Alireza Samadianzakaria


The following exception is thrown when a Kafka table is created:

Exception in thread "main" java.lang.NullPointerException
 at org.apache.beam.sdk.io.kafka.KafkaIO.updateKafkaProperties(KafkaIO.java:1028)
 at org.apache.beam.sdk.io.kafka.KafkaIO.access$900(KafkaIO.java:251)
 at org.apache.beam.sdk.io.kafka.KafkaIO$Read.withConsumerConfigUpdates(KafkaIO.java:814)
 at org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable.buildIOReader(BeamKafkaTable.java:89)
 at org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel$Transform.expand(BeamIOSourceRel.java:67)
 at org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel$Transform.expand(BeamIOSourceRel.java:58)
 at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
 at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
 at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:66)
 at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:47)
 at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
 at java.util.Iterator.forEachRemaining(Iterator.java:116)
 at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
 at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
 at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
 at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:48)
 at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:64)
 at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:47)
 at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
 at java.util.Iterator.forEachRemaining(Iterator.java:116)
 at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
 at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
 at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
 at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:48)
 at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:64)
 at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:36)
 at org.apache.beam.sdk.extensions.sql.example.MyKafkaExample.main(MyKafkaExample.java:76)

 

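This happens because, in 
org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable, 
configUpdates is not initialized anywhere and the method 
updateConsumerProperties is never called. As the stack trace shows, 
buildIOReader then hands that uninitialized value to 
KafkaIO.Read.withConsumerConfigUpdates, and the NullPointerException 
is raised inside KafkaIO.updateKafkaProperties.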
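
One possible way to avoid the failure is sketched below in plain Java, 
without any Beam dependency. KafkaTableSketch and buildConsumerConfig 
are hypothetical stand-ins, not the actual Beam classes or the eventual 
fix; the sketch only assumes that the overrides are held in a Map field 
and merged into a set of defaults. The idea is to initialize the field 
to an empty map so that a table on which updateConsumerProperties was 
never called still yields a valid configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a table class that holds optional consumer overrides.
class KafkaTableSketch {
    // Initialized eagerly, so readers never observe null even if the
    // setter below is never called.
    private Map<String, Object> configUpdates = new HashMap<>();

    // Plays the role of updateConsumerProperties: optional, may never be called.
    KafkaTableSketch updateConsumerProperties(Map<String, Object> updates) {
        // Treat a null argument as "no overrides" and copy defensively.
        this.configUpdates = (updates == null) ? new HashMap<>() : new HashMap<>(updates);
        return this;
    }

    // Hypothetical helper: merges the overrides into a copy of the defaults.
    Map<String, Object> buildConsumerConfig(Map<String, Object> defaults) {
        Map<String, Object> merged = new HashMap<>(defaults);
        // putAll throws NullPointerException when given null, which is the
        // kind of failure the eager initialization above prevents.
        merged.putAll(configUpdates);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("auto.offset.reset", "latest");

        // No call to updateConsumerProperties: the defaults are returned as-is.
        KafkaTableSketch table = new KafkaTableSketch();
        System.out.println(table.buildConsumerConfig(defaults));

        // With overrides: the extra key is merged into the defaults.
        Map<String, Object> overrides = new HashMap<>();
        overrides.put("group.id", "example-group");
        table.updateConsumerProperties(overrides);
        System.out.println(table.buildConsumerConfig(defaults));
    }
}
```

An alternative with the same effect would be a null check at the call 
site before the overrides are passed on; initializing the field keeps 
the check in one place.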



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
