Hi Chris: DOH! That was the problem. I made the change to implement both interfaces and things ran fine. Thanks for the pointer!

BTW, where does the output from the System.out.println statements appear? I.e., where is stdout being piped to? I had expected to see it in the logs facility in the YARN web console, but these print statements don't appear in the YARN stdout log. Where should I be looking?

sgg

On Dec 9, 2013, at 10:14 PM, Chris Riccomini <[email protected]> wrote:

> Hey sgg,
>
> Ah, I didn't notice, but your code does not implement StreamTask, just
> InitableTask.
>
> These interfaces are like mix-ins. You MUST implement StreamTask. Try:
>
> public class SimpleSamzaTask implements StreamTask, InitableTask {
>
>   ...
> }
>
> Sorry about that.
>
> Cheers,
> Chris
>
> On 12/9/13 5:45 PM, "sgg" <[email protected]> wrote:
>
>> Ok thanks Chris. But I still need to figure out why the init() method is
>> not getting called, in fact, the entire Samza job fails. It works if the
>> task implements StreamTask (obviously not invoking init()), but at least
>> the task runs. When I change it to implement InitableTask() as shown,
>> the job fails. It seems to not be able to instantiate the task object, I
>> was hoping there would be an example that runs that would allow me to see
>> what a correct start up sequence looks like.
>>
>> sgg
>> On Dec 9, 2013, at 12:27 PM, Chris Riccomini <[email protected]>
>> wrote:
>>
>>> Hi there,
>>>
>>> Your task looks good. Your init() method receives two parameters:
>>> Config, and TaskContext. The config object has *all* config properties
>>> defined in your job's config. If you were to write:
>>>
>>> @Override
>>> public void init(Config config, TaskContext context) throws Exception {
>>>   System.out.println(config.get("task.foo.bar"));
>>> }
>>>
>>>
>>> And you had task.foo.bar=someVal, you'd then expect to see your task
>>> print "someVal" once for each partition that the Samza container is
>>> responsible for.
>>> For example, if you had a single Samza container
>>> (yarn.container.count=1, or using LocalJobFactory), and you had defined
>>> a single input stream that had 4 partitions, your logs would show
>>> "someVal" printed four times (one for each partition that the Samza
>>> container is responsible for processing).
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 12/7/13 4:18 AM, "sgg" <[email protected]> wrote:
>>>
>>>> Does anyone have a working example of a Samza job using an
>>>> InitableTask?
>>>>
>>>> It wasn't clear from the documentation what ways to specify the input
>>>> parameters to the init() method. Is it a matter of adding lines in the
>>>> config file that look like:
>>>> task.foo.bar = someVal
>>>>
>>>> ?
>>>>
>>>> Also, when I tried to run a simple example with an InitableTask, I get
>>>> an error. It seems there is some problem when Samza attempts to
>>>> instantiate the class
>>>>
>>>>
>>>> Here is the sample task I am trying to run:
>>>> public class SimpleSamzaTask implements InitableTask {
>>>>   private static final SystemStream OUTPUT_STREAM =
>>>>       new SystemStream("kafka", "simpleout");
>>>>
>>>>   public SimpleSamzaTask(){
>>>>     super();
>>>>   }
>>>>
>>>>   public void process(IncomingMessageEnvelope envelope, MessageCollector
>>>>       collector, TaskCoordinator coordinator) {
>>>>     System.out.println("hello world");
>>>>     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, "hello: "
>>>>         + envelope.getMessage()));
>>>>   }
>>>>
>>>>   @Override
>>>>   public void init(Config arg0, TaskContext arg1) throws Exception {
>>>>     // TODO Auto-generated method stub
>>>>     System.out.println("in init");
>>>>   }
>>>> }
>>>>
>>>> Thoughts?
>>>
>>
>
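
For reference, the fix from this thread (implement both interfaces) and the once-per-partition init() behavior Chris describes can be sketched without Samza on the classpath. Everything below is a stand-in and an assumption: the nested StreamTask and InitableTask interfaces are simplified, hypothetical versions rather than Samza's real signatures (which take IncomingMessageEnvelope, MessageCollector, TaskCoordinator, Config, and TaskContext), and startContainer is an invented helper that only mimics a container creating one task per partition and calling init() when the task also implements InitableTask. It is not how Samza is actually implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InitableTaskSketch {

    // Stand-in for Samza's StreamTask (hypothetical, simplified signature).
    interface StreamTask {
        void process(String message, List<String> collector);
    }

    // Stand-in for Samza's InitableTask (hypothetical, simplified signature).
    interface InitableTask {
        void init(Map<String, String> config, int partition) throws Exception;
    }

    // The task implements BOTH interfaces, as suggested in the thread.
    static class SimpleSamzaTask implements StreamTask, InitableTask {
        @Override
        public void init(Map<String, String> config, int partition) throws Exception {
            // Any key from the job config is visible here, e.g. task.foo.bar.
            String value = config.get("task.foo.bar");
            System.out.println("init, partition " + partition + ": " + value);
        }

        @Override
        public void process(String message, List<String> collector) {
            collector.add("hello: " + message);
        }
    }

    // Invented helper (assumption): mimics a single container that owns
    // `partitionCount` partitions and builds one task instance per partition.
    static List<StreamTask> startContainer(Map<String, String> config, int partitionCount) {
        List<StreamTask> tasks = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            StreamTask task = new SimpleSamzaTask();
            // init() is optional: call it only if the task opted in via InitableTask.
            if (task instanceof InitableTask) {
                try {
                    ((InitableTask) task).init(config, partition);
                } catch (Exception e) {
                    throw new IllegalStateException("init() failed for partition " + partition, e);
                }
            }
            tasks.add(task);
        }
        return tasks;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("task.foo.bar", "someVal"); // mirrors task.foo.bar=someVal in the job config

        // One container, one input stream with 4 partitions.
        List<StreamTask> tasks = startContainer(config, 4);

        // Feed a single message to the first task and show what it would send.
        List<String> collector = new ArrayList<>();
        tasks.get(0).process("world", collector);
        System.out.println(collector.get(0));
    }
}
```

Run as written, main prints the configured value once per partition (four times here), which matches the behavior described above for a single container reading a four-partition input stream.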
