Hi Geoffroy,

I put up a PR, https://github.com/apache/incubator-pulsar/pull/2004, to address the issue. Could you take a look and see whether that approach solves your problem?
- Sijie

On Fri, Jun 8, 2018 at 8:32 AM Sijie Guo <guosi...@gmail.com> wrote:
> Hi Geoffroy,
>
> I created an issue for this:
> https://github.com/apache/incubator-pulsar/issues/1943.
>
> We will address it for the upcoming 2.1 release.
>
> - Sijie
>
> On Fri, Jun 8, 2018 at 6:39 AM Geoffroy Fouquier <
> geoffroy.fouqu...@exensa.com> wrote:
>
>> On 08/06/2018 12:01, Sijie Guo wrote:
>> > Geoffroy,
>> >
>> > Thank you for reporting this. The change was made to evolve Pulsar into
>> > a type-safe client with schemas and more validation. Along with that, we
>> > changed the client to a more fluent builder style and made the
>> > configuration data an implementation detail hidden inside the Pulsar
>> > client implementation.
>> >
>> > However, we missed the use cases in the data-computing world, where
>> > people tend to rely on Java Serializable to share configuration.
>> >
>> > We will incorporate your use cases and try to bring the configuration
>> > object back.
>> >
>> > - Sijie
>>
>> That would be great, thanks!
>>
>> >
>> > On Fri, Jun 8, 2018 at 1:09 AM Geoffroy Fouquier <
>> > geoffroy.fouqu...@exensa.com> wrote:
>> >
>> >> We are using Pulsar 1.22 within a Spark framework and I am currently
>> >> upgrading my cluster to Pulsar 2.0. One of the main changes concerns
>> >> the configuration classes, which have been replaced by builders.
>> >> Although the ProducerBuilder and ConsumerBuilder interfaces implement
>> >> Serializable, the corresponding implementations are not serializable,
>> >> because they embed a Pulsar client object. The schema object is also
>> >> a problem for serialization.
>> >>
>> >> Basically, with 1.22 my producer app instantiates a client and a
>> >> producer on each executor, so only the configuration classes need to
>> >> be serialized:
>> >>
>> >> ---8<---
>> >>
>> >> WarcRecordExtractor
>> >>   .load(sc, input)
>> >>   .foreachPartition{ ite =>
>> >>
>> >>     val client = PulsarClient.create(ServiceUrl, clientConf)
>> >>     val producer = client.createProducer(topic, producerConf)
>> >>     ite.foreach{ x => producer.sendAsync(x._2.content) }
>> >>     producer.close()
>> >>
>> >>     client.close()
>> >>   }
>> >>
>> >> ---8<---
>> >>
>> >> With 2.0 I tried to replace the configuration classes with the
>> >> corresponding builders, but this setup doesn't work because the
>> >> producerBuilder isn't serializable:
>> >>
>> >> ---8<---
>> >>
>> >> [...]
>> >>
>> >>     val client = clientBuilder.serviceUrl(serviceUrl).build()
>> >>     val producer = producerBuilder.topic(topicName).create
>> >>
>> >> [...]
>> >>
>> >> ---8<---
>> >>
>> >> My current workaround is to write my own ProducerBuilderImpl without
>> >> the Pulsar client or schemas (the createAsync method still exists but
>> >> throws an exception), and then to instantiate a producer like this:
>> >>
>> >> ---8<---
>> >>
>> >> [...]
>> >>
>> >>     val client: PulsarClientImpl =
>> >>       clientBuilder.serviceUrl(broker).build().asInstanceOf[PulsarClientImpl]
>> >>     val producer =
>> >>       client.createProducerAsync(producerBuilder.topic(topic).getConf(),
>> >>         org.apache.pulsar.client.api.Schema.BYTES).get()
>> >>
>> >> [...]
>> >>
>> >> ---8<---
>> >>
>> >> where producerBuilder is my own implementation of the ProducerBuilder
>> >> interface.
>> >>
>> >> I did the same thing for ConsumerBuilderImpl and also rewrote
>> >> SparkStreamingPulsarReceiver with the same kind of workaround. I don't
>> >> know why the configuration classes have been replaced by builders, but
>> >> this has made Pulsar more difficult to use with Spark (and probably
>> >> with other distributed frameworks). Maybe I did it wrong.
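The problem reported in this thread comes down to how Java serialization treats object graphs. Spark uses Java serialization to ship task closures to executors, and it fails as soon as a reachable field is not Serializable. A builder that holds a live client therefore cannot be shipped, even if its interface extends Serializable. A minimal sketch that reproduces this without Pulsar or Spark follows. `FakeClient`, `BuilderHoldingClient` and `ProducerSettings` are hypothetical classes for illustration, not Pulsar APIs:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {

  // Hypothetical stand-in for a live client (connections, threads):
  // it deliberately does NOT extend Serializable.
  class FakeClient {
    def close(): Unit = ()
  }

  // Hypothetical builder that keeps a client reference next to its
  // configuration, like the builder implementations described above.
  // Case classes are Serializable, but the `client` field is not.
  case class BuilderHoldingClient(client: FakeClient, topic: String)

  // Hypothetical plain-data settings: only strings, so it serializes fine.
  case class ProducerSettings(serviceUrl: String, topic: String)

  // Returns true if `obj` can be written with Java serialization.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val settings = ProducerSettings("pulsar://localhost:6650", "my-topic")
    val builder = BuilderHoldingClient(new FakeClient, "my-topic")
    println(s"settings serializable: ${isSerializable(settings)}") // true
    println(s"builder serializable: ${isSerializable(builder)}")   // false
  }
}
```

This is the same design the 1.22 code relied on. Only plain, serializable configuration crosses the driver/executor boundary. The client and producer are created inside `foreachPartition` on each executor. With the 2.0 API, one would read the fields of a settings object like this inside the partition and pass them to `PulsarClient.builder().serviceUrl(...)` and `client.newProducer().topic(...)`.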