Hi Bharath,

        Below is the configuration:
# App
app.name=canal-metrics
app.id=test
app.class=com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
task.class=com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask

job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.default.system=kafka

job.container.thread.pool.size=0
task.max.concurrency=1

task.opts=-Xmx1843m

task.checkpoint.replication.factor=3

serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Checkpointing
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory

# Kafka
systems.kafka.consumer.zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183
systems.kafka.producer.bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096

# Systems & Streams
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.default.stream.samza.key.serde=string
systems.kafka.default.stream.samza.msg.serde=string

systems.influxdb.samza.factory=com.antfact.datacenter.canal.system.InfluxDBSystemFactory
systems.hstore.default.stream.samza.key.serde=string
systems.hstore.default.stream.samza.msg.serde=string

task.inputs=kafka.samza-metrics

task.consumer.batch.size=100

# Deployment
yarn.package.path=hdfs://p002132.antfact.com/rflow-apps/data/canal-metrics-3.0-dist.tar.gz
yarn.container.count=1
cluster-manager.container.memory.mb=2048

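One detail that may be relevant: the config above defines `systems.influxdb.samza.factory`, but the default-stream serdes beneath it are set for a system named `hstore`, which has no `systems.hstore.samza.factory` entry. If those serdes were actually meant for the InfluxDB system (an assumption on my part), the matching entries would look like:

```
systems.influxdb.samza.factory=com.antfact.datacenter.canal.system.InfluxDBSystemFactory
systems.influxdb.default.stream.samza.key.serde=string
systems.influxdb.default.stream.samza.msg.serde=string
```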

        Below is the entire log:
added manifest
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)
/usr/java/jdk1.8.0_151/bin/java 
-Dlog4j.configuration=file:bin/log4j-console.xml 
-DisThreadContextMapInheritable=true 
-Dsamza.log.dir=/home/ant/canal-test/canal-metrics/target/canal 
-Djava.io.tmpdir=/home/ant/canal-test/canal-metrics/target/canal/tmp -Xmx768M 
-XX:+PrintGCDateStamps 
-Xloggc:/home/ant/canal-test/canal-metrics/target/canal/gc.log 
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 
-d64 -cp /etc/hadoop/conf:pathing.jar 
org.apache.samza.runtime.ApplicationRunnerMain 
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory 
--config-path=/home/ant/canal-test/canal-metrics/target/canal/config/canal-metrics.properties
2019-05-30 09:59:10.900 [main] TaskFactoryUtil [INFO] Got task class name: 
com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
2019-05-30 09:59:10.914 [main] RemoteJobPlanner [INFO] The run id for this run 
is 1559181550904-ec4b9a69
2019-05-30 09:59:10.958 [main] JobPlanner [INFO] app.name is defined, 
generating job.name equal to app.name value: canal-metrics
2019-05-30 09:59:10.958 [main] JobPlanner [INFO] app.id is defined, generating 
job.id equal to app.name value: test
2019-05-30 09:59:10.959 [main] JobPlanner [INFO] app.name is defined, 
generating job.name equal to app.name value: canal-metrics
2019-05-30 09:59:10.959 [main] JobPlanner [INFO] app.id is defined, generating 
job.id equal to app.name value: test
2019-05-30 09:59:10.960 [main] KafkaConsumerConfig [INFO] Auto offset reset 
value for KafkaConsumer for system upcoming converted from latest(samza) to {}
2019-05-30 09:59:10.960 [main] KafkaConsumerConfig [INFO] setting 
auto.offset.reset for system kafka to latest
2019-05-30 09:59:10.962 [main] KafkaConsumerConfig [INFO] setting key 
serialization for the consumer(for system kafka) to ByteArrayDeserializer
2019-05-30 09:59:10.964 [main] KafkaConsumerConfig [INFO] setting value 
serialization for the consumer(for system kafka) to ByteArrayDeserializer
2019-05-30 09:59:10.970 [main] KafkaSystemConsumer [INFO] Instantiating 
KafkaConsumer for systemName kafka with properties 
{key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, 
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, 
enable.auto.commit=false, max.poll.records=100, 
zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183, 
group.id=canal-metrics-test, 
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor, 
bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096,
 auto.offset.reset=latest, client.id=kafka_admin_consumer-canal_metrics-test}
2019-05-30 09:59:11.003 [main] ConsumerConfig [INFO] ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [10.20.1.87:9096, 10.20.1.88:9096, 10.20.1.89:9096, 
10.20.1.90:9096, 10.20.1.91:9096, 10.20.1.92:9096]
        check.crcs = true
        client.id = kafka_admin_consumer-canal_metrics-test
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = canal-metrics-test
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 100
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

2019-05-30 09:59:11.147 [main] ConsumerConfig [WARN] The configuration 
'zookeeper.connect' was supplied but isn't a known config.
2019-05-30 09:59:11.149 [main] AppInfoParser [INFO] Kafka version : 1.1.0
2019-05-30 09:59:11.149 [main] AppInfoParser [INFO] Kafka commitId : 
fdcf75ea326b8e07
2019-05-30 09:59:11.166 [main] KafkaSystemAdmin [INFO] New admin client with 
props:{bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096,
 zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183}
2019-05-30 09:59:11.170 [main] AdminClientConfig [INFO] AdminClientConfig 
values:
        bootstrap.servers = [10.20.1.87:9096, 10.20.1.88:9096, 10.20.1.89:9096, 
10.20.1.90:9096, 10.20.1.91:9096, 10.20.1.92:9096]
        client.id =
        connections.max.idle.ms = 300000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 120000
        retries = 5
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS

2019-05-30 09:59:11.191 [main] AdminClientConfig [WARN] The configuration 
'zookeeper.connect' was supplied but isn't a known config.
2019-05-30 09:59:11.191 [main] AppInfoParser [INFO] Kafka version : 1.1.0
2019-05-30 09:59:11.191 [main] AppInfoParser [INFO] Kafka commitId : 
fdcf75ea326b8e07
2019-05-30 09:59:11.208 [main] Log4jControllerRegistration$ [INFO] Registered 
kafka:type=kafka.Log4jController MBean
2019-05-30 09:59:11.223 [main] KafkaSystemAdmin [INFO] Created KafkaSystemAdmin 
for system kafka
Exception in thread "main" org.apache.samza.SamzaException: Failed to run 
application
        at 
org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
        at 
org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
        at 
org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
Caused by: java.lang.NullPointerException
        at java.util.HashMap.merge(HashMap.java:1225)
        at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
        at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
        at 
java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at 
org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:84)
        at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
        at 
org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
        at 
org.apache.samza.execution.JobPlanner.buildAndStartStreamManager(JobPlanner.java:64)
        at 
org.apache.samza.execution.JobPlanner.getExecutionPlan(JobPlanner.java:94)
        at 
org.apache.samza.execution.RemoteJobPlanner.prepareJobs(RemoteJobPlanner.java:57)
        at 
org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:67)
        ... 2 more
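For context on the trace: `Collectors.toMap` accumulates its entries through `HashMap.merge`, which throws a `NullPointerException` whenever the mapped value is null. So if one of the configured system names resolves to no factory class inside `JavaSystemConfig.getSystemAdmins`, this is exactly the failure that results. A minimal sketch (with hypothetical system names) reproducing the mechanism:

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToMapNullDemo {
    public static void main(String[] args) {
        try {
            // Collectors.toMap accumulates via HashMap.merge, which rejects
            // null values. A system that has serdes configured but no matching
            // systems.<name>.samza.factory would yield a null factory class,
            // producing the same NullPointerException as in the stack trace.
            Stream.of("kafka", "hstore")
                    .collect(Collectors.toMap(
                            name -> name,
                            name -> "kafka".equals(name)
                                    ? "org.apache.samza.system.kafka.KafkaSystemFactory"
                                    : null)); // null value for "hstore"
        } catch (NullPointerException e) {
            System.out.println("NPE from HashMap.merge, matching the log above");
        }
    }
}
```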


        Thanks.

Qi Shu

> On May 30, 2019, at 3:41 AM, Bharath Kumara Subramanian <codin.mart...@gmail.com> wrote:
> 
> Hi Qi,
> 
> Can you share your application configuration? Especially the systems your
> application consumes and produces to and its related configuration.
> Also, would it be possible for you to attach the entire log?
> 
> Thanks,
> Bharath
> 
> On Tue, May 28, 2019 at 7:07 PM QiShu <sh...@eefung.com> wrote:
> 
>> Hi,
>> 
>>        Below is the running environment:
>> Hadoop version 3.1.0
>> java version "1.8.0_151"
>> samza-api-1.1.0.jar
>> samza-core_2.12-1.1.0.jar
>> samza-kafka_2.12-1.1.0.jar
>> samza-kv_2.12-1.1.0.jar
>> samza-kv-inmemory_2.12-1.1.0.jar
>> samza-kv-rocksdb_2.12-1.1.0.jar
>> samza-log4j_2.12-1.1.0.jar
>> samza-shell-1.1.0-dist.tgz
>> samza-yarn_2.12-1.1.0.jar
>> scala-compiler-2.12.1.jar
>> scala-library-2.12.1.jar
>> scala-logging_2.12-3.7.2.jar
>> scala-parser-combinators_2.12-1.0.4.jar
>> scala-reflect-2.12.4.jar
>> scalate-core_2.12-1.8.0.jar
>> scalate-util_2.12-1.8.0.jar
>> scalatra_2.12-2.5.0.jar
>> scalatra-common_2.12-2.5.0.jar
>> scalatra-scalate_2.12-2.5.0.jar
>> scala-xml_2.12-1.0.6.jar
>> kafka_2.12-1.1.0.jar
>> kafka-clients-1.1.0.jar
>> 
>>        Below is the exception when starting app in Yarn:
>> 2019-05-29 09:52:47.851 [main] AppInfoParser [INFO] Kafka version : 1.1.0
>> 2019-05-29 09:52:47.851 [main] AppInfoParser [INFO] Kafka commitId :
>> fdcf75ea326b8e07
>> 2019-05-29 09:52:47.862 [main] Log4jControllerRegistration$ [INFO]
>> Registered kafka:type=kafka.Log4jController MBean
>> 2019-05-29 09:52:47.877 [main] KafkaSystemAdmin [INFO] Created
>> KafkaSystemAdmin for system kafka
>> Exception in thread "main" org.apache.samza.SamzaException: Failed to run
>> application
>>        at
>> org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
>>        at
>> org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
>>        at
>> org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
>> Caused by: java.lang.NullPointerException
>>        at java.util.HashMap.merge(HashMap.java:1225)
>>        at
>> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>>        at
>> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>        at
>> java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
>>        at
>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>        at
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>        at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>        at
>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>        at
>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>        at
>> org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:84)
>>        at
>> org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
>>        at
>> org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
>>        at
>> org.apache.samza.execution.JobPlanner.buildAndStartStreamManager(JobPlanner.java:64)
>>        at
>> org.apache.samza.execution.JobPlanner.getExecutionPlan(JobPlanner.java:94)
>>        at
>> org.apache.samza.execution.RemoteJobPlanner.prepareJobs(RemoteJobPlanner.java:57)
>>        at
>> org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:67)
>>        ... 2 more
>> 
>> 
>>        Thanks for your help!
>> 
>> Qi Shu
>> 
>> 
>> 
