Hi Geoffroy,

I created an issue to track this: https://github.com/apache/incubator-pulsar/issues/1943.
We will address it for the upcoming 2.1 release.

- Sijie

On Fri, Jun 8, 2018 at 6:39 AM Geoffroy Fouquier <geoffroy.fouqu...@exensa.com> wrote:

> On 08/06/2018 12:01, Sijie Guo wrote:
> > Geoffroy,
> >
> > Thank you for reporting this. The change was made to evolve Pulsar into a
> > type-safe client with schemas and more validation. Along with that idea,
> > we changed the client to use a more fluent builder style and to hide the
> > configuration data as an implementation detail of the Pulsar client.
> >
> > However, we missed the use cases in the data computing world, where
> > people tend to use Java Serializable to share configuration.
> >
> > We will incorporate your use cases and try to bring the configuration
> > object back.
> >
> > - Sijie
>
> That would be great, thanks!
>
> > On Fri, Jun 8, 2018 at 1:09 AM Geoffroy Fouquier <geoffroy.fouqu...@exensa.com> wrote:
> >
> >> We are using Pulsar 1.22 within a Spark framework and I am currently
> >> upgrading my cluster to Pulsar 2.0. One of the main changes concerns the
> >> configuration classes, which have been replaced by builders. Although the
> >> ProducerBuilder and ConsumerBuilder interfaces implement Serializable,
> >> the corresponding implementations aren't serializable, because a Pulsar
> >> client object is embedded in them. The schema object is also a problem
> >> for serialization.
> >>
> >> Basically, with 1.22 my producer app.
> >> instantiates a client and a producer on each executor, and only the
> >> configuration classes need to be serialized:
> >>
> >> ---8<---
> >>
> >> WarcRecordExtractor
> >>   .load(sc, input)
> >>   .foreachPartition { ite =>
> >>     val client = PulsarClient.create(ServiceUrl, clientConf)
> >>     val producer = client.createProducer(topic, producerConf)
> >>     ite.foreach { x => producer.sendAsync(x._2.content) }
> >>     producer.close()
> >>     client.close()
> >>   }
> >>
> >> ---8<---
> >>
> >> With 2.0 I tried to replace the configuration classes with the
> >> corresponding builders, but this setup doesn't work since the
> >> producerBuilder isn't serializable:
> >>
> >> ---8<---
> >>
> >> [...]
> >>
> >> val client = clientBuilder.serviceUrl(serviceUrl).build()
> >> val producer = producerBuilder.topic(topicName).create
> >>
> >> [...]
> >>
> >> ---8<---
> >>
> >> My current workaround is to write my own ProducerBuilderImpl without
> >> the Pulsar client or schemas (the createAsync method still exists but
> >> throws an exception), and then to instantiate a producer like this:
> >>
> >> ---8<---
> >>
> >> [...]
> >>
> >> val client: PulsarClientImpl =
> >>   clientBuilder.serviceUrl(broker).build().asInstanceOf[PulsarClientImpl]
> >> val producer =
> >>   client.createProducerAsync(producerBuilder.topic(topic).getConf(),
> >>     org.apache.pulsar.client.api.Schema.BYTES).get()
> >>
> >> [...]
> >>
> >> ---8<---
> >>
> >> where producerBuilder is my own implementation of the ProducerBuilder
> >> interface.
> >>
> >> I did the same thing for ConsumerBuilderImpl, and also rewrote
> >> SparkStreamingPulsarReceiver with the same kind of workaround. I don't
> >> know why the configuration classes have been replaced by builders, but
> >> it has become more difficult to use Pulsar with Spark (and probably
> >> with other distributed frameworks). Maybe I did something wrong, though.
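A minimal sketch of the serialization problem discussed in this thread, assuming Scala 2.12+ and nothing beyond the JDK (no Pulsar or Spark dependency). It keeps only plain values in a serializable settings object, the way the 1.x configuration classes could be shipped, and uses a Java serialization round trip to imitate what happens when a closure's captured values are sent to an executor. `ProducerSettings`, `LiveConnection`, `BuilderWithClient` and `SerializationCheck` are hypothetical names, not Pulsar classes; `BuilderWithClient` only imitates a builder that declares Serializable while embedding a non-serializable client.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical settings holder (not a Pulsar class). It only holds plain
// values, and Scala case classes are Serializable by default.
final case class ProducerSettings(
    serviceUrl: String,
    topic: String,
    enableBatching: Boolean = true
)

object SerializationCheck {
  // Hypothetical stand-in for a live, non-serializable resource such as a client connection.
  final class LiveConnection(val url: String)

  // Hypothetical stand-in for a builder that declares Serializable but
  // embeds a live client, like the 2.0 ProducerBuilderImpl described above.
  final class BuilderWithClient(val client: LiveConnection) extends java.io.Serializable

  // Imitates shipping a value from the driver to an executor:
  // Java-serialize it to bytes, then deserialize it again.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value)
    finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T]
    finally in.close()
  }

  // True if the value survives the round trip; false if some field it
  // references (directly or transitively) is not serializable.
  def isSerializable(value: AnyRef): Boolean =
    try { roundTrip(value); true }
    catch { case _: NotSerializableException => false }
}
```

The check reports `false` for `BuilderWithClient` because its `LiveConnection` field cannot be serialized, which mirrors the ProducerBuilderImpl problem described above. On the executor, such settings could then be turned into a client and producer inside `foreachPartition` with the Pulsar 2.x calls `PulsarClient.builder().serviceUrl(settings.serviceUrl).build()` and `client.newProducer().topic(settings.topic).create()`, so that no builder or client instance ever has to be serialized.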