Hi Marcelo,

*task.inputs=product.product.txt*

The format of the task.inputs property is system_name.stream_name, for example kafka.PageViewEvent. You've defined your own system called *product* with a custom *FileFeedSystemFactory*.

It seems like there was an exception in your FileFeedSystemFactory.getConsumer method. Run your example in debug mode, or catch and log exceptions in your FileFeedSystemFactory implementation.

You can read from Kafka by defining task.inputs=kafka.topic_name

Thanks,
Jagadish

On Mon, Dec 28, 2015 at 5:28 AM, Marcelo Romaniuc <mroma...@yahoo.com.invalid> wrote:

> Hi,
>
> Trying a first app on Samza and used the hello-samza as model. When
> I run the app, get the following:
>
> 2015-12-28 14:17:14 SamzaContainer$ [INFO] Got system factories:
> Set(product, kafka)
> 2015-12-28 14:17:14 SamzaContainer$ [INFO] Got input stream metadata:
> Map(SystemStream [system=product, stream=product.txt] ->
> SystemStreamMetadata [streamName=product.txt, partitionMetadata={Partition
> [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null,
> newestOffset=null, upcomingOffset=null]}])
> 2015-12-28 14:17:14 SamzaContainer$ [INFO] Failed to create a consumer for
> product, so skipping.
> 2015-12-28 14:17:14 SamzaContainer$ [INFO] Got system consumers: Set()
> 2015-12-28 14:17:14 SamzaContainer$ [INFO] Failed to create a producer for
> product, so skipping.
>
> What am I missing?
>
> My config...
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=file-feed
>
> # YARN
> yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>
> # Task
> task.class=com.test.samza.task.FileFeedStreamTask
> task.inputs=product.product.txt
>
> # Serializers
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> # FileFeed System - product
> systems.product.samza.factory=com.test.samza.system.FileFeedSystemFactory
>
> # Kafka System
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.msg.serde=json
> systems.kafka.consumer.zookeeper.connect=localhost:2181/
> systems.kafka.producer.bootstrap.servers=localhost:9092
>
> # Job Coordinator
> job.coordinator.system=kafka
> # Add configuration to disable checkpointing for this job once it is available in the Coordinator Stream model
> # See https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346 for more details
> job.coordinator.replication.factor=1
>
> Thanks and Regards,
> marcelo

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
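
P.S. To illustrate the system_name.stream_name format, here is a minimal sketch of how a task.inputs value breaks down when it is split at the first dot. This matches the SystemStream [system=product, stream=product.txt] entry in the log above. The class and method names (TaskInputSplit, splitSystemStream) are hypothetical and written only for this illustration; this is not Samza's own parsing code.

```java
public class TaskInputSplit {
    /**
     * Splits a task.inputs entry of the form system_name.stream_name at the
     * FIRST dot, so the stream name itself may contain dots.
     * Hypothetical helper for illustration; not part of the Samza API.
     */
    public static String[] splitSystemStream(String input) {
        int dot = input.indexOf('.');
        // Reject values with no dot, an empty system name, or an empty stream name.
        if (dot <= 0 || dot == input.length() - 1) {
            throw new IllegalArgumentException(
                "Expected system_name.stream_name, got: " + input);
        }
        return new String[] { input.substring(0, dot), input.substring(dot + 1) };
    }

    public static void main(String[] args) {
        String[] inputs = { "product.product.txt", "kafka.PageViewEvent" };
        for (String input : inputs) {
            String[] parts = splitSystemStream(input);
            System.out.println("system=" + parts[0] + ", stream=" + parts[1]);
        }
    }
}
```

Under that assumption, product.product.txt resolves to system product and stream product.txt, so the value in the config is well formed and the problem is more likely in the consumer creation itself.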