Dear team, I am currently using Apache Beam to read messages from Kafka with a Protobuf schema and a schema registry URL, but I am still encountering errors. Could you please help check my use of the Beam SDK for this case? Below attachment is my code.
package org.example;
import com.google.protobuf.Message; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.kafka.DeserializerProvider; import org.apache.beam.sdk.io.kafka.KafkaIO; //import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.example.Apenpup; import java.util.HashMap; import java.util.Map; public class KafkaPipeline { private static final Logger LOG = LoggerFactory.getLogger(KafkaPipeline.class); public static void main(String[] args) { String schemaRegistryUrl = "http://localhost:8081"; String subject = "my-topic"; String boostrapServer = "xxxx:9092"; String topic = "xxxx"; String GROUP_ID = "my-group-id"; // Create a Pipeline object Pipeline pipeline = Pipeline.create(); // Create the deserializer provider ConfluentSchemaRegistryDeserializerProvider<Apenpup> deserializerProvider = ConfluentSchemaRegistryDeserializerProvider.of(schemaRegistryUrl, subject,5); // Create Kafka consumer configurations Map<String, Object> kafkaConfigs = new HashMap<>(); // // Get the deserializer org.apache.kafka.common.serialization.Deserializer<Apenpup> deserializer = deserializerProvider.getDeserializer(kafkaConfigs, true ); Map<String, Object> consumerConfig = new HashMap<>(); consumerConfig.put("bootstrap.servers", boostrapServer); consumerConfig.put("group.id", GROUP_ID); consumerConfig.put("key.deserializer", StringDeserializer.class.getName()); consumerConfig.put("value.deserializer", KafkaProtobufDeserializer.class.getName()); consumerConfig.put("specific.protobuf.value.type", Apenpup.class.getName()); // Create an instance of KafkaProtobufDeserializer and configure it // KafkaProtobufDeserializer<Apenpup> valueDeserializer = new KafkaProtobufDeserializer<>(); // valueDeserializer.configure(consumerConfig, false); // Read from Kafka PCollection<Apenpup> messages = pipeline.apply("ReadFromKafka", KafkaIO.<String, Apenpup>read() .withBootstrapServers(boostrapServer) .withTopic(topic) .withKeyDeserializer(StringDeserializer.class) .withValueDeserializer((DeserializerProvider<Apenpup>) deserializer) .updateConsumerProperties(consumerConfig) .withoutMetadata()) .apply("ExtractValues", ParDo.of(new DoFn<KV<String, Apenpup>, Apenpup>() { @ProcessElement public void processElement(@Element KV<String, Apenpup> record, OutputReceiver<Apenpup> out) { Apenpup value = record.getValue(); LOG.info("Received message: " + value); out.output(value); } })); // Apply windowing PCollection<Apenpup> windowedMessages = messages.apply("WindowIntoFixedIntervals", Window.into(FixedWindows.of(Duration.standardMinutes(1)))); // Apply a simple transformation PCollection<String> transformedMessages = windowedMessages.apply("TransformMessages", ParDo.of(new DoFn<Apenpup, String>() { @ProcessElement public void processElement(@Element Apenpup message, OutputReceiver<String> out) { String transformedMessage = message.toString().toUpperCase(); LOG.info("Transformed message: " + transformedMessage); out.output(transformedMessage); } })); // Write the results to text files transformedMessages.apply("WriteToText", TextIO.write() .to("/home/thuybui1/apachebeampipeline/output/output") .withWindowedWrites() .withNumShards(1) .withSuffix(".txt")); // Run the pipeline pipeline.run().waitUntilFinish(); } }