Dear team,

I am currently using Apache Beam to read messages from Kafka with a
Protobuf schema and a schema registry URL, but I am still encountering
errors. Could you please check my use of the Beam SDK for this case?
My code is attached below.
package org.example;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

public class KafkaPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaPipeline.class);

    public static void main(String[] args) {
        String schemaRegistryUrl = "http://localhost:8081";
        // If the registry uses the default TopicNameStrategy, the value
        // subject is usually "<topic>-value" rather than the bare topic name.
        String subject = "my-topic";
        String bootstrapServer = "xxxx:9092";
        String topic = "xxxx";
        String groupId = "my-group-id";
        // Create a Pipeline object
        Pipeline pipeline = Pipeline.create();

        // Create the deserializer provider; the third argument is the
        // schema version to read from the registry
        ConfluentSchemaRegistryDeserializerProvider<Apenpup> deserializerProvider =
                ConfluentSchemaRegistryDeserializerProvider.of(schemaRegistryUrl, subject, 5);

        // Kafka consumer configuration. KafkaIO installs its own key/value
        // deserializers on the consumer, so there is no need to call
        // getDeserializer() by hand.
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers", boostrapServer);
        consumerConfig.put("group.id", GROUP_ID);
        consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
        consumerConfig.put("value.deserializer", KafkaProtobufDeserializer.class.getName());
        consumerConfig.put("specific.protobuf.value.type", Apenpup.class.getName());

        // Read from Kafka. Pass the DeserializerProvider itself to
        // withValueDeserializer; casting a Deserializer obtained from the
        // provider to DeserializerProvider compiles but fails at runtime
        // with a ClassCastException.
        PCollection<Apenpup> messages = pipeline.apply("ReadFromKafka", KafkaIO.<String, Apenpup>read()
                        .withBootstrapServers(bootstrapServer)
                        .withTopic(topic)
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(deserializerProvider)
                        .withConsumerConfigUpdates(consumerConfig)
                        .withoutMetadata())
                .apply("ExtractValues", ParDo.of(new DoFn<KV<String, Apenpup>, Apenpup>() {
                    @ProcessElement
                    public void processElement(@Element KV<String, Apenpup> record, OutputReceiver<Apenpup> out) {
                        Apenpup value = record.getValue();
                        LOG.info("Received message: " + value);
                        out.output(value);
                    }
                }));

        // Apply windowing
        PCollection<Apenpup> windowedMessages = messages.apply("WindowIntoFixedIntervals",
                Window.into(FixedWindows.of(Duration.standardMinutes(1))));

        // Apply a simple transformation
        PCollection<String> transformedMessages = windowedMessages.apply("TransformMessages", ParDo.of(new DoFn<Apenpup, String>() {
            @ProcessElement
            public void processElement(@Element Apenpup message, OutputReceiver<String> out) {
                String transformedMessage = message.toString().toUpperCase();
                LOG.info("Transformed message: " + transformedMessage);
                out.output(transformedMessage);
            }
        }));

        // Write the results to text files
        transformedMessages.apply("WriteToText", TextIO.write()
                .to("/home/thuybui1/apachebeampipeline/output/output")
                .withWindowedWrites()
                .withNumShards(1)
                .withSuffix(".txt"));

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}
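
One more thing I noticed while debugging: as far as I can tell, Beam's
ConfluentSchemaRegistryDeserializerProvider is Avro-based (it deserializes
through the Confluent Avro deserializer), so it may not support a Protobuf
subject at all. Below is a sketch of the variant I am considering instead,
which bypasses the provider and wires KafkaProtobufDeserializer in directly
via withValueDeserializerAndCoder. It reuses the placeholder values from
above, assumes Apenpup is my generated Protobuf class, and needs the
beam-sdks-java-extensions-protobuf dependency plus
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; and
import org.apache.kafka.common.serialization.Deserializer;

        // Sketch: read Protobuf values without the Avro-based provider.
        // The deserializer itself talks to the schema registry, and the
        // specific.protobuf.value.type setting tells it which generated
        // class to return; ProtoCoder tells Beam how to encode Apenpup
        // between transforms.
        Map<String, Object> protoConfig = new HashMap<>();
        protoConfig.put("schema.registry.url", schemaRegistryUrl);
        protoConfig.put("group.id", groupId);
        protoConfig.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
                Apenpup.class.getName());

        // KafkaProtobufDeserializer.class is a raw type, so an unchecked
        // cast is needed to satisfy withValueDeserializerAndCoder's signature.
        @SuppressWarnings("unchecked")
        Class<Deserializer<Apenpup>> protoDeserializer =
                (Class<Deserializer<Apenpup>>) (Class<?>) KafkaProtobufDeserializer.class;

        PCollection<KV<String, Apenpup>> records = pipeline.apply("ReadFromKafka",
                KafkaIO.<String, Apenpup>read()
                        .withBootstrapServers(bootstrapServer)
                        .withTopic(topic)
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializerAndCoder(protoDeserializer, ProtoCoder.of(Apenpup.class))
                        .withConsumerConfigUpdates(protoConfig)
                        .withoutMetadata());

Does this look like the intended pattern for Protobuf with the schema
registry, or is there a supported way to do it through the provider?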
