We are using Pulsar 1.22 within a Spark application, and I am currently
upgrading my cluster to Pulsar 2.0. One of the main changes concerns the
configuration classes, which have been replaced by builders. Although the
ProducerBuilder and ConsumerBuilder interfaces extend Serializable, the
corresponding implementations are not serializable in practice, because
they embed a Pulsar client object. The schema object is also a problem
for serialization.
Basically, with 1.22 my producer app instantiates a client and a
producer on each executor, so only the configuration classes need to be
serialized:
---8<---
WarcRecordExtractor
  .load(sc, input)
  .foreachPartition { ite =>
    // Client and producer are created on the executor; only the
    // (serializable) clientConf and producerConf travel with the closure
    val client = PulsarClient.create(ServiceUrl, clientConf)
    val producer = client.createProducer(topic, producerConf)
    ite.foreach { x => producer.sendAsync(x._2.content) }
    producer.close()
    client.close()
  }
---8<---
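One detail in the per-partition pattern above: if anything inside the loop throws (for example while reading a record), producer.close() and client.close() are skipped and the client leaks on that executor. Below is a minimal sketch of a loan-pattern helper that always closes the resource. The name withResource is hypothetical (not a Pulsar or Spark API), and it assumes the client and producer are java.lang.AutoCloseable; the Pulsar client interfaces extend java.io.Closeable, but that is worth checking for your version.

```scala
import scala.util.control.NonFatal

object Loan {
  // Hypothetical helper: open a resource, pass it to `use`, and always close it.
  // If both `use` and close() fail, the close() failure is attached to the
  // original exception as a suppressed exception (like Java try-with-resources).
  def withResource[R <: AutoCloseable, A](open: => R)(use: R => A): A = {
    val resource = open
    var primary: Throwable = null
    try use(resource)
    catch {
      case t: Throwable =>
        primary = t
        throw t
    } finally {
      if (primary == null) resource.close()
      else
        try resource.close()
        catch { case NonFatal(c) => primary.addSuppressed(c) }
    }
  }
}
```

Inside foreachPartition, the two explicit close() calls would then become nested calls, withResource(PulsarClient.create(ServiceUrl, clientConf)) { client => withResource(client.createProducer(topic, producerConf)) { producer => ... } }, so the producer is closed before the client, on success and on failure.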
With 2.0 I tried to replace the configuration classes with the
corresponding builders, but this does not work because producerBuilder
isn't serializable:
---8<---
[...]
val client = clientBuilder.serviceUrl(serviceUrl).build()
val producer = producerBuilder.topic(topicName).create
[...]
---8<---
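Spark serializes task closures with Java serialization, so a plain ObjectOutputStream write is enough to reproduce where the "not serializable" failure comes from. A minimal sketch with hypothetical stand-ins: FakeBuilder plays the role of a builder implementation that declares Serializable but keeps a reference to a non-serializable client, and ProducerSettings is a plain case class of settings that could be captured in the closure instead.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical settings holder: only plain values, so it serializes cleanly.
final case class ProducerSettings(serviceUrl: String, topic: String)

// Hypothetical stand-ins: a client that is not Serializable, and a builder that
// declares Serializable but keeps a reference to that client.
final class FakeClient(val serviceUrl: String)
final class FakeBuilder(val client: FakeClient, val topic: String) extends Serializable

object SerializationCheck {
  // Java-serializes `value`, as Spark does for a task closure;
  // a non-serializable field anywhere in the object graph makes this fail.
  def isSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

With that split, the builders would only be used on the executor, inside foreachPartition, for example PulsarClient.builder().serviceUrl(s.serviceUrl).build() followed by client.newProducer().topic(s.topic).create(), so nothing from the Pulsar client has to be serialized. This assumes the 2.0 builder entry points PulsarClient.builder() and newProducer(); it is a sketch, not a tested replacement for the custom builder.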
My current workaround is to write my own ProducerBuilderImpl without a
Pulsar client or schemas (the createAsync method still exists but throws
an exception), and then to instantiate a producer like this:
---8<---
[...]
val client: PulsarClientImpl =
  clientBuilder.serviceUrl(broker).build().asInstanceOf[PulsarClientImpl]
val producer =
  client.createProducerAsync(producerBuilder.topic(topic).getConf(),
    org.apache.pulsar.client.api.Schema.BYTES).get()
[...]
---8<---
where producerBuilder is my own implementation of the ProducerBuilder
interface.
I did the same thing for ConsumerBuilderImpl, and also rewrote
SparkStreamingPulsarReceiver with the same kind of workaround. I don't
know why the configuration classes were replaced by builders, but it has
become more difficult to use the client with Spark (and probably with
other distributed frameworks). Maybe I did it wrong, though.