dorg-prajagopal opened a new issue, #22538: URL: https://github.com/apache/beam/issues/22538
My topic has 2 partitions, and I would like a pipeline that uses `KafkaIO.read` to stop processing messages from partition 0. I am trying to use [withCheckStopReadingFn](https://beam.apache.org/releases/javadoc/2.39.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withCheckStopReadingFn-org.apache.beam.sdk.transforms.SerializableFunction-).

Sample code:

```java
public class CheckPartitionStatus implements SerializableFunction<TopicPartition, Boolean> {
  @Override
  public Boolean apply(TopicPartition input) {
    boolean value = false;
    if (input.equals(new TopicPartition(topicName, 0))) {
      value = true;
    }
    return value;
  }
}
```

The pipeline is as follows:

```java
PTransform<PBegin, PCollection<KV<GenericData.Record, GenericData.Record>>> kafka =
    KafkaIO.<GenericData.Record, GenericData.Record>read()
        .withBootstrapServers(brokerurl)
        .withTopic(inputPersonTopic)
        .withConsumerConfigUpdates(props)
        .withKeyDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of(url, schemakey, null, csrConfig))
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of(url, schemavalue, null, csrConfig))
        .withReadCommitted()
        .commitOffsetsInFinalize()
        .withCheckStopReadingFn(new CheckPartitionStatus())
        .withoutMetadata();

pipeline.apply(kafka)
    .apply(Values.<GenericData.Record>create())
    .apply("ProcessMessage", ParDo.of(new ProcessMessage()));

pipeline.run().waitUntilFinish();
```

I am running into this exception:

```
Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Last attempted offset should not be null. No work was claimed in non-empty range [7, 9223372036854775807).
```

According to the topic, the last message offset is at 7. Any help on how I can use this function? Is there any issue with using this `SerializableFunction`? I don't have issues connecting to Redis. Commenting out `withCheckStopReadingFn` lets the pipeline start fine and process messages from Kafka.
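The stop-check logic above can be exercised on its own. This is a minimal sketch, using a hypothetical stand-in record for Kafka's `TopicPartition` so it runs without the Kafka client on the classpath, and an assumed topic name `person-topic`; the real class's `equals` likewise compares topic and partition.

```java
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition, so this
// sketch compiles without the Kafka client jar. A record's generated equals()
// compares its components (topic, partition), matching the real class.
record TopicPartition(String topic, int partition) {}

public class CheckPartitionStatusSketch {
    // Assumed topic name; substitute your own.
    static final String topicName = "person-topic";

    // Returns true for partition 0 of the topic, which tells KafkaIO to stop
    // reading that partition; every other partition keeps being consumed.
    static boolean stopReading(TopicPartition input) {
        return input.equals(new TopicPartition(topicName, 0));
    }

    public static void main(String[] args) {
        System.out.println(stopReading(new TopicPartition(topicName, 0))); // true
        System.out.println(stopReading(new TopicPartition(topicName, 1))); // false
    }
}
```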
