dorg-prajagopal opened a new issue, #22538:
URL: https://github.com/apache/beam/issues/22538

My topic has 2 partitions, and I want the Dataflow pipeline to stop consuming messages that come from partition 0.
   
I am trying to use [`withCheckStopReadingFn`](https://beam.apache.org/releases/javadoc/2.39.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withCheckStopReadingFn-org.apache.beam.sdk.transforms.SerializableFunction-) for this. Sample code:
   
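```java
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.common.TopicPartition;

// Passed to KafkaIO.Read#withCheckStopReadingFn: returning true for a
// TopicPartition tells the reader to stop reading that partition.
public class CheckPartitionStatus implements SerializableFunction<TopicPartition, Boolean> {
    @Override
    public Boolean apply(TopicPartition input) {
        // topicName is the topic being read; it is defined outside this snippet.
        // Stop reading partition 0 and keep reading every other partition.
        return input.equals(new TopicPartition(topicName, 0));
    }
}
```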
The pipeline is as follows:
   
```java
PTransform<PBegin, PCollection<KV<GenericData.Record, GenericData.Record>>> kafka =
        KafkaIO.<GenericData.Record, GenericData.Record>read()
                .withBootstrapServers(brokerurl)
                .withTopic(inputPersonTopic)
                .withConsumerConfigUpdates(props)
                .withKeyDeserializer(ConfluentSchemaRegistryDeserializerProvider
                        .of(url, schemakey, null, csrConfig))
                .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider
                        .of(url, schemavalue, null, csrConfig))
                .withReadCommitted()
                .commitOffsetsInFinalize()
                .withCheckStopReadingFn(new CheckPartitionStatus())
                .withoutMetadata();

pipeline.apply(kafka)
        .apply(Values.<GenericData.Record>create())
        .apply("ProcessMessage", ParDo.of(new ProcessMessage()));
pipeline.run().waitUntilFinish();
```
I am running into this exception:
   
```
Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Last attempted offset should not be null. No work was claimed in non-empty range [7, 9223372036854775807).
```
   
According to the topic, the offset of the last message is 7. How can I use this function correctly?
   
Is there an issue with using this `SerializableFunction`? I don't have any problems connecting to Redis. With `withCheckStopReadingFn` commented out, the pipeline starts fine and processes messages from Kafka.
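Some context on the exception. The range `[7, 9223372036854775807)` is half-open. It starts at offset 7 and ends at `Long.MAX_VALUE` (`2^63 - 1`), which is how an unbounded read of a partition is expressed.

The wording matches the completion check that an offset-range restriction tracker runs after a splittable DoFn returns. That check requires the range to be either empty or claimed up to its end. One plausible reading, not confirmed here, is that the reader returned for partition 0 as soon as the stop function returned `true`, without claiming any offset, so the check failed.

The following is a simplified, hypothetical model of that check, using only the Java standard library. `OffsetTrackerDemo` and `MiniOffsetRangeTracker` are invented names, not Beam classes. The model reproduces the same message under that assumption:

```java
// Simplified, hypothetical model of an offset-range restriction tracker's
// completion check. NOT Beam's source; it only mirrors the reported message.
final class OffsetTrackerDemo {

    /** Tracks claim attempts over the half-open offset range [from, to). */
    static final class MiniOffsetRangeTracker {
        private final long from;
        private final long to;
        private Long lastAttemptedOffset = null; // stays null until tryClaim is called

        MiniOffsetRangeTracker(long from, long to) {
            this.from = from;
            this.to = to;
        }

        /** Records a claim attempt; returns false once the offset is past the range end. */
        boolean tryClaim(long offset) {
            lastAttemptedOffset = offset;
            return offset < to;
        }

        /** Called after processing: fails if part of a non-empty range was never attempted. */
        void checkDone() {
            if (from == to) {
                return; // empty range: nothing had to be claimed
            }
            if (lastAttemptedOffset == null) {
                throw new IllegalStateException(String.format(
                        "Last attempted offset should not be null. "
                                + "No work was claimed in non-empty range [%d, %d).",
                        from, to));
            }
            if (lastAttemptedOffset < to - 1) {
                throw new IllegalStateException(
                        "Work after offset " + lastAttemptedOffset + " was not attempted");
            }
        }
    }

    /** Runs checkDone and returns the exception message, or "ok" if the check passes. */
    static String checkDoneMessage(MiniOffsetRangeTracker tracker) {
        try {
            tracker.checkDone();
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Partition read from offset 7 with no upper bound (Long.MAX_VALUE).
        MiniOffsetRangeTracker stoppedEarly = new MiniOffsetRangeTracker(7, Long.MAX_VALUE);
        // The DoFn returns without calling tryClaim, e.g. because a stop check fired first.
        System.out.println(checkDoneMessage(stoppedEarly)); // prints the IllegalStateException message

        // Attempting the last offset of the range satisfies the check in this model.
        MiniOffsetRangeTracker claimedToEnd = new MiniOffsetRangeTracker(7, Long.MAX_VALUE);
        claimedToEnd.tryClaim(Long.MAX_VALUE - 1);
        System.out.println(checkDoneMessage(claimedToEnd)); // prints "ok"
    }
}
```

In this model, returning immediately from a non-empty unbounded range always fails the check. How Beam's `ReadFromKafkaDoFn` should handle a partition that the stop function rejects is a separate question, and this sketch does not answer it.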
   


-- 
This is an automated message from the Apache Git Service.