OK, so the logs are much more detailed and useful, but it's not making any
more sense:

2015-03-22 20:13:11 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:redacted:9092 with correlation id 0 for 1 topic(s) Set(myTopic)
2015-03-22 20:13:11 SyncProducer [ERROR] Producer connection to redacted:9092 unsuccessful
java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:127)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
        at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
        at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:197)
        at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:141)
        at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:141)
        at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
        at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:138)
        at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:137)
        at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
        at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:136)
        at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
        at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62)
        at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:58)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:58)
        at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:108)
        at org.apache.samza.util.Util$.assignContainerToSSPTaskNames(Util.scala:127)
        at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:77)
        at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
        at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
2015-03-22 20:13:11 ClientUtils$ [WARN] Fetching topic metadata with correlation id 0 for topics [Set(redacted)] from broker [id:0,host:redacted,port:9092] failed

So, it's not connecting, which confuses me because I'm pushing data (from
that Python script) from the same machine I'm running this test on. I'll do
some more digging and report back with the results.
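
For reference, java.nio.channels.UnresolvedAddressException is what
SocketChannel.connect throws when the InetSocketAddress it is given holds a
host name that could not be looked up. So the failure above happens at name
resolution, before any connection to port 9092 is attempted. The trace also
shows the lookup failing inside SamzaAppMaster (the YARN application master),
so it may be worth running a check like this in that same environment. This
is a minimal JDK-only sketch for telling "can't resolve" apart from "resolves
but can't connect"; the class and method names are hypothetical, and
localhost:9092 is only a placeholder default.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {

    /** True if this JVM can resolve the host name to an IP address. */
    public static boolean resolves(String host, int port) {
        // The constructor attempts the lookup immediately. If it fails, the
        // address stays "unresolved", which is the state that makes
        // SocketChannel.connect throw UnresolvedAddressException.
        return !new InetSocketAddress(host, port).isUnresolved();
    }

    /** True if a plain TCP connection to host:port succeeds within timeoutMs. */
    public static boolean connects(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers UnknownHostException, timeouts and refused connections.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder defaults; pass the real broker host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;

        if (!resolves(host, port)) {
            System.out.println(host + ": cannot be resolved from this machine");
        } else if (!connects(host, port, 3000)) {
            System.out.println(host + ":" + port + ": resolves, but TCP connect failed");
        } else {
            System.out.println(host + ":" + port + ": reachable");
        }
    }
}
```

Run it as `java BrokerReachability <broker-host> 9092`. A "cannot be
resolved" result would point at name resolution on that machine rather than
at Kafka or Samza themselves.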

On Sun, Mar 22, 2015 at 11:55 AM, Ash W Matheson <ash.mathe...@gmail.com>
wrote:

> Ignore that last email - was reading the page stupidly.
>
> On Sun, Mar 22, 2015 at 11:52 AM, Ash W Matheson <ash.mathe...@gmail.com>
> wrote:
>
>> Is there an easy way to do that without recompiling Samza? I'm trying to
>> keep this within 'hello-samza', and reading
>> http://samza.apache.org/learn/documentation/latest/jobs/logging.html
>> leads me to believe that I have to do it in the base Samza project (not
>> hello-samza).
>>
>> On Sun, Mar 22, 2015 at 11:37 AM, Ash W Matheson <ash.mathe...@gmail.com>
>> wrote:
>>
>>> Sure - I'll do that in a bit and send it up to pastebin.
>>>
>>> On Sun, Mar 22, 2015 at 11:35 AM, Chinmay Soman <chinmay.cere...@gmail.com>
>>> wrote:
>>>
>>>> Can you please enable debug level logging and paste the log?
>>>>
>>>> On Sun, Mar 22, 2015 at 11:28 AM, Ash W Matheson <ash.mathe...@gmail.com>
>>>> wrote:
>>>>
>>>> > No, it's behind some corporate stuff - I just redacted it so I could
>>>> > share it up.
>>>> >
>>>> > On Sun, Mar 22, 2015 at 11:17 AM, Chinmay Soman <chinmay.cere...@gmail.com>
>>>> > wrote:
>>>> >
>>>> > > Just as a sanity check, is the broker host 'redacted:9092' or
>>>> > > 'redactedec:9092'?
>>>> > >
>>>> > > Just wanted to rule out any typos. Are the two hosts above the same?
>>>> > >
>>>> > > On Sun, Mar 22, 2015 at 11:08 AM, Ash W Matheson <ash.mathe...@gmail.com>
>>>> > > wrote:
>>>> > >
>>>> > > > Also, here's the producer: http://pastebin.com/qMNJabTZ
>>>> > > >
>>>> > > >
>>>> > > > On Sun, Mar 22, 2015 at 10:57 AM, Ash W Matheson <ash.mathe...@gmail.com>
>>>> > > > wrote:
>>>> > > >
>>>> > > > > Yep, first thing I checked (got bitten by that earlier in the week
>>>> > > > > with no data actually in the topic).
>>>> > > > >
>>>> > > > > On Sun, Mar 22, 2015 at 10:56 AM, Chinmay Soman <chinmay.cere...@gmail.com>
>>>> > > > > wrote:
>>>> > > > >
>>>> > > > >> Can you double-check that you can read data from your Kafka broker?
>>>> > > > >>
>>>> > > > >> > ./deploy/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myTopic
>>>> > > > >> > ./deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic myTopic --from-beginning
>>>> > > > >>
>>>> > > > >> I've seen something like this happen when the Kafka broker isn't
>>>> > > > >> shut down properly.
>>>> > > > >>
>>>> > > > >> On Sun, Mar 22, 2015 at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com>
>>>> > > > >> wrote:
>>>> > > > >>
>>>> > > > >> > Hey all,
>>>> > > > >> >
>>>> > > > >> > I'm currently evaluating Samza and am running into some odd issues.
>>>> > > > >> >
>>>> > > > >> > I'm working off the 'hello-samza' repo and trying to parse a simple
>>>> > > > >> > Kafka topic that I've produced through an external Java app (nothing
>>>> > > > >> > other than a series of sentences), and it's failing pretty hard for
>>>> > > > >> > me. The base 'hello-samza' set of apps works fine, but as soon as I
>>>> > > > >> > change the configuration to look at a different Kafka/ZooKeeper, I
>>>> > > > >> > get the following in the userlogs:
>>>> > > > >> >
>>>> > > > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets for streams [myTopic] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(myTopic)] from broker [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>>>> > > > >> >
>>>> > > > >> >
>>>> > > > >> > The modifications are pretty straightforward. In
>>>> > > > >> > Wikipedia-parser.properties, I've changed the following:
>>>> > > > >> >
>>>> > > > >> > task.inputs=kafka.myTopic
>>>> > > > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
>>>> > > > >> > systems.kafka.consumer.auto.offset.reset=smallest
>>>> > > > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
>>>> > > > >> >
>>>> > > > >> > and in the actual Java file, WikipediaParserStreamTask.java:
>>>> > > > >> >
>>>> > > > >> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>>> > > > >> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>> > > > >> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>> > > > >> >
>>>> > > > >> >     try {
>>>> > > > >> >         System.out.println(event.getRawEvent());
>>>> > > > >> >
>>>> > > > >> > Then I followed the compile/extract/run process outlined on the
>>>> > > > >> > hello-samza website.
>>>> > > > >> >
>>>> > > > >> > Any thoughts? I've looked online for 'super simple' examples of
>>>> > > > >> > ingesting Kafka in Samza, with very little success.
>>>> > > > >> >
>>>> > > > >>
>>>> > > > >>
>>>> > > > >>
>>>> > > > >> --
>>>> > > > >> Thanks and regards
>>>> > > > >>
>>>> > > > >> Chinmay Soman
>>>> > > > >>
>>>> > > > >
>>>> > > > >
>>>> > > >
>>>> > >
>>>> > >
>>>> > >
>>>> > > --
>>>> > > Thanks and regards
>>>> > >
>>>> > > Chinmay Soman
>>>> > >
>>>> >
>>>>
>>>>
>>>>
>>>> --
>>>> Thanks and regards
>>>>
>>>> Chinmay Soman
>>>>
>>>
>>>
>>
>
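
The job config in the original message names two remote endpoints,
systems.kafka.producer.metadata.broker.list and
systems.kafka.consumer.zookeeper.connect. If the broker and ZooKeeper hosts
ever differ, each one needs to resolve from the machine running the job. This
is a minimal JDK-only sketch that reads a .properties file (path passed as the
first argument; with no argument it falls back to made-up example values),
pulls the host:port pairs out of those two keys, and reports whether each host
name resolves. It assumes both values are comma-separated host:port lists,
with an optional ZooKeeper chroot such as the trailing "/" in
redacted:2181/. The class name and broker.example.internal are hypothetical.

```java
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ConfiguredHostCheck {

    // The two properties from the original config that name remote hosts.
    static final String[] HOST_KEYS = {
        "systems.kafka.producer.metadata.broker.list",
        "systems.kafka.consumer.zookeeper.connect"
    };

    /** Returns every "host:port" entry under HOST_KEYS, dropping any ZooKeeper chroot. */
    public static List<String> hostPorts(Properties props) {
        List<String> result = new ArrayList<>();
        for (String key : HOST_KEYS) {
            String value = props.getProperty(key);
            if (value == null) {
                continue;
            }
            int slash = value.indexOf('/');
            if (slash >= 0) {
                value = value.substring(0, slash); // "host:2181/kafka" -> "host:2181"
            }
            for (String entry : value.split(",")) {
                if (!entry.trim().isEmpty()) {
                    result.add(entry.trim());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        if (args.length > 0) {
            // Path to the job's .properties file.
            try (Reader reader = new FileReader(args[0])) {
                props.load(reader);
            }
        } else {
            // Hypothetical fallback values shaped like the original config.
            props.load(new StringReader(
                "systems.kafka.consumer.zookeeper.connect=broker.example.internal:2181/\n"
                + "systems.kafka.producer.metadata.broker.list=broker.example.internal:9092\n"));
        }

        for (String hostPort : hostPorts(props)) {
            int colon = hostPort.lastIndexOf(':');
            if (colon < 0) {
                System.out.println(hostPort + ": expected host:port");
                continue;
            }
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            boolean resolved = !new InetSocketAddress(host, port).isUnresolved();
            System.out.println(hostPort + (resolved ? ": resolves" : ": does NOT resolve"));
        }
    }
}
```

Checking only name resolution keeps the sketch independent of Kafka and
ZooKeeper client libraries; it says nothing about whether the services on
those ports are healthy.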
