rthneha opened a new issue, #30688:
URL: https://github.com/apache/beam/issues/30688

   ### What happened?
   
   I have configured PulsarIO plugin via Beam to read messages from Pulsar as 
below:
   
   ```
   PCollection<PulsarMessage> pCollectionAll = p.apply("ReadPulsarMessage", 
PulsarIO
                   .read()
                   .withAdminUrl(options.getPulsarAdminURL())
                   .withClientUrl(options.getPulsarClientURL())
                   .withTopic(options.getPulsarTopic())); 
   ```
   
   I can see PulsarSourceDescriptor has 3 mandatory things so I set those up. 
But I am not able to read messages & getting below error:
   ```
   
   Error message from worker: java.io.IOException: Failed to start reading from 
source: 
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@53cad662
           
org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:821)
           
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
           
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
           
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
           
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
           
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
           
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
           
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
           
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
           
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.RuntimeException: Could not find a way to create 
AutoValue class class com.idfy.beam.pulsar.PulsarSourceDescriptor
           
org.apache.beam.sdk.schemas.AutoValueSchema.schemaTypeCreator(AutoValueSchema.java:133)
           
org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56)
           
org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
           
org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:63)
           
org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:43)
           org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:126)
           org.apache.beam.sdk.coders.Coder.decode(Coder.java:154)
           
org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:142)
           
org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:102)
           
org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:96)
           
org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:560)
           
org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:542)
           
org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
           
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:474)
           
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:452)
           
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:304)
           
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:297)
           
org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:816)
           
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
           
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
           
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
           
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
           
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
           
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
           
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
           
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
           
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           java.lang.Thread.run(Thread.java:750)
   
   ```
   
   I saw other way of consuming messages using PulsarClient & it uses JWT token:
   ```
   
   import java.net.URL;
   import org.apache.pulsar.client.api.*;
   
   public class SNConsumer {
   
       public static void main(String[] args) throws Exception
       {
   
           PulsarClient client = PulsarClient.builder()
               .serviceUrl("pulsarClientUrl")
               .authentication(
                   AuthenticationFactory.token("<JWT Token>")
               )
               .build();
   ```
   
   I have this JWT token, but not able to set it up in PulsarClient due to 
SerializableFunction used. Can someone help:
   
   ```
   
   import org.apache.pulsar.client.api.AuthenticationFactory;
   
    public Read withPulsarClient(SerializableFunction<String, PulsarClient> 
pulsarClientFn) {
         //return builder().setPulsarClient(pulsarClientFn).build();
         
         PulsarClient client = PulsarClient.builder()
         .serviceUrl("")
         .authentication(
             AuthenticationFactory.token("<JWT Token>")
         )
         .build();
         return client;
       }
   ```
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to