Hey Tim,

Can you include the full log? Since the producer is not showing up, I'd
expect to see some earlier messages explaining why. In particular, this
section of code from SamzaContainer.scala should log a message for any
producer that fails to instantiate:

    val producers = systemFactories
      .map {
        case (systemName, systemFactory) =>
          try {
            (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry))
          } catch {
            case e: Exception =>
              info("Failed to create a producer for %s, so skipping." format systemName)
              debug("Exception detail:", e)
              (systemName, null)
          }
      }
      .filter(_._2 != null)
      .toMap
    info("Got system producers: %s" format producers.keys)
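
To make the failure mode concrete, here is a minimal Java sketch of the same pattern. It is JDK-only, and the class, method and Producer type are hypothetical stand-ins, not Samza's API. Every factory is tried; one that throws is reported and simply left out of the map, so nothing fails at startup and the system only goes missing later, at send time.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class ProducerSetupSketch {

    // Hypothetical stand-in for a system producer (not a Samza type).
    static final class Producer {
        final String system;

        Producer(String system) {
            this.system = system;
        }
    }

    // Same shape as the Scala above: try every factory; if one throws,
    // report it and leave that system out of the map instead of failing.
    static Map<String, Producer> createProducers(Map<String, Supplier<Producer>> factories) {
        Map<String, Producer> producers = new LinkedHashMap<>();
        for (Map.Entry<String, Supplier<Producer>> entry : factories.entrySet()) {
            String systemName = entry.getKey();
            try {
                producers.put(systemName, entry.getValue().get());
            } catch (Exception e) {
                // Mirrors the INFO line in the Scala; there the exception detail
                // only goes to DEBUG, so it is not printed here either.
                System.out.println("Failed to create a producer for " + systemName + ", so skipping.");
            }
        }
        System.out.println("Got system producers: " + producers.keySet());
        return producers;
    }

    public static void main(String[] args) {
        Map<String, Supplier<Producer>> factories = new LinkedHashMap<>();
        factories.put("kafka", () -> new Producer("kafka"));
        // Simulated failure, standing in for whatever getProducer actually threw.
        factories.put("elasticsearch", () -> {
            throw new IllegalStateException("simulated factory failure");
        });
        createProducers(factories);
    }
}
```

Running main prints the skip message for elasticsearch followed by `Got system producers: [kafka]`, the same shape as the `Available systems: Set(kafka)` in the error below.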


And from the looks of it, it would be a good idea to enable debug logging
at least for org.apache.samza.container, so we can see the exception
detail.

Thanks,
Jake

On Tue, May 10, 2016 at 8:36 AM, Tim Washington <tim.washing...@fundingcircle.com> wrote:

> *Tim Washington* | *Senior Software Engineer*
>
> *tim.washing...@fundingcircle.com <renee.nich...@fundingcircle.com>*
>
> 747 Front St, 4th Fl | San Francisco, CA 94111
>
> *Our Mission: To build a better financial world*
>
> ---------- Forwarded message ----------
> From: Tim Washington <tim.washing...@fundingcircle.com>
> Date: 9 May 2016 at 15:40
> Subject: Howto Use The Elasticsearch Producer
> To: dev-subscr...@samza.apache.org
> Cc: Andy Chambers <andy.chamb...@fundingcircle.com>, Charles Reese <charles.re...@fundingcircle.com>
>
>
> Hey guys,
>
> I'm trying to use the Elasticsearch Producer, as described in this message
> <https://reviews.apache.org/r/36768/diff/1#3>, but I run into an exception
> when trying to write out the message:
>
> 15:15:59.586 [ThreadJob] INFO  o.a.samza.container.SamzaContainer - Entering run loop.
> 15:16:13.998 [ThreadJob] INFO  o.a.samza.container.TaskInstance - SystemStreamPartition [kafka, ledger, 0] is catched up.
> 15:16:14.006 [ThreadJob] INFO  ledger.elastic.job - #object[org.apache.samza.container.TaskInstance$$anon$1 0x20904646 org.apache.samza.container.TaskInstance$$anon$1@20904646]
> 15:16:14.011 [ThreadJob] INFO  ledger.elastic.job - {:class org.apache.samza.system.IncomingMessageEnvelope, :key nil, :message :qwerty, :offset 5, :size 0, :system-stream-partition {:class org.apache.samza.system.SystemStreamPartition, :partition {:class org.apache.samza.Partition, :partition-id 0}, :stream ledger, :system kafka, :system-stream {:class org.apache.samza.system.SystemStream, :stream ledger, :system kafka}}}
> 15:16:14.016 [ThreadJob] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
> org.apache.samza.SamzaException: Attempting to produce to unknown system: elasticsearch. Available systems: Set(kafka). Please add the system to your configuration, or update outgoing message envelope to send to a defined system.
>     at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
>     at org.apache.samza.system.SystemProducers$$anonfun$2.apply(SystemProducers.scala:86)
>     ...
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
>     at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
>
>
> But the source of the stack trace line
> <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala#L86>
> tells me that the code can't find a *producer* for the elasticsearch
> system. The code that I'm using is straight out of the example:
>
>     collector.send(new OutgoingMessageEnvelope(
>         new SystemStream("elasticsearch", ELASTICSEARCH_INDEX + "/" + ELASTICSEARCH_TYPE),
>         parsedJsonObject));
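
The send-time check can be pictured with the minimal Java sketch below. It uses hypothetical names and is not Samza's source: Samza throws a SamzaException, which is swapped for IllegalArgumentException here to stay JDK-only, and the message text is copied from the log above. The key point is that the lookup runs against the producers that were actually created, not against the systems named in the config.

```java
import java.util.Map;

public class ProducerLookupSketch {

    // Look up the producer for a system, failing with the same wording as the
    // log when the system never made it into the producer map.
    static <P> P producerFor(Map<String, P> producers, String systemName) {
        P producer = producers.get(systemName);
        if (producer == null) {
            throw new IllegalArgumentException(
                    "Attempting to produce to unknown system: " + systemName
                            + ". Available systems: " + producers.keySet() + ".");
        }
        return producer;
    }

    public static void main(String[] args) {
        // Only kafka is present, as in the log.
        Map<String, String> producers = Map.of("kafka", "kafka-producer");
        System.out.println(producerFor(producers, "kafka"));
        try {
            producerFor(producers, "elasticsearch");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```
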
>
>
> And if I print out the config, I do see *elasticsearch* as a
> system:
>
>
> 14:09:19.978 [main] INFO  ledger.run - ;;
> ;;  ___  __ _ _ __ ___  ______ _
> ;; / __|/ _` | '_ ` _ \|_  / _` |
> ;; \__ \ (_| | | | | | |/ / (_| |
> ;; |___/\__,_|_| |_| |_/___\__,_|
> ;;
>
> 14:09:19.980 [main] INFO  ledger.run - ;; >>>>>>> ledger-elastic-connector <<<<<<<
> 14:09:19.980 [main] INFO  ledger.run - ;;
> 14:09:19.981 [main] INFO  ledger.run - ;; job.coordinator.replication.factor=1
> 14:09:19.981 [main] INFO  ledger.run - ;; job.coordinator.system=kafka
> 14:09:19.981 [main] INFO  ledger.run - ;; job.factory.class=org.apache.samza.job.local.ThreadJobFactory
> 14:09:19.981 [main] INFO  ledger.run - ;; job.name=ledger-elastic-connector
> 14:09:19.981 [main] INFO  ledger.run - ;; samja.task.factory.function=ledger.elastic.job/elastic-connector
> 14:09:19.981 [main] INFO  ledger.run - ;; serializers.registry.edn.class=samja.core.EDNSerdeFactory
> 14:09:19.981 [main] INFO  ledger.run - ;; systems.elasticsearch.client.elasticsearch.cluster.name=elasticsearch
> 14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.factory=org.apache.samza.system.elasticsearch.clientTransport.ClientFactory
> 14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.transport.host=localhost
> 14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.client.transport.port=9300
> 14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.index.request.factory=org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory
> 14:09:19.982 [main] INFO  ledger.run - ;; systems.elasticsearch.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
> 14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.consumer.zookeeper.connect=192.168.99.100:2181
> 14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.producer.bootstrap.servers=192.168.99.100:9092
> 14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> 14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.key.serde=edn
> 14:09:19.982 [main] INFO  ledger.run - ;; systems.kafka.samza.msg.serde=edn
> 14:09:19.982 [main] INFO  ledger.run - ;; task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> 14:09:19.982 [main] INFO  ledger.run - ;; task.checkpoint.replication.factor=1
> 14:09:19.982 [main] INFO  ledger.run - ;; task.checkpoint.system=kafka
> 14:09:19.982 [main] INFO  ledger.run - ;; task.class=samja.task.SimpleTask
> 14:09:19.983 [main] INFO  ledger.run - ;; task.inputs=kafka.ledger,kafka.cash-movement-performed,kafka.ledger-entry-added
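
To separate "configured" from "has a running producer", here is a minimal Java sketch that reads system names off the `systems.<name>.samza.factory` keys visible in the config dump. The helper and class names are hypothetical and it uses only the JDK. Both systems show up at this level; whether elasticsearch ends up among the container's producers still depends on its factory's getProducer call succeeding, which is why the earlier log lines matter.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ConfiguredSystemsSketch {

    private static final String PREFIX = "systems.";
    private static final String SUFFIX = ".samza.factory";

    // Returns every system name that has a systems.<name>.samza.factory entry.
    static Set<String> configuredSystems(Map<String, String> config) {
        Set<String> systems = new TreeSet<>();
        for (String key : config.keySet()) {
            // The length check skips a degenerate key with an empty system name.
            if (key.startsWith(PREFIX) && key.endsWith(SUFFIX)
                    && key.length() > PREFIX.length() + SUFFIX.length()) {
                systems.add(key.substring(PREFIX.length(), key.length() - SUFFIX.length()));
            }
        }
        return systems;
    }

    public static void main(String[] args) {
        // A few entries copied from the config dump above.
        Map<String, String> config = Map.of(
                "systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory",
                "systems.elasticsearch.samza.factory",
                "org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory",
                "systems.elasticsearch.client.transport.port", "9300");
        System.out.println(configuredSystems(config)); // [elasticsearch, kafka]
    }
}
```
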
>
>
> Is there any other setup, or any switches I need to flip, to successfully
> write a message out to Elasticsearch?
>
>
> Thanks
