Glad to hear it!

I'll add a JIRA task to always log the exception. We shouldn't need to
enable debug logging to fix issues like this.
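
A rough sketch of the change I have in mind, written in Java for brevity (the real loop is the Scala code from SamzaContainer.scala quoted further down). The names ProducerLoader and createProducers are made up for illustration, and java.util.logging stands in for Samza's own logging. The only point is that the exception goes out with the skip message at WARNING level instead of being held back for DEBUG:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the producer-creation loop in SamzaContainer.scala.
class ProducerLoader {
    private static final Logger LOG = Logger.getLogger(ProducerLoader.class.getName());

    // Builds one producer per system. A failing factory is still skipped, but
    // its exception is logged at WARNING, so the root cause is visible
    // without turning on debug logging.
    static <P> Map<String, P> createProducers(Map<String, Supplier<P>> factories) {
        Map<String, P> producers = new LinkedHashMap<>();
        for (Map.Entry<String, Supplier<P>> entry : factories.entrySet()) {
            String systemName = entry.getKey();
            try {
                producers.put(systemName, entry.getValue().get());
            } catch (RuntimeException e) {
                LOG.log(Level.WARNING,
                        "Failed to create a producer for " + systemName + ", so skipping.", e);
            }
        }
        return producers;
    }
}
```

With a factory that throws for "elasticsearch" and one that succeeds for "kafka", the returned map holds only "kafka", and the stack trace for the elasticsearch failure appears in the log at the default level.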

On Tue, May 10, 2016 at 10:16 AM, Tim Washington <
tim.washing...@fundingcircle.com> wrote:

> Hey Jacob, right you are.
>
> I added a "*<logger name="org.apache.samza.container" level="DEBUG"/>*"
> logback configuration. And lo and behold, this exception detail popped up.
>
> 10:05:47.520 [main] DEBUG o.a.samza.container.SamzaContainer$ - Exception detail:
> java.lang.RuntimeException: Could not instantiate class *org.apache.samza.system.elasticsearch.clientTransport.ClientFactory*
> at org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getClient(ElasticsearchSystemFactory.java:79)
> at org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory.getProducer(ElasticsearchSystemFactory.java:54)
> at org.apache.samza.container.SamzaContainer$$anonfun$13.apply(SamzaContainer.scala:195)
> ...
> at clojure.lang.Var.invoke(Var.java:383)
> at clojure.lang.AFn.applyToHelper(AFn.java:156)
> at clojure.lang.Var.applyTo(Var.java:700)
> at clojure.main.main(main.java:37)
>
>
>
> I fixed the typo (in bold red), and that solved the problem :)
>
> Cheers
>
> Tim
>
>
> On 10 May 2016 at 09:08, Jacob Maes <jacob.m...@gmail.com> wrote:
>
> > Hey Tim,
> >
> > Can you include the full log? Since the producer is not showing up, I'd
> > expect to see some earlier messages explaining why. In particular, this
> > section of code from SamzaContainer.scala should print errors
> > instantiating the producers:
> >
> >     val producers = systemFactories
> >       .map {
> >         case (systemName, systemFactory) =>
> >           try {
> >             (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry))
> >           } catch {
> >             case e: Exception =>
> >               info("Failed to create a producer for %s, so skipping." format systemName)
> >               debug("Exception detail:", e)
> >               (systemName, null)
> >           }
> >       }
> >       .filter(_._2 != null)
> >       .toMap
> >     info("Got system producers: %s" format producers.keys)
> >
> >
> > And from the looks of it, it would be a good idea to enable debug logging
> > at least for org.apache.samza.container, so we can see the exception
> > detail.
> >
> > Thanks,
> > Jake
> >
> > On Tue, May 10, 2016 at 8:36 AM, Tim Washington <
> > tim.washing...@fundingcircle.com> wrote:
> >
> > > *Tim Washington* | *Senior Software Engineer*
> > >
> > > *tim.washing...@fundingcircle.com <renee.nich...@fundingcircle.com>*
> > >
> > > 747 Front St, 4th Fl | San Francisco, CA 94111
> > >
> > > *Our Mission: **T**o build a better financial world*
> > >
> > >
> > >
> > >
> > > ---------- Forwarded message ----------
> > > From: Tim Washington <tim.washing...@fundingcircle.com>
> > > Date: 9 May 2016 at 15:40
> > > Subject: Howto Use The Elasticsearch Producer
> > > To: dev-subscr...@samza.apache.org
> > > Cc: Andy Chambers <andy.chamb...@fundingcircle.com>, Charles Reese <
> > > charles.re...@fundingcircle.com>
> > >
> > >
> > > Hey guys,
> > >
> > > I'm trying to use the Elasticsearch Producer, as described in this message
> > > <https://reviews.apache.org/r/36768/diff/1#3>. But I run into an
> > > exception when trying to write out the message.
> > >
> > > 15:15:59.586 [ThreadJob] INFO  o.a.samza.container.SamzaContainer - Entering run loop.
> > > 15:16:13.998 [ThreadJob] INFO  o.a.samza.container.TaskInstance - SystemStreamPartition [kafka, ledger, 0] is catched up.
> > > 15:16:14.006 [ThreadJob] INFO  ledger.elastic.job - #object[org.apache.samza.container.TaskInstance$$anon$1 0x20904646 org.apache.samza.container.TaskInstance$$anon$1@20904646]
> > > 15:16:14.011 [ThreadJob] INFO  ledger.elastic.job - {:class org.apache.samza.system.IncomingMessageEnvelope, :key nil, :message :qwerty, :offset 5, :size 0, :system-stream-partition {:class org.apache.samza.system.SystemStreamPartition, :partition {:class org.apache.samza.Partition, :partition-id 0}, :stream ledger, :system kafka, :system-stream {:class org.apache.samza.system.SystemStream, :stream ledger, :system kafka}}}
> > > 15:16:14.016 [ThreadJob] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
> > > *org.apache.samza.SamzaException: Attempting to produce to unknown system: elasticsearch. Available systems: Set(kafka). Please add the system to your configuration, or update outgoing message envelope to send to a defined system.*
> > > at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
> > > at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
> > > ...
> > > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
> > > at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> > >
> > >
> > > But the source of the stack trace line
> > > <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala#L86>
> > > tells me that the code can't find a *producer* for the elasticsearch
> > > system. The code that I'm using is straight out of the example.
> > >
> > > collector.send(new OutgoingMessageEnvelope(new
> > > SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" +
> > > ELASTICSEARCH_TYPE), parsedJsonObject));
> > >
> > >
> > > And if we print out the config, I'm indeed seeing *elasticsearch* as a
> > > system.
> > >
> > >
> > > 14:09:19.978 [main] INFO  ledger.run - ;;
> > > ;;  ___  __ _ _ __ ___  ______ _
> > > ;; / __|/ _` | '_ ` _ \|_  / _` |
> > > ;; \__ \ (_| | | | | | |/ / (_| |
> > > ;; |___/\__,_|_| |_| |_/___\__,_|
> > > ;;
> > >
> > > 14:09:19.980 [main] INFO  ledger.run - ;; >>>>>>> ledger-elastic-connector <<<<<<<
> > > 14:09:19.980 [main] INFO  ledger.run - ;;
> > > 14:09:19.981 [main] INFO  ledger.run - ;; job.coordinator.replication.factor=1
> > > 14:09:19.981 [main] INFO  ledger.run - ;; job.coordinator.system=kafka
> > > 14:09:19.981 [main] INFO  ledger.run - ;; job.factory.class=org.apache.samza.job.local.ThreadJobFactory
> > > 14:09:19.981 [main] INFO  ledger.run - ;; job.name=ledger-elastic-connector
> > > 14:09:19.981 [main] INFO  ledger.run - ;; samja.task.factory.function=ledger.elastic.job/elastic-connector
> > > 14:09:19.981 [main] INFO  ledger.run - ;; serializers.registry.edn.class=samja.core.EDNSerdeFactory
> > > 14:09:19.981 [main] INFO  ledger.run - ;; systems.elasticsearch.client.elasticsearch.cluster.name=elasticsearch
> > > 14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.factory=org.apache.samza.system.elasticsearch.clientTransport.ClientFactory
> > > 14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.transport.host=localhost
> > > 14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.transport.port=9300
> > > 14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.index.request.factory=org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory
> > > 14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
> > > 14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.consumer.zookeeper.connect=192.168.99.100:2181
> > > 14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.producer.bootstrap.servers=192.168.99.100:9092
> > > 14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > > 14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.key.serde=edn
> > > 14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.msg.serde=edn
> > > 14:09:19.982 [main] INFO  ledger.run - ;; task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > > 14:09:19.982 [main] INFO  ledger.run - ;; task.checkpoint.replication.factor=1
> > > 14:09:19.982 [main] INFO  ledger.run - ;; task.checkpoint.system=kafka
> > > 14:09:19.982 [main] INFO  ledger.run - ;; task.class=samja.task.SimpleTask
> > > 14:09:19.983 [main] INFO  ledger.run - ;; task.inputs=kafka.ledger,kafka.cash-movement-performed,kafka.ledger-entry-added
> > >
> > >
> > > Is there any other setup, or switches I need to make, to be able to
> > > successfully write a message out to elasticsearch?
> > >
> > >
> > > Thanks
> > >
> >
>
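
Since the root cause here was a mistyped class name in systems.elasticsearch.client.factory, a small preflight check could flag that before the job starts. This is only a sketch: FactoryConfigCheck and unloadableClasses are hypothetical names, not part of Samza, and it assumes that keys ending in ".factory" or ".class" hold class names, which matches the config dump above but is not a documented rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Samza: reports config entries whose key
// ends in ".factory" or ".class" and whose value cannot be loaded as a class.
class FactoryConfigCheck {
    static List<String> unloadableClasses(Map<String, String> config) {
        List<String> problems = new ArrayList<>();
        // TreeMap gives a stable, sorted order for the report.
        for (Map.Entry<String, String> e : new TreeMap<>(config).entrySet()) {
            String key = e.getKey();
            if (!(key.endsWith(".factory") || key.endsWith(".class"))) {
                continue;
            }
            try {
                // Load without initializing; we only care that the name resolves.
                Class.forName(e.getValue(), false, FactoryConfigCheck.class.getClassLoader());
            } catch (ClassNotFoundException | LinkageError ex) {
                problems.add(key + "=" + e.getValue());
            }
        }
        return problems;
    }
}
```

Run against the job config with the Samza jars on the classpath, a check like this would list the misspelled client factory entry, while entries such as the host and port settings are ignored.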
