Here is my theory based on the stack trace and logs: InfluxDBSystemFactory failed to create the admin and returned null. Inside JavaSystemConfig, when it collects the admins for all configured systems, Collectors.toMap(...) throws a NullPointerException during the reduction because of that null value.
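To illustrate the failure mode, here is a minimal standalone sketch (not Samza's actual code; the system names and the null-returning factory are stand-ins). Collectors.toMap inserts each entry through HashMap.merge, which rejects null values, which is why the trace bottoms out at HashMap.merge:

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToMapNullDemo {
    public static void main(String[] args) {
        try {
            // Stand-in for JavaSystemConfig.getSystemAdmins(): build a
            // systemName -> admin map, where one factory yields null.
            Map<String, Object> admins = Stream.of("kafka", "influxdb")
                    .collect(Collectors.toMap(
                            name -> name,
                            name -> "kafka".equals(name) ? new Object() : null));
            System.out.println(admins);
        } catch (NullPointerException e) {
            // Collectors.toMap accumulates entries via HashMap.merge, which
            // throws NullPointerException when the mapped value is null.
            System.out.println("NullPointerException, as in the reported stack trace");
        }
    }
}
```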
A few questions:
1. Can getAdmin(...) return null?
2. Can you add some logs to your getAdmin API to see if they are printed right before the exception?

I am guessing here since I don't have access to the source code of *InfluxDBSystemFactory*. If you can share the source code of InfluxDBSystemFactory, it will help me confirm my theory.

Let me know how it goes.

Thanks,
Bharath

On Wed, May 29, 2019 at 7:01 PM QiShu <sh...@eefung.com> wrote:

> Hi Bharath,
>
> Below is configuration:
> # App
> app.name=canal-metrics
> app.id=test
> app.class=com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
> task.class=com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
>
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.default.system=kafka
>
> job.container.thread.pool.size=0
> task.max.concurrency=1
>
> task.opts=-Xmx1843m
>
> task.checkpoint.replication.factor=3
>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> # Checkpointing
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>
> # Kafka
> systems.kafka.consumer.zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183
> systems.kafka.producer.bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096
>
> # Systems & Streams
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.default.stream.samza.key.serde=string
> systems.kafka.default.stream.samza.msg.serde=string
>
> systems.influxdb.samza.factory=com.antfact.datacenter.canal.system.InfluxDBSystemFactory
> systems.hstore.default.stream.samza.key.serde=string
> systems.hstore.default.stream.samza.msg.serde=string
>
> task.inputs=kafka.samza-metrics
>
> task.consumer.batch.size=100
>
> # Deployment
> yarn.package.path=hdfs://p002132.antfact.com/rflow-apps/data/canal-metrics-3.0-dist.tar.gz
> yarn.container.count=1
> cluster-manager.container.memory.mb=2048
Below is entire log:

> added manifest
> java version "1.8.0_151"
> Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)
> /usr/java/jdk1.8.0_151/bin/java -Dlog4j.configuration=file:bin/log4j-console.xml -DisThreadContextMapInheritable=true -Dsamza.log.dir=/home/ant/canal-test/canal-metrics/target/canal -Djava.io.tmpdir=/home/ant/canal-test/canal-metrics/target/canal/tmp -Xmx768M -XX:+PrintGCDateStamps -Xloggc:/home/ant/canal-test/canal-metrics/target/canal/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10241024 -d64 -cp /etc/hadoop/conf:pathing.jar org.apache.samza.runtime.ApplicationRunnerMain --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=/home/ant/canal-test/canal-metrics/target/canal/config/canal-metrics.properties
> 2019-05-30 09:59:10.900 [main] TaskFactoryUtil [INFO] Got task class name: com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
> 2019-05-30 09:59:10.914 [main] RemoteJobPlanner [INFO] The run id for this run is 1559181550904-ec4b9a69
> 2019-05-30 09:59:10.958 [main] JobPlanner [INFO] app.name is defined, generating job.name equal to app.name value: canal-metrics
> 2019-05-30 09:59:10.958 [main] JobPlanner [INFO] app.id is defined, generating job.id equal to app.name value: test
> 2019-05-30 09:59:10.959 [main] JobPlanner [INFO] app.name is defined, generating job.name equal to app.name value: canal-metrics
> 2019-05-30 09:59:10.959 [main] JobPlanner [INFO] app.id is defined, generating job.id equal to app.name value: test
> 2019-05-30 09:59:10.960 [main] KafkaConsumerConfig [INFO] Auto offset reset value for KafkaConsumer for system upcoming converted from latest(samza) to {}
> 2019-05-30 09:59:10.960 [main] KafkaConsumerConfig [INFO] setting auto.offset.reset for system kafka to latest
> 2019-05-30 09:59:10.962 [main] KafkaConsumerConfig [INFO] setting key serialization for the consumer(for system kafka) to ByteArrayDeserializer
> 2019-05-30 09:59:10.964 [main] KafkaConsumerConfig [INFO] setting value serialization for the consumer(for system kafka) to ByteArrayDeserializer
> 2019-05-30 09:59:10.970 [main] KafkaSystemConsumer [INFO] Instantiating KafkaConsumer for systemName kafka with properties {key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, enable.auto.commit=false, max.poll.records=100, zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183, group.id=canal-metrics-test, partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor, bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096, auto.offset.reset=latest, client.id=kafka_admin_consumer-canal_metrics-test}
> 2019-05-30 09:59:11.003 [main] ConsumerConfig [INFO] ConsumerConfig values:
>     auto.commit.interval.ms = 5000
>     auto.offset.reset = latest
>     bootstrap.servers = [10.20.1.87:9096, 10.20.1.88:9096, 10.20.1.89:9096, 10.20.1.90:9096, 10.20.1.91:9096, 10.20.1.92:9096]
>     check.crcs = true
>     client.id = kafka_admin_consumer-canal_metrics-test
>     connections.max.idle.ms = 540000
>     enable.auto.commit = false
>     exclude.internal.topics = true
>     fetch.max.bytes = 52428800
>     fetch.max.wait.ms = 500
>     fetch.min.bytes = 1
>     group.id = canal-metrics-test
>     heartbeat.interval.ms = 3000
>     interceptor.classes = []
>     internal.leave.group.on.close = true
>     isolation.level = read_uncommitted
>     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     max.partition.fetch.bytes = 1048576
>     max.poll.interval.ms = 300000
>     max.poll.records = 100
>     metadata.max.age.ms = 300000
>     metric.reporters = []
>     metrics.num.samples = 2
>     metrics.recording.level = INFO
>     metrics.sample.window.ms = 30000
>     partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>     receive.buffer.bytes = 65536
>     reconnect.backoff.max.ms = 1000
>     reconnect.backoff.ms = 50
>     request.timeout.ms = 305000
>     retry.backoff.ms = 100
>     sasl.jaas.config = null
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.min.time.before.relogin = 60000
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     sasl.mechanism = GSSAPI
>     security.protocol = PLAINTEXT
>     send.buffer.bytes = 131072
>     session.timeout.ms = 10000
>     ssl.cipher.suites = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.endpoint.identification.algorithm = null
>     ssl.key.password = null
>     ssl.keymanager.algorithm = SunX509
>     ssl.keystore.location = null
>     ssl.keystore.password = null
>     ssl.keystore.type = JKS
>     ssl.protocol = TLS
>     ssl.provider = null
>     ssl.secure.random.implementation = null
>     ssl.trustmanager.algorithm = PKIX
>     ssl.truststore.location = null
>     ssl.truststore.password = null
>     ssl.truststore.type = JKS
>     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> 2019-05-30 09:59:11.147 [main] ConsumerConfig [WARN] The configuration 'zookeeper.connect' was supplied but isn't a known config.
> 2019-05-30 09:59:11.149 [main] AppInfoParser [INFO] Kafka version : 1.1.0
> 2019-05-30 09:59:11.149 [main] AppInfoParser [INFO] Kafka commitId : fdcf75ea326b8e07
> 2019-05-30 09:59:11.166 [main] KafkaSystemAdmin [INFO] New admin client with props:{bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096, zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183}
> 2019-05-30 09:59:11.170 [main] AdminClientConfig [INFO] AdminClientConfig values:
>     bootstrap.servers = [10.20.1.87:9096, 10.20.1.88:9096, 10.20.1.89:9096, 10.20.1.90:9096, 10.20.1.91:9096, 10.20.1.92:9096]
>     client.id =
>     connections.max.idle.ms = 300000
>     metadata.max.age.ms = 300000
>     metric.reporters = []
>     metrics.num.samples = 2
>     metrics.recording.level = INFO
>     metrics.sample.window.ms = 30000
>     receive.buffer.bytes = 65536
>     reconnect.backoff.max.ms = 1000
>     reconnect.backoff.ms = 50
>     request.timeout.ms = 120000
>     retries = 5
>     retry.backoff.ms = 100
>     sasl.jaas.config = null
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.min.time.before.relogin = 60000
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     sasl.mechanism = GSSAPI
>     security.protocol = PLAINTEXT
>     send.buffer.bytes = 131072
>     ssl.cipher.suites = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.endpoint.identification.algorithm = null
>     ssl.key.password = null
>     ssl.keymanager.algorithm = SunX509
>     ssl.keystore.location = null
>     ssl.keystore.password = null
>     ssl.keystore.type = JKS
>     ssl.protocol = TLS
>     ssl.provider = null
>     ssl.secure.random.implementation = null
>     ssl.trustmanager.algorithm = PKIX
>     ssl.truststore.location = null
>     ssl.truststore.password = null
>     ssl.truststore.type = JKS
>
> 2019-05-30 09:59:11.191 [main] AdminClientConfig [WARN] The configuration 'zookeeper.connect' was supplied but isn't a known config.
> 2019-05-30 09:59:11.191 [main] AppInfoParser [INFO] Kafka version : 1.1.0
> 2019-05-30 09:59:11.191 [main] AppInfoParser [INFO] Kafka commitId : fdcf75ea326b8e07
> 2019-05-30 09:59:11.208 [main] Log4jControllerRegistration$ [INFO] Registered kafka:type=kafka.Log4jController MBean
> 2019-05-30 09:59:11.223 [main] KafkaSystemAdmin [INFO] Created KafkaSystemAdmin for system kafka
> Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
>     at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
>     at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
>     at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
> Caused by: java.lang.NullPointerException
>     at java.util.HashMap.merge(HashMap.java:1225)
>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>     at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>     at org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:84)
>     at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
>     at org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
>     at org.apache.samza.execution.JobPlanner.buildAndStartStreamManager(JobPlanner.java:64)
>     at org.apache.samza.execution.JobPlanner.getExecutionPlan(JobPlanner.java:94)
>     at org.apache.samza.execution.RemoteJobPlanner.prepareJobs(RemoteJobPlanner.java:57)
>     at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:67)
> ... 2 more
>
> Thanks.
>
> Qi Shu
>
> > On May 30, 2019, at 3:41 AM, Bharath Kumara Subramanian <codin.mart...@gmail.com> wrote:
> >
> > Hi Qi,
> >
> > Can you share your application configuration? Especially the systems your
> > application consumes and produces to and its related configuration.
> > Also, would it be possible for you to attach the entire log?
> >
> > Thanks,
> > Bharath
> >
> > On Tue, May 28, 2019 at 7:07 PM QiShu <sh...@eefung.com> wrote:
> >
> >> Hi,
> >>
> >> Below is the running environment:
> >> Hadoop version 3.1.0
> >> java version "1.8.0_151"
> >> samza-api-1.1.0.jar
> >> samza-core_2.12-1.1.0.jar
> >> samza-kafka_2.12-1.1.0.jar
> >> samza-kv_2.12-1.1.0.jar
> >> samza-kv-inmemory_2.12-1.1.0.jar
> >> samza-kv-rocksdb_2.12-1.1.0.jar
> >> samza-log4j_2.12-1.1.0.jar
> >> samza-shell-1.1.0-dist.tgz
> >> samza-yarn_2.12-1.1.0.jar
> >> scala-compiler-2.12.1.jar
> >> scala-library-2.12.1.jar
> >> scala-logging_2.12-3.7.2.jar
> >> scala-parser-combinators_2.12-1.0.4.jar
> >> scala-reflect-2.12.4.jar
> >> scalate-core_2.12-1.8.0.jar
> >> scalate-util_2.12-1.8.0.jar
> >> scalatra_2.12-2.5.0.jar
> >> scalatra-common_2.12-2.5.0.jar
> >> scalatra-scalate_2.12-2.5.0.jar
> >> scala-xml_2.12-1.0.6.jar
> >> kafka_2.12-1.1.0.jar
> >> kafka-clients-1.1.0.jar
> >>
> >> Below is the exception when starting app in Yarn:
> >> 2019-05-29 09:52:47.851 [main] AppInfoParser [INFO] Kafka version : 1.1.0
> >> 2019-05-29 09:52:47.851 [main] AppInfoParser [INFO] Kafka commitId : fdcf75ea326b8e07
> >> 2019-05-29 09:52:47.862 [main] Log4jControllerRegistration$ [INFO] Registered kafka:type=kafka.Log4jController MBean
> >> 2019-05-29 09:52:47.877 [main] KafkaSystemAdmin [INFO] Created KafkaSystemAdmin for system kafka
> >> Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
> >>     at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
> >>     at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
> >>     at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
> >> Caused by: java.lang.NullPointerException
> >>     at java.util.HashMap.merge(HashMap.java:1225)
> >>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
> >>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> >>     at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
> >>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> >>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> >>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> >>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> >>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> >>     at org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:84)
> >>     at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
> >>     at org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
> >>     at org.apache.samza.execution.JobPlanner.buildAndStartStreamManager(JobPlanner.java:64)
> >>     at org.apache.samza.execution.JobPlanner.getExecutionPlan(JobPlanner.java:94)
> >>     at org.apache.samza.execution.RemoteJobPlanner.prepareJobs(RemoteJobPlanner.java:57)
> >>     at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:67)
> >> ... 2 more
> >>
> >> Thanks for your help!
> >>
> >> Qi Shu
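If getAdmin can indeed return null, the usual remedy is to fail fast inside the factory with a descriptive error instead of returning null, so the job planner reports the real cause rather than an NPE deep in Collectors.toMap. A minimal plain-Java sketch of that guard (the connect step and all names here are hypothetical stand-ins, since the InfluxDBSystemFactory source isn't available):

```java
public class AdminGuardSketch {
    // Hypothetical stand-in for the real InfluxDB client setup inside getAdmin(...).
    static Object connect(String url) {
        return url.startsWith("http") ? new Object() : null; // simulated failure path
    }

    // Throw a descriptive exception rather than returning null, so the failure
    // surfaces at its source instead of as an NPE in Collectors.toMap.
    static Object createAdminOrThrow(String url) {
        Object admin = connect(url);
        if (admin == null) {
            throw new IllegalStateException(
                    "Could not create InfluxDB admin for " + url);
        }
        return admin;
    }

    public static void main(String[] args) {
        System.out.println(createAdminOrThrow("http://influx:8086") != null); // prints true
        try {
            createAdminOrThrow("bad-url");
        } catch (IllegalStateException e) {
            System.out.println("failed fast: " + e.getMessage());
        }
    }
}
```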