Hey sgg,
Ah, I missed this earlier: your code implements only InitableTask, not
StreamTask.
These interfaces work like mix-ins. InitableTask is optional and adds
init(), but every task MUST implement StreamTask, since that is what
supplies process(). Try:
public class SimpleSamzaTask implements StreamTask, InitableTask {
...
}
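
A minimal, self-contained sketch of the mix-in idea follows. The two
interfaces here are simplified stand-ins written for illustration, not the
real Samza types (the real process() and init() take Samza's envelope,
collector, coordinator, Config, and TaskContext arguments). The loop in
MixinSketch.run is only an assumed model of a container creating one task
per partition and calling init() on tasks that opt in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Samza's StreamTask: the required interface.
interface StreamTask {
    void process(String message, List<String> output);
}

// Hypothetical stand-in for Samza's InitableTask: an optional add-on.
interface InitableTask {
    void init(Map<String, String> config);
}

// Implements both: StreamTask makes it a task, InitableTask adds init().
class SimpleTask implements StreamTask, InitableTask {
    private String greeting = "unset";

    @Override
    public void init(Map<String, String> config) {
        // Read a custom property, as with task.foo.bar in the job config.
        greeting = config.get("task.foo.bar");
    }

    @Override
    public void process(String message, List<String> output) {
        output.add(greeting + ": " + message);
    }
}

class MixinSketch {
    // Assumed container model: one task instance per partition, with
    // init() called only when the task also implements InitableTask.
    static List<String> run(int partitions, Map<String, String> config) {
        List<String> output = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            StreamTask task = new SimpleTask();
            if (task instanceof InitableTask) {
                ((InitableTask) task).init(config);
            }
            task.process("partition " + p, output);
        }
        return output;
    }

    public static void main(String[] args) {
        for (String line : run(4, Map.of("task.foo.bar", "someVal"))) {
            System.out.println(line);
        }
    }
}
```

Running main with 4 partitions prints four lines, each starting with
"someVal", which matches the once-per-partition init() behavior described
in the quoted message below.
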
Sorry about that.
Cheers,
Chris
On 12/9/13 5:45 PM, "sgg" <[email protected]> wrote:
>Ok thanks Chris. But I still need to figure out why the init() method is
>not getting called, in fact, the entire Samza job fails. It works if the
>task implements StreamTask (obviously not invoking init()), but at least
>the task runs. When I change it to implement InitableTask as shown,
>the job fails. It seems unable to instantiate the task object. I
>was hoping there would be a runnable example that would let me see
>what a correct startup sequence looks like.
>
>sgg
>On Dec 9, 2013, at 12:27 PM, Chris Riccomini <[email protected]>
>wrote:
>
>> Hi there,
>>
>> Your task looks good. Your init() method receives two parameters: Config
>> and TaskContext. The config object has *all* config properties defined in
>> your job's config. If you were to write:
>>
>> @Override
>> public void init(Config config, TaskContext context) throws Exception {
>>   System.out.println(config.get("task.foo.bar"));
>> }
>>
>>
>> And you had task.foo.bar=someVal, you'd then expect to see your task print
>> "someVal" once for each partition that the Samza container is responsible
>> for. For example, if you had a single Samza container
>> (yarn.container.count=1, or using LocalJobFactory), and you had defined a
>> single input stream that had 4 partitions, your logs would show "someVal"
>> printed four times (one for each partition that the Samza container is
>> responsible for processing).
>>
>> Cheers,
>> Chris
>>
>> On 12/7/13 4:18 AM, "sgg" <[email protected]> wrote:
>>
>>> Does anyone have a working example of a Samza job using an InitableTask?
>>>
>>> It wasn't clear from the documentation how to specify the input
>>> parameters to the init() method. Is it a matter of adding lines in the
>>> config file that look like:
>>> task.foo.bar = someVal
>>>
>>> ?
>>>
>>> Also, when I tried to run a simple example with an InitableTask, I get an
>>> error. It seems there is some problem when Samza attempts to instantiate
>>> the class.
>>>
>>>
>>> Here is the sample task I am trying to run:
>>> public class SimpleSamzaTask implements InitableTask {
>>>   private static final SystemStream OUTPUT_STREAM =
>>>       new SystemStream("kafka", "simpleout");
>>>
>>>   public SimpleSamzaTask() {
>>>     super();
>>>   }
>>>
>>>   public void process(IncomingMessageEnvelope envelope,
>>>       MessageCollector collector, TaskCoordinator coordinator) {
>>>     System.out.println("hello world");
>>>     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>>>         "hello: " + envelope.getMessage()));
>>>   }
>>>
>>>   @Override
>>>   public void init(Config arg0, TaskContext arg1) throws Exception {
>>>     // TODO Auto-generated method stub
>>>     System.out.println("in init");
>>>   }
>>> }
>>>
>>> Thoughts?
>>
>