Hi Chris,

This is what my config file looks like:

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=order-feed

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=samza.examples.order.task.OrderFeedStreamTask
task.inputs=order.order

# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Wikipedia System
systems.order.samza.factory=samza.examples.order.system.OrderSystemFactory

The complete file is here:
https://github.com/renato2099/hello-samza/blob/master/samza-job-package/src/main/config/order-feed.properties

Thanks in advance for the help.

Renato M.

2014-11-05 17:06 GMT+01:00 Chris Riccomini <criccom...@linkedin.com.invalid>:

> Hey Renato,
>
> Could you send the config for your job?
>
> The stack trace indicates that you are using a system in "task.inputs"
> that isn't defined in the configuration. For example, if you have:
>
> task.inputs=kafka.my-topic
>
> but you haven't defined a "kafka" system (systems.kafka.*), then you will
> see this exception.
>
> Cheers,
> Chris
>
> On 11/5/14 6:14 AM, "Renato Marroquín Mogrovejo"
> <renatoj.marroq...@gmail.com> wrote:
>
> >Hi Yan,
> >
> >I tried setting task.inputs to use the same key as the system's name, but
> >I keep getting an error when I try to run it:
> >
> >2014-11-05 09:42:07 OffsetManager [INFO] Successfully loaded starting offsets: Map(SystemStreamPartition [partition=Partition [partition=0], system=order, stream=order] -> null)
> >2014-11-05 09:42:07 SamzaContainer [INFO] Starting task instance stores.
> >2014-11-05 09:42:07 SamzaContainer [INFO] Initializing stream tasks.
> >2014-11-05 09:42:07 SamzaContainer [INFO] Registering task instances with producers.
> >2014-11-05 09:42:07 SamzaContainer [INFO] Starting producer multiplexer.
> >2014-11-05 09:42:07 SamzaContainer [INFO] Registering task instances with consumers.
> >2014-11-05 09:42:07 SamzaContainer [ERROR] Caught exception in process loop.
> >org.apache.samza.system.SystemConsumersException: can't register order's consumer.
> >    at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:164)
> >    at org.apache.samza.container.TaskInstance$anonfun$registerConsumers$2.apply(TaskInstance.scala:128)
> >    at org.apache.samza.container.TaskInstance$anonfun$registerConsumers$2.apply(TaskInstance.scala:124)
> >    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> >    at org.apache.samza.container.TaskInstance.registerConsumers(TaskInstance.scala:124)
> >    at org.apache.samza.container.SamzaContainer$anonfun$startConsumers$2.apply(SamzaContainer.scala:577)
> >    at org.apache.samza.container.SamzaContainer$anonfun$startConsumers$2.apply(SamzaContainer.scala:577)
> >    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >    at org.apache.samza.container.SamzaContainer.startConsumers(SamzaContainer.scala:577)
> >    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:501)
> >    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
> >    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >Caused by: java.util.NoSuchElementException: key not found: order
> >    at scala.collection.MapLike$class.default(MapLike.scala:228)
> >    at scala.collection.AbstractMap.default(Map.scala:58)
> >    at scala.collection.MapLike$class.apply(MapLike.scala:141)
> >    at scala.collection.AbstractMap.apply(Map.scala:58)
> >    at org.apache.samza.system.SystemConsumers.register(SystemConsumers.scala:165)
> >    ... 13 more
> >
> >Maybe it has to do with my creating the partitions in the start method? I
> >guess register gets called first, then start, and finally stop?
> >Any suggestions?
> >Thank you very much for your help!
> >
> >Renato M.
> >
> >2014-11-03 18:57 GMT+01:00 Yan Fang <yanfang...@gmail.com>:
> >
> >> Hi Renato,
> >>
> >> 1) In the task.inputs option you told me that I should use something like
> >> "ordersystem.order". I guess I have to use ordersystem because of the
> >> factory name "OrderSystemFactory"? I tried using "order.order" but I got an
> >> <o.a.s.s.SystemConsumersException: can't register order's consumer>, so I
> >> imagine there is a naming convention for classes, inputs, and factories?
> >>
> >> 1) There is no restriction on the system name; I just used that one as an
> >> example. The only requirement is that the system name in task.inputs is the
> >> same as the system name in systems.%systemname.samza.factory. For example,
> >> in the following properties, the two bold names should be the same:
> >> task.inputs=*foosystem*.tableName
> >> systems.*foosystem*.samza.factory=samza.examples.wikipedia.system.OrderSystemFactory
> >>
> >> In your case, task.inputs=*order*.order, because
> >> systems.*order*.samza.factory=samza.examples.order.system.OrderSystemFactory
> >>
> >> 2) I tried using the "ordersystem.order" name but I kept getting the
> >> NoPartitions exception.
> >>
> >> 2) The code in your previous register method should be OK.
> >>
> >> 3) The BlockingEnvelopeMap has a put method to put the incoming messages
> >> into the partition, and I am putting the table values in the start method.
> >> Is this a bad practice?
> >>
> >> 3) It depends. Since the start method is called only once, the system
> >> will read the table only once and put all the records into the
> >> BlockingEnvelopeMap. For testing purposes, I think that is fine. You can
> >> have a look at our file reader system
> >> <https://github.com/apache/incubator-samza/blob/master/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala>,
> >> which uses a thread to monitor the file.
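Yan's answer 3 (loading the whole table in start versus feeding messages from a thread) can be illustrated outside Samza. The following is a minimal plain-JDK sketch, not Samza code. `TableReplayConsumer`, `Row`, `poll` and `stop` are hypothetical names, a `LinkedBlockingQueue` stands in for `BlockingEnvelopeMap`, and the table is assumed to be exposed as an `Iterable` of rows. `start()` does not load every row itself. It launches a background thread that feeds the queue and then returns immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK sketch, NOT the Samza API: all names here are hypothetical and a
// LinkedBlockingQueue stands in for Samza's BlockingEnvelopeMap.
public class TableReplayConsumer {

    /** One table row, standing in for a message envelope. */
    public static final class Row {
        public final String key;
        public final String value;

        public Row(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private final BlockingQueue<Row> queue = new LinkedBlockingQueue<>();
    private final Iterable<Row> table; // hypothetical source of table rows
    private Thread reader;

    public TableReplayConsumer(Iterable<Row> table) {
        this.table = table;
    }

    /** Called once; hands the table scan to a background thread and returns. */
    public void start() {
        reader = new Thread(() -> {
            for (Row row : table) {
                try {
                    queue.put(row); // analogous to putting an envelope into the map
                } catch (InterruptedException e) {
                    return; // stop() interrupted the reader
                }
            }
        }, "table-replay-reader");
        reader.setDaemon(true);
        reader.start();
    }

    /** Waits up to timeoutMs for one row, then drains whatever else is queued. */
    public List<Row> poll(long timeoutMs) {
        List<Row> batch = new ArrayList<>();
        try {
            Row first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
        return batch;
    }

    /** Interrupts the reader thread and waits briefly for it to finish. */
    public void stop() {
        if (reader == null) {
            return;
        }
        reader.interrupt();
        try {
            reader.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<Row> table = List.of(new Row("1", "apple"), new Row("2", "pear"), new Row("3", "fig"));
        TableReplayConsumer consumer = new TableReplayConsumer(table);
        consumer.start();
        List<Row> received = new ArrayList<>();
        // Poll until every row has arrived or a poll times out empty.
        while (received.size() < table.size()) {
            List<Row> batch = consumer.poll(1000);
            if (batch.isEmpty()) {
                break;
            }
            received.addAll(batch);
        }
        consumer.stop();
        for (Row row : received) {
            System.out.println(row.key + " -> " + row.value);
        }
    }
}
```

Because `start()` returns immediately, the same structure would still work if the hypothetical row source kept producing rows after the initial scan. That is the situation where a monitoring thread pays off.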
> >>
> >> 4) WikipediaFeedStreamTask takes an envelope object and sends it to Kafka,
> >> but where does it get the envelope from? The consumer, right?
> >>
> >> 4) Yes, all the messages come from the put method in the consumer.
> >>
> >> Thanks,
> >>
> >> Fang, Yan
> >> yanfang...@gmail.com
> >> +1 (206) 849-4108
> >>
> >> On Mon, Nov 3, 2014 at 1:38 AM, Renato Marroquín Mogrovejo <
> >> renatoj.marroq...@gmail.com> wrote:
> >>
> >> > Thanks for the help, Yan!
> >> > I think I understand things better now, but I also have more questions :)
> >> >
> >> > 1) In the task.inputs option you told me that I should use something like
> >> > "ordersystem.order". I guess I have to use ordersystem because of the
> >> > factory name "OrderSystemFactory"? I tried using "order.order" but I got an
> >> > <o.a.s.s.SystemConsumersException: can't register order's consumer>, so I
> >> > imagine there is a naming convention for classes, inputs, and factories?
> >> > 2) I tried using the "ordersystem.order" name but I kept getting the
> >> > NoPartitions exception.
> >> > 3) The BlockingEnvelopeMap has a put method to put the incoming messages
> >> > into the partition, and I am putting the table values in the start method.
> >> > Is this a bad practice?
> >> > <https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/order/system/OrderConsumer.java>
> >> > 4) WikipediaFeedStreamTask takes an envelope object and sends it to Kafka,
> >> > but where does it get the envelope from? The consumer, right?
> >> >
> >> > Thanks again, Yan!
> >> >
> >> > Renato M.
> >> >
> >> > 2014-11-03 3:18 GMT+01:00 Yan Fang <yanfang...@gmail.com>:
> >> >
> >> > > Hi Renato,
> >> > >
> >> > > in the hello-samza example, for each new incoming message that belongs
> >> > > to a channel, a new partition is created, right?
> >> > >
> >> > > -- In the WikipediaFeed job of hello-samza, each channel is actually
> >> > > treated as one stream of the Wikipedia system, and each stream has only
> >> > > partition 0. This is the code:
> >> > > SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName,
> >> > >     event.getChannel(), new Partition(0));
> >> > >
> >> > > So in your code, I think each table could be treated as one stream with
> >> > > only partition 0, so the config would be something like
> >> > > task.inputs=ordersystem.tableName
> >> > >
> >> > > -- You may be confused by what happens in Kafka. All the messages sent
> >> > > to the wikipedia-raw topic are sent by WikipediaFeedStreamTask
> >> > > <https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java>.
> >> > >
> >> > > Exception in thread "main" org.apache.samza.SamzaException: No partitions
> >> > > for this task. Can't run a task without partition assignments. It's likely
> >> > > that the partition manager for this system doesn't know about the stream
> >> > > you're trying to read.
> >> > >     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:77)
> >> > >     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> > >
> >> > > -- Is the stream registered with the system?
> >> > > SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName,
> >> > >     "order", new Partition(0));
> >> > > Is "order" registered, e.g. task.inputs=ordersystem.order?
> >> > >
> >> > > Let me know if you have other questions.
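Yan's suggestion (one stream per table, each with only partition 0, all listed in task.inputs) can be sketched in plain Java. `TablePartitionPlan` and `StreamPartition` are hypothetical. `StreamPartition` only mirrors the system/stream/partition triple of Samza's `SystemStreamPartition` and is not that class. The sketch derives the task.inputs value from the same table names the consumer would use, so every stream the consumer creates is also registered in the config.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of "one stream per table, each with only partition 0".
// StreamPartition only mirrors the (system, stream, partition) triple of
// Samza's SystemStreamPartition; it is not that class.
public class TablePartitionPlan {

    public static final class StreamPartition {
        public final String system;
        public final String stream;
        public final int partition;

        public StreamPartition(String system, String stream, int partition) {
            this.system = system;
            this.stream = stream;
            this.partition = partition;
        }
    }

    /** Builds one partition-0 stream for every table name. */
    public static List<StreamPartition> plan(String systemName, List<String> tables) {
        List<StreamPartition> result = new ArrayList<>();
        for (String table : tables) {
            result.add(new StreamPartition(systemName, table, 0));
        }
        return result;
    }

    /** Renders the comma-separated task.inputs value covering every planned stream. */
    public static String taskInputs(List<StreamPartition> plan) {
        return plan.stream()
                .map(p -> p.system + "." + p.stream)
                .distinct()
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        List<StreamPartition> streams = plan("order", List.of("order"));
        for (StreamPartition p : streams) {
            System.out.println(p.system + "/" + p.stream + " partition " + p.partition); // order/order partition 0
        }
        System.out.println("task.inputs=" + taskInputs(streams)); // task.inputs=order.order
    }
}
```

Generating both the partitions and the config value from one list is a design choice of this sketch. It keeps the stream names in the consumer and in task.inputs from drifting apart.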
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Fang, Yan
> >> > > yanfang...@gmail.com
> >> > > +1 (206) 849-4108
> >> > >
> >> > > On Sun, Nov 2, 2014 at 4:56 AM, Renato Marroquín Mogrovejo <
> >> > > renatoj.marroq...@gmail.com> wrote:
> >> > >
> >> > > > Hello Samza team,
> >> > > >
> >> > > > I am trying to modify the hello-samza example application to replay
> >> > > > events that are stored in a table, but I am having some trouble.
> >> > > > In the hello-samza example, for each new incoming message that belongs
> >> > > > to a channel, a new partition is created, right?
> >> > > > Now in my case, how (and where) do I create these partitions? I create
> >> > > > them in [1], but I am almost sure that is wrong because I keep getting
> >> > > > the exception saying that there are no partitions for this task. Ideally,
> >> > > > I would like to create partitions based on the keys I am reading from
> >> > > > the table. Could anybody help me with this, please? Many thanks in advance!
> >> > > >
> >> > > > Renato M.
> >> > > >
> >> > > > Exception in thread "main" org.apache.samza.SamzaException: No partitions
> >> > > > for this task. Can't run a task without partition assignments. It's likely
> >> > > > that the partition manager for this system doesn't know about the stream
> >> > > > you're trying to read.
> >> > > >     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:77)
> >> > > >     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> > > >
> >> > > > [1] https://github.com/renato2099/hello-samza/blob/master/samza-wikipedia/src/main/java/samza/examples/order/system/OrderConsumer.java#L47
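Chris's diagnosis at the top of this thread (a system named in task.inputs with no matching systems.<name>.* definition) can be checked before launching a job. The sketch below is a hypothetical pre-flight check, not part of Samza. It looks only for a systems.<name>.samza.factory key, and it assumes the system name is everything before the first dot of each comma-separated task.inputs entry.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check, not part of Samza: every system named in
// task.inputs ("system.stream") should have a systems.<system>.samza.factory entry.
public class InputSystemCheck {

    /** Returns the system names used in task.inputs that have no factory configured. */
    public static List<String> undefinedInputSystems(Properties config) {
        List<String> missing = new ArrayList<>();
        String inputs = config.getProperty("task.inputs", "");
        for (String input : inputs.split(",")) {
            String entry = input.trim();
            if (entry.isEmpty()) {
                continue;
            }
            // Assumption: the system name ends at the first dot; the stream name is the rest.
            int dot = entry.indexOf('.');
            String system = dot < 0 ? entry : entry.substring(0, dot);
            String factoryKey = "systems." + system + ".samza.factory";
            if (config.getProperty(factoryKey) == null && !missing.contains(system)) {
                missing.add(system);
            }
        }
        return missing;
    }

    /** Parses a .properties-formatted string. */
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties orderFeed = parse(String.join("\n",
                "task.inputs=order.order",
                "systems.order.samza.factory=samza.examples.order.system.OrderSystemFactory"));
        Properties kafkaExample = parse("task.inputs=kafka.my-topic");
        System.out.println("order-feed: " + undefinedInputSystems(orderFeed));      // order-feed: []
        System.out.println("kafka example: " + undefinedInputSystems(kafkaExample)); // kafka example: [kafka]
    }
}
```

Run against the two relevant lines of the order-feed config, the check reports no undefined systems. Against Chris's kafka.my-topic example, it reports kafka.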