We are using Pulsar 1.22 within a Spark framework, and I am currently upgrading my cluster to Pulsar 2.0. One of the main changes concerns the configuration classes, which have been replaced by builders. Although the ProducerBuilder and ConsumerBuilder interfaces extend Serializable, the corresponding implementations cannot actually be serialized, because a Pulsar client object is embedded in them. The schema object is also a problem for serialization.


Basically, with 1.22 my producer application instantiates a client and a producer on each executor, and only the configuration classes need to be serialized:

---8<---

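WarcRecordExtractor
        .load(sc, input)
        .foreachPartition { ite =>
            // Client and producer are created on the executor, once per partition;
            // only clientConf and producerConf are serialized with the closure.
            val client = PulsarClient.create(ServiceUrl, clientConf)
            val producer = client.createProducer(topic, producerConf)
            ite.foreach { x => producer.sendAsync(x._2.content) }
            producer.close()
            client.close()
        }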

---8<---


With 2.0 I tried to replace the configuration classes with the corresponding builders, but this does not work because the producerBuilder isn't serializable:

---8<---

            [...]

            val client = clientBuilder.serviceUrl(serviceUrl).build()
            val producer = producerBuilder.topic(topicName).create

            [...]

---8<---
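
The failure mode can be reproduced without Pulsar or Spark. The following is only a minimal sketch using the Java serialization that Spark applies to closures; FakeClient, ProducerSettings, BuilderWithClient and canSerialize are hypothetical stand-ins invented for illustration, not Pulsar classes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Hypothetical stand-in for a live client; deliberately NOT Serializable.
  class FakeClient(val serviceUrl: String)

  // Plain settings: a case class of strings is Serializable by default.
  case class ProducerSettings(serviceUrl: String, topic: String)

  // Hypothetical builder that is marked Serializable but embeds the client.
  class BuilderWithClient(val client: FakeClient, val topic: String) extends Serializable

  // Returns true if Java serialization accepts the object, false if it
  // hits a non-serializable field somewhere in the object graph.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

Here canSerialize accepts a ProducerSettings instance but rejects a BuilderWithClient, even though the latter declares Serializable, because the embedded client field cannot be written.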


My current workaround is to write my own ProducerBuilderImpl without the Pulsar client or schemas (the createAsync method still exists but throws an exception), and then to instantiate a producer like this:

---8<---

            [...]

            val client : PulsarClientImpl = clientBuilder.serviceUrl(broker).build().asInstanceOf[PulsarClientImpl]
            val producer = client.createProducerAsync(producerBuilder.topic(topic).getConf(), org.apache.pulsar.client.api.Schema.BYTES).get()

            [...]

---8<---

where producerBuilder is my own implementation of the ProducerBuilder interface.
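
The general idea behind this workaround (ship only plain settings, create the heavy object on the executor) can also be sketched with a transient lazy field. This is a stdlib-only illustration, not the code I actually run: FakeProducer, ProducerHolder and roundTrip are hypothetical names, and FakeProducer merely stands in for a real, non-serializable producer.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LazyProducerSketch {
  // Hypothetical stand-in for a non-serializable producer.
  class FakeProducer(val serviceUrl: String, val topic: String) {
    def send(msg: String): String = s"$topic <- $msg"
  }

  // Only the two strings are serialized. The producer field is transient,
  // so it is skipped on write and rebuilt lazily on first access afterwards.
  class ProducerHolder(serviceUrl: String, topic: String) extends Serializable {
    @transient lazy val producer: FakeProducer = new FakeProducer(serviceUrl, topic)
  }

  // Serializes and deserializes an object in memory, mimicking what happens
  // when a closure is shipped from the driver to an executor.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

A ProducerHolder survives roundTrip, and its producer is constructed on the receiving side only when it is first used. Whether this fits the real ProducerBuilder is an assumption I have not verified.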


I did the same thing for ConsumerBuilderImpl and also rewrote SparkStreamingPulsarReceiver with the same kind of workaround. I don't know why the configuration classes were replaced by builders, but it has become more difficult to use Pulsar with Spark (and probably with other distributed frameworks). Maybe I did it wrong, though.


