Hello,
I am trying to run my first job (publish what receives) in Samza and I think
that all the dependencies where added by configuring the Maven repositories
(solved in a recent question to the list). I am getting another exception on
the Job runner:
#/opt/jobs# bin/run-job.sh
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
--config-path=file://$PWD/job1.properties
java version "1.7.0_75"
OpenJDK Runtime Environment (IcedTea 2.5.4) (7u75-2.5.4-2)
OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)
/usr/lib/jvm/java-7-openjdk-amd64/bin/java
-Dlog4j.configuration=file:bin/log4j-console.xml -Dsamza.log.dir=/opt/jobs
-Djava.io.tmpdir=/opt/jobs/tmp -Xmx768M -XX:+PrintGCDateStamps
-Xloggc:/opt/jobs/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=10241024 -d64 -cp
/opt/hadoop/conf:/opt/jobs/lib/samzafroga-0.0.1-SNAPSHOT.jar:/opt/jobs/lib/samzafroga-0.0.1-SNAPSHOT-jar-with-dependencies.jar
org.apache.samza.job.JobRunner
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
--config-path=file:///opt/jobs/job1.properties
log4j: reset attribute= "false".
log4j: Threshold ="null".
log4j: Level value for root is [INFO].
log4j: root level set to INFO
log4j: Class name: [org.apache.log4j.ConsoleAppender]
log4j: Parsing layout of class: "org.apache.log4j.PatternLayout"
log4j: Setting property [conversionPattern] to [%d{dd MMM yyyy HH:mm:ss} %5p
%c{1} - %m%n].
log4j: Adding appender named [consoleAppender] to category [root].
log4j: Class name: [org.apache.log4j.RollingFileAppender]
log4j: Setting property [append] to [false].
log4j: Setting property [file] to [out/learning.log].
log4j: Parsing layout of class: "org.apache.log4j.PatternLayout"
log4j: Setting property [conversionPattern] to [%d{ABSOLUTE} %-5p [%c{1}] %m%n].
log4j: setFile called: out/learning.log, false
log4j: setFile ended
log4j: Adding appender named [fileAppender] to category [root].
Exception in thread "main" org.apache.samza.SamzaException: no job factory
class defined
at org.apache.samza.job.JobRunner.run(JobRunner.scala:53)
at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
at org.apache.samza.job.JobRunner.main(JobRunner.scala)
My properties file is this:
task.class=samzafroga.job1
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=acio-broker01:2181,acio-broker02:2181
task.inputs=kafka.frogain
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
systems.kafka.streams.frogain.samza.msg.serde=json
This is the code job code:
package samzafroga;
import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
public class job1 implements StreamTask {
private final SystemStream OUTPUT_STREAM = new
SystemStream("kafka", "beste");
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector, TaskCoordinator coordinator)
{
String msg = (String)envelope.getMessage();
String outmsg = msg;
collector.send(new
OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
}
}
I have been trying to read the code in the file JobRunner.scala, that
apparently is the one generation the exception and as I understand is having a
problem . I am not really sure if the problem is with the task.class definition
or I still have something missing in the system.
Thanks in advance,
Jordi
________________________________
Jordi Blasi Uribarri
Área I+D+i
[email protected]
Oficina Bilbao
[http://www.nextel.es/wp-content/uploads/Firma_Nextel_2014.png]