[ https://issues.apache.org/jira/browse/BEAM-7511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alireza Samadianzakaria resolved BEAM-7511.
-------------------------------------------
Resolution: Fixed
Fix Version/s: 2.14.0
> KafkaTable Initialization
> -------------------------
>
> Key: BEAM-7511
> URL: https://issues.apache.org/jira/browse/BEAM-7511
> Project: Beam
> Issue Type: Bug
> Components: dsl-sql
> Reporter: Alireza Samadianzakaria
> Assignee: Alireza Samadianzakaria
> Priority: Major
> Fix For: 2.14.0
>
> Time Spent: 2h
> Remaining Estimate: 0h
>
> The following exception is thrown when a Kafka table is created:
> Exception in thread "main" java.lang.NullPointerException
> at org.apache.beam.sdk.io.kafka.KafkaIO.updateKafkaProperties(KafkaIO.java:1028)
> at org.apache.beam.sdk.io.kafka.KafkaIO.access$900(KafkaIO.java:251)
> at org.apache.beam.sdk.io.kafka.KafkaIO$Read.withConsumerConfigUpdates(KafkaIO.java:814)
> at org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable.buildIOReader(BeamKafkaTable.java:89)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel$Transform.expand(BeamIOSourceRel.java:67)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel$Transform.expand(BeamIOSourceRel.java:58)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:66)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:47)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:48)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:64)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:47)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:48)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:64)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:36)
> at org.apache.beam.sdk.extensions.sql.example.MyKafkaExample.main(MyKafkaExample.java:76)
>
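> This happens because, in the package
> org.apache.beam.sdk.extensions.sql.meta.provider.kafka, configupdates is
> never initialized and the method updateConsumerProperties is never called.
> As the stack trace shows, BeamKafkaTable.buildIOReader then passes the
> uninitialized value to KafkaIO.Read.withConsumerConfigUpdates, which fails
> in KafkaIO.updateKafkaProperties.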
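
The failure mode described above can be reproduced without Beam. The sketch
below is only an illustration under stated assumptions: SketchKafkaTable,
mergeConfig, buildUnsafe and buildSafe are hypothetical names, not the actual
BeamKafkaTable or KafkaIO source, and the null guard shown is one common way to
avoid this class of error, not necessarily the change that was merged for
2.14.0.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a table wrapper; NOT the real BeamKafkaTable. */
class SketchKafkaTable {
    // Stays null unless updateConsumerProperties is called, mirroring the report.
    private Map<String, Object> configUpdates;

    SketchKafkaTable updateConsumerProperties(Map<String, Object> updates) {
        this.configUpdates = updates;
        return this;
    }

    /** Hypothetical merge step: iterating over a null map throws NullPointerException. */
    static Map<String, Object> mergeConfig(
            Map<String, Object> base, Map<String, Object> updates) {
        Map<String, Object> merged = new HashMap<>(base);
        for (Map.Entry<String, Object> e : updates.entrySet()) {
            merged.put(e.getKey(), e.getValue());
        }
        return merged;
    }

    /** Passes the field through unchecked, so a never-set field fails. */
    Map<String, Object> buildUnsafe(Map<String, Object> base) {
        return mergeConfig(base, configUpdates);
    }

    /** Substitutes an empty map when no updates were ever supplied. */
    Map<String, Object> buildSafe(Map<String, Object> base) {
        Map<String, Object> updates =
                configUpdates == null ? new HashMap<>() : configUpdates;
        return mergeConfig(base, updates);
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("group.id", "demo");

        SketchKafkaTable table = new SketchKafkaTable();
        try {
            table.buildUnsafe(base);
        } catch (NullPointerException e) {
            System.out.println("unsafe build failed with NullPointerException");
        }
        System.out.println("safe build size: " + table.buildSafe(base).size());
    }
}
```

In this sketch, buildSafe leaves the base configuration unchanged when no
updates were supplied, while callers that do invoke updateConsumerProperties
get their overrides merged in either path.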