Hi Geoffroy,

I created an issue for this:
https://github.com/apache/incubator-pulsar/issues/1943

We will address it for the upcoming 2.1 release.
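
For reference, here is a minimal sketch of the serialization problem discussed in the thread below. It does not use the Pulsar API at all: ProducerSettings, FakeClient, FakeBuilder and roundTrip are hypothetical names made up for illustration, and the service URL is only a placeholder. It assumes plain Java serialization, which is what Spark uses for closures. A data-only settings object survives the round trip, while a builder that embeds a client handle does not.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException,
  ObjectInputStream, ObjectOutputStream}

// Hypothetical settings holder: plain data only, so it can be shipped to executors.
// Scala case classes are Serializable as long as all of their fields are.
final case class ProducerSettings(serviceUrl: String, topic: String, batchingEnabled: Boolean)

// Stand-in for a live client handle (connections, threads); deliberately not Serializable.
final class FakeClient(val serviceUrl: String)

// Stand-in for a builder that is marked Serializable but embeds a client.
final class FakeBuilder(val client: FakeClient, val topic: String) extends Serializable

object SerializationSketch {
  // Write the value with Java serialization and read it back.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Placeholder URL and topic, not taken from the thread.
    val settings = ProducerSettings("pulsar://localhost:6650", "my-topic", batchingEnabled = true)
    println(roundTrip(settings) == settings)

    // The builder claims to be Serializable, but its client field is not.
    val builder = new FakeBuilder(new FakeClient(settings.serviceUrl), settings.topic)
    try {
      roundTrip(builder)
      println("builder serialized")
    } catch {
      case e: NotSerializableException => println(s"not serializable: ${e.getMessage}")
    }
  }
}
```

In a Spark job the same idea would mean capturing only the settings object in the closure and creating the client and producer inside foreachPartition, as in the 1.22 example quoted below.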

- Sijie

On Fri, Jun 8, 2018 at 6:39 AM Geoffroy Fouquier <
geoffroy.fouqu...@exensa.com> wrote:

> On 08/06/2018 12:01, Sijie Guo wrote:
> > Geoffroy,
> >
> > Thank you for reporting this. The change was made to evolve Pulsar into a
> > type-safe client with schemas and more validation. As part of that, we
> > changed the client API to a fluent builder style and hid the configuration
> > data as an implementation detail of the Pulsar client.
> >
> > However, we missed a use case from the data computing world, where people
> > tend to rely on Java Serializable to share configuration.
> >
> > We will incorporate your use case and try to bring the configuration
> > objects back.
> >
> > - Sijie
>
> That would be great, thanks!
>
> >
> >
> > On Fri, Jun 8, 2018 at 1:09 AM Geoffroy Fouquier <
> > geoffroy.fouqu...@exensa.com> wrote:
> >
> >> We are using Pulsar 1.22 within a Spark framework and I am currently
> >> upgrading my cluster to Pulsar 2.0. One of the main changes concerns the
> >> configuration classes, which have been replaced by builders. Although the
> >> ProducerBuilder and ConsumerBuilder interfaces extend Serializable, the
> >> corresponding implementations aren't serializable, because a Pulsar
> >> client object is embedded. The schema object is also a problem for
> >> serialization.
> >>
> >>
> >> Basically, with 1.22 my producer app instantiates a client and a
> >> producer on each executor, and only the configuration classes need to be
> >> serialized:
> >>
> >> ---8<---
> >>
> >> WarcRecordExtractor
> >>           .load(sc, input)
> >>           .foreachPartition{ ite =>
> >>
> >>               val client = PulsarClient.create(ServiceUrl, clientConf)
> >>               val producer = client.createProducer(topic, producerConf)
> >>               ite.foreach{ x => producer.sendAsync(x._2.content) }
> >>               producer.close()
> >>
> >>               client.close()
> >>
> >>           }
> >>
> >> ---8<---
> >>
> >>
> >> With 2.0 I tried to replace the configuration classes with the
> >> corresponding builders, but this setup doesn't work since the
> >> producerBuilder isn't serializable:
> >>
> >> ---8<---
> >>
> >>               [...]
> >>
> >>               val client = clientBuilder.serviceUrl(serviceUrl).build()
> >>               val producer = producerBuilder.topic(topicName).create
> >>
> >>               [...]
> >>
> >> ---8<---
> >>
> >>
> >> My current workaround is to write my own ProducerBuilderImpl without the
> >> Pulsar client or schemas (the createAsync method still exists but throws
> >> an exception), and then to instantiate a producer like this:
> >>
> >> ---8<---
> >>
> >>               [...]
> >>
> >>               val client: PulsarClientImpl =
> >>                 clientBuilder.serviceUrl(broker).build().asInstanceOf[PulsarClientImpl]
> >>               val producer = client
> >>                 .createProducerAsync(producerBuilder.topic(topic).getConf(),
> >>                   org.apache.pulsar.client.api.Schema.BYTES)
> >>                 .get()
> >>
> >>               [...]
> >>
> >> ---8<---
> >>
> >> where producerBuilder is my own implementation of the
> >> ProducerBuilder interface.
> >>
> >>
> >> I did the same thing for ConsumerBuilderImpl and also rewrote
> >> SparkStreamingPulsarReceiver with the same kind of workaround. I don't
> >> know why the configuration classes have been replaced by builders, but
> >> it has become more difficult to use Pulsar with Spark (and probably with
> >> other distributed frameworks). Maybe I did it wrong.
> >>
> >>
> >>
> >>
>
>
