Hi Bharath,

Below is the configuration:

# App
app.name=canal-metrics
app.id=test
app.class=com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
task.class=com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.default.system=kafka
job.container.thread.pool.size=0
task.max.concurrency=1
task.opts=-Xmx1843m
task.checkpoint.replication.factor=3
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Checkpointing
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory

# Kafka
systems.kafka.consumer.zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183
systems.kafka.producer.bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096

# Systems & Streams
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.default.stream.samza.key.serde=string
systems.kafka.default.stream.samza.msg.serde=string
systems.influxdb.samza.factory=com.antfact.datacenter.canal.system.InfluxDBSystemFactory
systems.hstore.default.stream.samza.key.serde=string
systems.hstore.default.stream.samza.msg.serde=string
task.inputs=kafka.samza-metrics
task.consumer.batch.size=100

# Deployment
yarn.package.path=hdfs://p002132.antfact.com/rflow-apps/data/canal-metrics-3.0-dist.tar.gz
yarn.container.count=1
cluster-manager.container.memory.mb=2048
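For context on the non-kafka systems above: besides kafka, the job declares a custom influxdb system through systems.influxdb.samza.factory. The snippet below is only a rough sketch of such a factory, not our real com.antfact code (the three method signatures are Samza's SystemFactory API; the method bodies are placeholders), just to show the hooks Samza invokes:

import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;

// Rough sketch only -- not the real com.antfact InfluxDBSystemFactory.
public class InfluxDBSystemFactorySketch implements SystemFactory {
    @Override
    public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
        // The job only writes to influxdb, so there is nothing to consume.
        throw new UnsupportedOperationException("influxdb is a write-only system");
    }

    @Override
    public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
        // Placeholder producer; a real one would write points to InfluxDB.
        return new SystemProducer() {
            @Override public void start() { }
            @Override public void stop() { }
            @Override public void register(String source) { }
            @Override public void send(String source, OutgoingMessageEnvelope envelope) { }
            @Override public void flush(String source) { }
        };
    }

    @Override
    public SystemAdmin getAdmin(String systemName, Config config) {
        // The job planner asks every declared system for an admin before the
        // job is submitted, so even an output-only system must return one.
        return new SinglePartitionWithoutOffsetsSystemAdmin();
    }
}

The getAdmin hook is the one exercised during planning; it shows up in the stack trace below via SystemAdmins and StreamManager.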
Below is the entire log:

added manifest
java version "1.8.0_151"
Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)
/usr/java/jdk1.8.0_151/bin/java -Dlog4j.configuration=file:bin/log4j-console.xml -DisThreadContextMapInheritable=true -Dsamza.log.dir=/home/ant/canal-test/canal-metrics/target/canal -Djava.io.tmpdir=/home/ant/canal-test/canal-metrics/target/canal/tmp -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/home/ant/canal-test/canal-metrics/target/canal/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp /etc/hadoop/conf:pathing.jar org.apache.samza.runtime.ApplicationRunnerMain --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=/home/ant/canal-test/canal-metrics/target/canal/config/canal-metrics.properties
2019-05-30 09:59:10.900 [main] TaskFactoryUtil [INFO] Got task class name: com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
2019-05-30 09:59:10.914 [main] RemoteJobPlanner [INFO] The run id for this run is 1559181550904-ec4b9a69
2019-05-30 09:59:10.958 [main] JobPlanner [INFO] app.name is defined, generating job.name equal to app.name value: canal-metrics
2019-05-30 09:59:10.958 [main] JobPlanner [INFO] app.id is defined, generating job.id equal to app.name value: test
2019-05-30 09:59:10.959 [main] JobPlanner [INFO] app.name is defined, generating job.name equal to app.name value: canal-metrics
2019-05-30 09:59:10.959 [main] JobPlanner [INFO] app.id is defined, generating job.id equal to app.name value: test
2019-05-30 09:59:10.960 [main] KafkaConsumerConfig [INFO] Auto offset reset value for KafkaConsumer for system upcoming converted from latest(samza) to {}
2019-05-30 09:59:10.960 [main] KafkaConsumerConfig [INFO] setting auto.offset.reset for system kafka to latest
2019-05-30 09:59:10.962 [main] KafkaConsumerConfig [INFO] setting key serialization for the consumer(for system kafka) to ByteArrayDeserializer
2019-05-30 09:59:10.964 [main] KafkaConsumerConfig [INFO] setting value serialization for the consumer(for system kafka) to ByteArrayDeserializer
2019-05-30 09:59:10.970 [main] KafkaSystemConsumer [INFO] Instantiating KafkaConsumer for systemName kafka with properties {key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, enable.auto.commit=false, max.poll.records=100, zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183, group.id=canal-metrics-test, partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor, bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096, auto.offset.reset=latest, client.id=kafka_admin_consumer-canal_metrics-test}
2019-05-30 09:59:11.003 [main] ConsumerConfig [INFO] ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [10.20.1.87:9096, 10.20.1.88:9096, 10.20.1.89:9096, 10.20.1.90:9096, 10.20.1.91:9096, 10.20.1.92:9096]
    check.crcs = true
    client.id = kafka_admin_consumer-canal_metrics-test
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = canal-metrics-test
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 100
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2019-05-30 09:59:11.147 [main] ConsumerConfig [WARN] The configuration 'zookeeper.connect' was supplied but isn't a known config.
2019-05-30 09:59:11.149 [main] AppInfoParser [INFO] Kafka version : 1.1.0
2019-05-30 09:59:11.149 [main] AppInfoParser [INFO] Kafka commitId : fdcf75ea326b8e07
2019-05-30 09:59:11.166 [main] KafkaSystemAdmin [INFO] New admin client with props:{bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096, zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183}
2019-05-30 09:59:11.170 [main] AdminClientConfig [INFO] AdminClientConfig values:
    bootstrap.servers = [10.20.1.87:9096, 10.20.1.88:9096, 10.20.1.89:9096, 10.20.1.90:9096, 10.20.1.91:9096, 10.20.1.92:9096]
    client.id = 
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120000
    retries = 5
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
2019-05-30 09:59:11.191 [main] AdminClientConfig [WARN] The configuration 'zookeeper.connect' was supplied but isn't a known config.
2019-05-30 09:59:11.191 [main] AppInfoParser [INFO] Kafka version : 1.1.0
2019-05-30 09:59:11.191 [main] AppInfoParser [INFO] Kafka commitId : fdcf75ea326b8e07
2019-05-30 09:59:11.208 [main] Log4jControllerRegistration$ [INFO] Registered kafka:type=kafka.Log4jController MBean
2019-05-30 09:59:11.223 [main] KafkaSystemAdmin [INFO] Created KafkaSystemAdmin for system kafka
Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
    at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
    at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
    at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
Caused by: java.lang.NullPointerException
    at java.util.HashMap.merge(HashMap.java:1225)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:84)
    at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
    at org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
    at org.apache.samza.execution.JobPlanner.buildAndStartStreamManager(JobPlanner.java:64)
    at org.apache.samza.execution.JobPlanner.getExecutionPlan(JobPlanner.java:94)
    at org.apache.samza.execution.RemoteJobPlanner.prepareJobs(RemoteJobPlanner.java:57)
    at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:67)
    ... 2 more
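For what it's worth: an NPE thrown from HashMap.merge underneath Collectors.toMap is the standard failure when the value mapper produces null, because toMap does not accept null values. A minimal standalone sketch (names are hypothetical, just to reproduce the same frames):

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ToMapNpeSketch {
    public static void main(String[] args) {
        Map<String, String> factories = new HashMap<>();
        factories.put("kafka", "KafkaSystemFactory");
        factories.put("influxdb", "InfluxDBSystemFactory");

        // Throws java.lang.NullPointerException at java.util.HashMap.merge:
        // Collectors.toMap accumulates through HashMap.merge, which rejects
        // null values -- the same HashMap.merge / Collectors.lambda$toMap
        // frames as in the trace above.
        Map<String, Object> admins = factories.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> adminFor(e.getKey())));
        System.out.println(admins);
    }

    // Stand-in for SystemFactory.getAdmin(); returns null for one system.
    private static Object adminFor(String systemName) {
        return "influxdb".equals(systemName) ? null : new Object();
    }
}

So it looks like JavaSystemConfig.getSystemAdmins received a null from some factory's getAdmin; I will double-check whether our InfluxDBSystemFactory can return null there.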
Thanks.

Qi Shu

> On May 30, 2019, at 3:41 AM, Bharath Kumara Subramanian <codin.mart...@gmail.com> wrote:
> 
> Hi Qi,
> 
> Can you share your application configuration? Especially the systems your
> application consumes from and produces to, and their related configuration.
> Also, would it be possible for you to attach the entire log?
> 
> Thanks,
> Bharath
> 
> On Tue, May 28, 2019 at 7:07 PM QiShu <sh...@eefung.com> wrote:
> 
>> Hi,
>> 
>> Below is the running environment:
>> Hadoop version 3.1.0
>> java version "1.8.0_151"
>> samza-api-1.1.0.jar
>> samza-core_2.12-1.1.0.jar
>> samza-kafka_2.12-1.1.0.jar
>> samza-kv_2.12-1.1.0.jar
>> samza-kv-inmemory_2.12-1.1.0.jar
>> samza-kv-rocksdb_2.12-1.1.0.jar
>> samza-log4j_2.12-1.1.0.jar
>> samza-shell-1.1.0-dist.tgz
>> samza-yarn_2.12-1.1.0.jar
>> scala-compiler-2.12.1.jar
>> scala-library-2.12.1.jar
>> scala-logging_2.12-3.7.2.jar
>> scala-parser-combinators_2.12-1.0.4.jar
>> scala-reflect-2.12.4.jar
>> scalate-core_2.12-1.8.0.jar
>> scalate-util_2.12-1.8.0.jar
>> scalatra_2.12-2.5.0.jar
>> scalatra-common_2.12-2.5.0.jar
>> scalatra-scalate_2.12-2.5.0.jar
>> scala-xml_2.12-1.0.6.jar
>> kafka_2.12-1.1.0.jar
>> kafka-clients-1.1.0.jar
>> 
>> Below is the exception when starting the app in YARN:
>> 2019-05-29 09:52:47.851 [main] AppInfoParser [INFO] Kafka version : 1.1.0
>> 2019-05-29 09:52:47.851 [main] AppInfoParser [INFO] Kafka commitId : fdcf75ea326b8e07
>> 2019-05-29 09:52:47.862 [main] Log4jControllerRegistration$ [INFO] Registered kafka:type=kafka.Log4jController MBean
>> 2019-05-29 09:52:47.877 [main] KafkaSystemAdmin [INFO] Created KafkaSystemAdmin for system kafka
>> Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
>>     at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
>>     at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
>>     at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
>> Caused by: java.lang.NullPointerException
>>     at java.util.HashMap.merge(HashMap.java:1225)
>>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>     at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
>>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>     at org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:84)
>>     at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
>>     at org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
>>     at org.apache.samza.execution.JobPlanner.buildAndStartStreamManager(JobPlanner.java:64)
>>     at org.apache.samza.execution.JobPlanner.getExecutionPlan(JobPlanner.java:94)
>>     at org.apache.samza.execution.RemoteJobPlanner.prepareJobs(RemoteJobPlanner.java:57)
>>     at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:67)
>>     ... 2 more
>> 
>> 
>> Thanks for your help!
>> 
>> Qi Shu
>> 
>> 
>> 