On 08/06/2018 12:01, Sijie Guo wrote:
Geoffroy,
Thank you for reporting this. The change was made to evolve Pulsar into a
type-safe client with schemas and more validation. As part of that, we
moved the client to a more fluent builder style and hid the configuration
data as an implementation detail of the Pulsar client.
However, we missed the use cases in the data-computing world, where people
tend to rely on Java Serializable to share configuration.
We will incorporate your use cases and try to bring the configuration
object back.
- Sijie
That would be great, thanks!
On Fri, Jun 8, 2018 at 1:09 AM Geoffroy Fouquier <
geoffroy.fouqu...@exensa.com> wrote:
We are using Pulsar 1.22 within a Spark framework, and I am currently
upgrading my cluster to Pulsar 2.0. One of the main changes concerns the
configuration classes, which have been replaced by builders. Although the
ProducerBuilder and ConsumerBuilder interfaces extend Serializable, the
corresponding implementations are not serializable, because a Pulsar
client object is embedded in them. The schema object is also a problem
for serialization.
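To illustrate the failure mode described above, here is a minimal sketch that uses only the JDK. `FakeClient` and `FakeProducerBuilder` are hypothetical stand-ins, not Pulsar classes; they only mimic the situation where a type is declared Serializable but embeds an object that is not.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a live client handle (connections, threads).
// It does not extend Serializable.
class FakeClient(val serviceUrl: String)

// Hypothetical stand-in for a builder implementation. It is declared
// Serializable, but it holds a FakeClient field, so Java serialization
// fails at runtime when it reaches that field.
class FakeProducerBuilder(val client: FakeClient, val topic: String)
    extends Serializable

object SerializationCheck {
  // Returns true if `value` can be written with Java serialization.
  def isJavaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

Declaring the marker interface is therefore not enough: every non-transient field reachable from the object has to be serializable as well, which is the check Spark effectively performs when it ships a closure to the executors.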
Basically, with 1.22 my producer app instantiates a client and a
producer on each executor, and only the configuration classes need to be
serialized:
---8<---
WarcRecordExtractor
  .load(sc, input)
  .foreachPartition { ite =>
    val client = PulsarClient.create(ServiceUrl, clientConf)
    val producer = client.createProducer(topic, producerConf)
    ite.foreach { x => producer.sendAsync(x._2.content) }
    producer.close()
    client.close()
  }
---8<---
With 2.0 I tried to replace the configuration classes with the
corresponding builders, but this setup doesn't work since the
producerBuilder isn't serializable:
---8<---
[...]
val client = clientBuilder.serviceUrl(serviceUrl).build()
val producer = producerBuilder.topic(topicName).create
[...]
---8<---
My current workaround is to write my own ProducerBuilderImpl without the
Pulsar client or schemas (the createAsync method still exists but throws
an exception), and then to instantiate a producer like this:
---8<---
[...]
val client: PulsarClientImpl =
  clientBuilder.serviceUrl(broker).build().asInstanceOf[PulsarClientImpl]
val producer =
  client.createProducerAsync(producerBuilder.topic(topic).getConf(),
    org.apache.pulsar.client.api.Schema.BYTES).get()
[...]
---8<---
where producerBuilder is my own implementation of the ProducerBuilder
interface.
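A lighter variant of the same idea, sketched here under assumptions, is to ship only plain values to the executors and call the real builders there. `ProducerSettings` and `SettingsRoundTrip` are hypothetical names introduced for illustration; the round trip mimics what Java serialization does when a closure is shipped.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}

// Hypothetical settings holder: it contains only plain values, and Scala
// case classes are Serializable, so it can be captured by a Spark closure.
final case class ProducerSettings(serviceUrl: String, topic: String)

object SettingsRoundTrip {
  // Writes `value` with Java serialization and reads it back.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Inside `foreachPartition`, the settings would then be fed to the builders on the executor, for example `PulsarClient.builder().serviceUrl(settings.serviceUrl).build()` followed by `client.newProducer().topic(settings.topic).create()`, assuming the builder entry points of the 2.0 client API. This avoids reimplementing ProducerBuilder, at the cost of maintaining a separate settings type.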
I did the same thing for ConsumerBuilderImpl and also rewrote
SparkStreamingPulsarReceiver with the same kind of workaround. I don't
know why the configuration classes were replaced by builders, but the
client has become more difficult to use with Spark (and probably with
other distributed frameworks). Maybe I did it wrong, though.