Hi Marcelo,

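*task.inputs=product.product.txt*
The format of the task.inputs property is system_name.stream_name, for
example kafka.PageViewEvent. Your value is therefore read as system
"product" and stream "product.txt", which matches the stream metadata in
your log.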
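
To make the naming convention concrete, here is a minimal sketch of how such a value breaks down. TaskInputs is a hypothetical helper written for this mail, not a Samza class, and splitting at the first dot is an assumption based on the "system=product, stream=product.txt" line in your log.

```java
// Hypothetical helper (not part of Samza): shows how a task.inputs entry
// of the form system_name.stream_name breaks down into its two parts.
final class TaskInputs {

    /** Splits at the FIRST dot and returns {systemName, streamName}. */
    static String[] parse(String input) {
        String trimmed = input.trim();
        int dot = trimmed.indexOf('.');
        // Reject values with no dot, a leading dot, or a trailing dot.
        if (dot <= 0 || dot == trimmed.length() - 1) {
            throw new IllegalArgumentException(
                "Expected system_name.stream_name, got: " + input);
        }
        return new String[] {
            trimmed.substring(0, dot),   // system name, e.g. "product"
            trimmed.substring(dot + 1)   // stream name, may itself contain dots
        };
    }

    public static void main(String[] args) {
        for (String input : new String[] {"product.product.txt", "kafka.PageViewEvent"}) {
            String[] parts = parse(input);
            System.out.println("system=" + parts[0] + ", stream=" + parts[1]);
        }
    }
}
```

With the two values above, this prints system=product, stream=product.txt and then system=kafka, stream=PageViewEvent.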

You've defined your own system called *product* with a custom
*FileFeedSystemFactory*. The log line "Failed to create a consumer for
product, so skipping." suggests that your FileFeedSystemFactory.getConsumer
method threw an exception. To see the underlying error, run your example in
debug mode or catch and log exceptions in your FileFeedSystemFactory
implementation.
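
As a rough sketch of the catch-and-log idea: the types below are stand-ins so the example compiles on its own. FeedConsumer, LoggingFactorySketch, createConsumer and the path argument are all hypothetical; in your job the same try/catch would go inside your real FileFeedSystemFactory.getConsumer, with whatever arguments and logger you actually use.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Stand-in for the consumer type your factory returns (hypothetical).
interface FeedConsumer {}

// Stand-in for FileFeedSystemFactory (hypothetical, no Samza dependency).
final class LoggingFactorySketch {
    private static final Logger LOG =
        Logger.getLogger(LoggingFactorySketch.class.getName());

    // Hypothetical construction logic; this is the part that may throw.
    static FeedConsumer createConsumer(String systemName, String path) {
        if (path == null || path.isEmpty()) {
            throw new IllegalStateException(
                "No input file configured for system " + systemName);
        }
        return new FeedConsumer() {};
    }

    // Wraps construction so the root cause is logged before it propagates.
    static FeedConsumer getConsumer(String systemName, String path) {
        try {
            return createConsumer(systemName, path);
        } catch (RuntimeException e) {
            LOG.log(Level.SEVERE,
                "Failed to create consumer for system " + systemName, e);
            throw e; // rethrow so the caller still sees the failure
        }
    }

    public static void main(String[] args) {
        getConsumer("product", "product.txt"); // succeeds
        try {
            getConsumer("product", "");        // logs the stack trace, then rethrows
        } catch (IllegalStateException expected) {
            System.out.println("Caught: " + expected.getMessage());
        }
    }
}
```

Logging and then rethrowing keeps the behaviour unchanged while putting the stack trace in your own log, independent of the container's log level.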

You can read from Kafka by setting task.inputs=kafka.topic_name.

Thanks,
Jagadish


On Mon, Dec 28, 2015 at 5:28 AM, Marcelo Romaniuc <
mroma...@yahoo.com.invalid> wrote:

> Hi,
>
>     Trying a first app on Samza and used the hello-samza as model.    When
> I run the app, get the following:
>
> 2015-12-28 14:17:14 SamzaContainer$ [INFO] Got system factories:
> Set(product, kafka)
> 2015-12-28 14:17:14 SamzaContainer$ [INFO] Got input stream metadata:
> Map(SystemStream [system=product, stream=product.txt] ->
> SystemStreamMetadata [streamName=product.txt, partitionMetadata={Partition
> [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null,
> newestOffset=null, upcomingOffset=null]}])
> 2015-12-28 14:17:14 SamzaContainer$ [INFO] Failed to create a consumer for
> product, so skipping.
> 2015-12-28 14:17:14 SamzaContainer$ [INFO] Got system consumers: Set()
> 2015-12-28 14:17:14 SamzaContainer$ [INFO] Failed to create a producer for
> product, so skipping.
>
>     What am I missing ?
>
>    My config...
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=file-feed
>
> # YARN
>
> yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>
> # Task
> task.class=com.test.samza.task.FileFeedStreamTask
> task.inputs=product.product.txt
>
> # Serializers
>
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> # FileFeed System - product
> systems.product.samza.factory=com.test.samza.system.FileFeedSystemFactory
>
>
> # Kafka System
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.msg.serde=json
> systems.kafka.consumer.zookeeper.connect=localhost:2181/
> systems.kafka.producer.bootstrap.servers=localhost:9092
>
> # Job Coordinator
> job.coordinator.system=kafka
> # Add configuration to disable checkpointing for this job once it is
> available in the Coordinator Stream model
> # See
> https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346
> for more details
> job.coordinator.replication.factor=1
>
>
>
> Thanks and Regards,
> marcelo
>
>


-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
