Hi there,
Your task looks good. Your init() method receives two parameters: a Config
and a TaskContext. The Config object contains *all* of the properties defined
in your job's config. If you were to write:
@Override
public void init(Config config, TaskContext context) throws Exception {
  System.out.println(config.get("task.foo.bar"));
}
And you had task.foo.bar=someVal in your config, you'd expect to see your
task print "someVal" once for each partition that the Samza container is
responsible for. For example, with a single Samza container
(yarn.container.count=1, or using LocalJobFactory) and a single input
stream that has 4 partitions, your logs would show "someVal" printed four
times.
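To make the per-partition behavior concrete, here is a rough plain-Java
sketch. It is not Samza code: SketchTask, runContainer, and the Map used in
place of Config are hypothetical stand-ins, and it simply assumes (as
described above) that init() runs once for each partition the container
owns.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InitPerPartitionSketch {

    // Hypothetical stand-in for a task; a plain Map plays the role of Config.
    static class SketchTask {
        String init(Map<String, String> config, int partition) {
            // Every property from the job config is visible here.
            return config.get("task.foo.bar");
        }
    }

    // Hypothetical stand-in for one container that owns `partitionCount`
    // partitions. Assumption: init() is called once per partition.
    static List<String> runContainer(Map<String, String> config, int partitionCount) {
        List<String> printed = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            SketchTask task = new SketchTask();
            String value = task.init(config, partition);
            System.out.println(value);
            printed.add(value);
        }
        return printed;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("task.foo.bar", "someVal");
        // One container, one input stream with 4 partitions.
        runContainer(config, 4);
    }
}
```

Running main prints "someVal" four times, matching the single-container,
4-partition case above.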
Cheers,
Chris
On 12/7/13 4:18 AM, "sgg" <[email protected]> wrote:
>Does anyone have a working example of a Samza job using an InitableTask?
>
>It wasn't clear from the documentation what ways to specify the input
>parameters to the init() method. Is it a matter of adding lines in the
>config file that look like:
>task.foo.bar = someVal
>
>?
>
>Also, when I tried to run a simple example with an InitableTask, I get an
>error. It seems there is some problem when Samza attempts to instantiate
>the class
>
>
>Here is the sample task I am trying to run:
>public class SimpleSamzaTask implements InitableTask {
> private static final SystemStream OUTPUT_STREAM =
> new SystemStream("kafka", "simpleout");
>
> public SimpleSamzaTask(){
> super();
> }
>
> public void process(IncomingMessageEnvelope envelope, MessageCollector
>collector, TaskCoordinator coordinator) {
> System.out.println("hello world");
> collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, "hello: " +
>envelope.getMessage()));
> }
>
> @Override
> public void init(Config arg0, TaskContext arg1) throws Exception {
> // TODO Auto-generated method stub
> System.out.println("in init");
> }
>}
>
>Thoughts?