Geoffroy, Thank you for reporting this. The change was made to evolve pulsar into a type-safe client with schema and more validations. Along with the idea, we changed the client to become a more fluent style builder and hide the configuration data as an implementation detail to the pulsar client implementation.
However we missed the use cases in data computing world, people tends to use java Serializable on sharing configuration. We would incorporate your use cases and try to bring the configuration object back. - Sijie On Fri, Jun 8, 2018 at 1:09 AM Geoffroy Fouquier < geoffroy.fouqu...@exensa.com> wrote: > > We are using pulsar 1.22 within a spark framework and I am currently > upgrading my cluster to pulsar 2.0. One of the main change concerns the > configuration classes, replace by builders. Although ProducerBuilder and > ConsumerBuilder interfaces implement Serializable, corresponding > implementations aren't, because a pulsar client object is embedded. The > schema object is also a problem for serialization. > > > Basically, with 1.22 my producer app. instanciate a client and a > producer on each executor and only configurations classes need to be > serialized > > ---8<--- > > WarcRecordExtractor > .load(sc, input) > .foreachPartition{ ite => > > val client = PulsarClient.create(ServiceUrl, clientConf) > val producer = client.createProducer(topic, producerConf) > ite.foreach{ x => producer.sendAsync(x._2.content) } > producer.close() > > client.close() > > } > > ---8<--- > > > With 2.0 I try to replace the configuration classes with the > corresponding builders, but this setting doesn't work since the > producerBuilder isn't serializable : > > ---8<--- > > [...] > > val client = clientBuilder.serviceUrl(serviceUrl).build() > val producer = producerBuilder.topic(topicName).create > > [...] > > ---8<--- > > > My actual workaround is to rewrite my own ProducerBuilderImpl without > pulsar client or schemas (the createAsync method still exists but throw > an exception). And then to instanciate a producer like this : > > ---8<--- > > [...] > > val client : PulsarClientImpl = > clientBuilder.serviceUrl(broker).build().asInstanceOf[PulsarClientImpl] > val producer = > client.createProducerAsync(producerBuilder.topic(topic).getConf(), > org.apache.pulsar.client.api.Schema.BYTES).get() > > [...] > > ---8<--- > > where producerBuilder is my own implementation which implements > ProducerBuilder interface. > > > I did the same thing for ConsumerBuilderImpl and also rewrite > SparkStreamingPulsarReceiver with the same kind of workaround. I don't > know why the configuration classes have been replaced by builders, but > it became more difficult to use it with spark (and probably with other > distributed framework), but maybe i did it wrong. > > > >