Here is my theory, based on the stack trace and logs.
InfluxDBSystemFactory could not create the admin and returned null. When
JavaSystemConfig collects the admins for all configured systems,
Collectors.toMap(..) throws a NullPointerException during the reduction
because one of the values is null.
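
To illustrate the failure mode, here is a minimal, self-contained sketch. It is
not the Samza code: fakeGetAdmin and the class name are hypothetical stand-ins,
and the only point is that Collectors.toMap rejects a null value produced by
the value mapper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ToMapNullDemo {

    // Hypothetical stand-in for a factory's getAdmin(...): returns null for
    // one system name to mimic a factory that failed to build its admin.
    static Object fakeGetAdmin(String systemName) {
        return "influxdb".equals(systemName) ? null : new Object();
    }

    // Maps system name -> admin by streaming over the map entries and
    // collecting with Collectors.toMap, which does not accept null values.
    static Map<String, Object> collectAdmins(Map<String, String> systemFactories) {
        return systemFactories.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey,
                        e -> fakeGetAdmin(e.getKey())));
    }

    public static void main(String[] args) {
        Map<String, String> factories = new HashMap<>();
        factories.put("kafka", "KafkaSystemFactory");
        factories.put("influxdb", "InfluxDBSystemFactory");
        try {
            collectAdmins(factories);
            System.out.println("collected without error");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException from toMap: null admin value");
        }
    }
}
```

With only the "kafka" entry the collection succeeds; adding the entry whose
admin is null makes the collect call throw.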

A few questions:
 1. Can getAdmin(...) return a null?
 2. Can you add some logs to your getAdmin API to see if they are printed
right before the exception?
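
As a rough sketch of the kind of logging I mean (I have not seen your factory,
so every name here is hypothetical: Admin, buildAdmin, the url parameter and
the example URL are stand-ins, not the Samza or InfluxDB API), the method could
log on entry and exit and fail fast instead of returning null:

```java
import java.util.logging.Logger;

public class LoggingAdminFactorySketch {
    private static final Logger LOG =
            Logger.getLogger(LoggingAdminFactorySketch.class.getName());

    // Hypothetical stand-in for the admin type the factory returns.
    interface Admin { }

    // Hypothetical stand-in for the real construction logic; returns null
    // when no URL is supplied so the failure path can be exercised.
    static Admin buildAdmin(String systemName, String url) {
        return url == null ? null : new Admin() { };
    }

    // Logs on entry and exit, and throws instead of handing back null.
    static Admin getAdmin(String systemName, String url) {
        LOG.info("getAdmin called for system " + systemName);
        Admin admin = buildAdmin(systemName, url);
        if (admin == null) {
            LOG.severe("getAdmin produced null for system " + systemName);
            throw new IllegalStateException("No admin created for system " + systemName);
        }
        LOG.info("getAdmin created admin for system " + systemName);
        return admin;
    }

    public static void main(String[] args) {
        // Example URL is an assumption for illustration only.
        getAdmin("influxdb", "http://localhost:8086");
    }
}
```

If the "called" line shows up in the log right before the exception but the
"created" line does not, that would support the theory above.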

I am guessing here since I don't have access to the source code of
*InfluxDBSystemFactory*. If you can share the source code of
InfluxDBSystemFactory, it will help me confirm my theory.

Let me know how it goes.

Thanks,
Bharath

On Wed, May 29, 2019 at 7:01 PM QiShu <sh...@eefung.com> wrote:

> Hi Bharath,
>
>         Below is the configuration:
> # App
> app.name=canal-metrics
> app.id=test
> app.class=com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
> task.class=com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
>
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.default.system=kafka
>
> job.container.thread.pool.size=0
> task.max.concurrency=1
>
> task.opts=-Xmx1843m
>
> task.checkpoint.replication.factor=3
>
>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> # Checkpointing
>
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>
> # Kafka
> systems.kafka.consumer.zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183
> systems.kafka.producer.bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096
>
> # Systems & Streams
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.default.stream.samza.key.serde=string
> systems.kafka.default.stream.samza.msg.serde=string
>
>
> systems.influxdb.samza.factory=com.antfact.datacenter.canal.system.InfluxDBSystemFactory
> systems.hstore.default.stream.samza.key.serde=string
> systems.hstore.default.stream.samza.msg.serde=string
>
> task.inputs=kafka.samza-metrics
>
> task.consumer.batch.size=100
>
> # Deployment
> yarn.package.path=hdfs://p002132.antfact.com/rflow-apps/data/canal-metrics-3.0-dist.tar.gz
> yarn.container.count=1
> cluster-manager.container.memory.mb=2048
>
>
>         Below is the entire log:
> added manifest
> java version "1.8.0_151"
> Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)
> /usr/java/jdk1.8.0_151/bin/java
> -Dlog4j.configuration=file:bin/log4j-console.xml
> -DisThreadContextMapInheritable=true
> -Dsamza.log.dir=/home/ant/canal-test/canal-metrics/target/canal
> -Djava.io.tmpdir=/home/ant/canal-test/canal-metrics/target/canal/tmp
> -Xmx768M -XX:+PrintGCDateStamps
> -Xloggc:/home/ant/canal-test/canal-metrics/target/canal/gc.log
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
> -XX:GCLogFileSize=10241024 -d64 -cp /etc/hadoop/conf:pathing.jar
> org.apache.samza.runtime.ApplicationRunnerMain
> --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> --config-path=/home/ant/canal-test/canal-metrics/target/canal/config/canal-metrics.properties
> 2019-05-30 09:59:10.900 [main] TaskFactoryUtil [INFO] Got task class name:
> com.antfact.datacenter.canal.task.metrics.MetricsHandlerTask
> 2019-05-30 09:59:10.914 [main] RemoteJobPlanner [INFO] The run id for this
> run is 1559181550904-ec4b9a69
> 2019-05-30 09:59:10.958 [main] JobPlanner [INFO] app.name is defined,
> generating job.name equal to app.name value: canal-metrics
> 2019-05-30 09:59:10.958 [main] JobPlanner [INFO] app.id is defined,
> generating job.id equal to app.name value: test
> 2019-05-30 09:59:10.959 [main] JobPlanner [INFO] app.name is defined,
> generating job.name equal to app.name value: canal-metrics
> 2019-05-30 09:59:10.959 [main] JobPlanner [INFO] app.id is defined,
> generating job.id equal to app.name value: test
> 2019-05-30 09:59:10.960 [main] KafkaConsumerConfig [INFO] Auto offset
> reset value for KafkaConsumer for system upcoming converted from
> latest(samza) to {}
> 2019-05-30 09:59:10.960 [main] KafkaConsumerConfig [INFO] setting
> auto.offset.reset for system kafka to latest
> 2019-05-30 09:59:10.962 [main] KafkaConsumerConfig [INFO] setting key
> serialization for the consumer(for system kafka) to ByteArrayDeserializer
> 2019-05-30 09:59:10.964 [main] KafkaConsumerConfig [INFO] setting value
> serialization for the consumer(for system kafka) to ByteArrayDeserializer
> 2019-05-30 09:59:10.970 [main] KafkaSystemConsumer [INFO] Instantiating
> KafkaConsumer for systemName kafka with properties
> {key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer,
> value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer,
> enable.auto.commit=false, max.poll.records=100,
> zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183, 
> group.id=canal-metrics-test,
> partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor,
> bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,10.20.1.89:9096,
> 10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096,
> auto.offset.reset=latest, client.id
> =kafka_admin_consumer-canal_metrics-test}
> 2019-05-30 09:59:11.003 [main] ConsumerConfig [INFO] ConsumerConfig values:
>         auto.commit.interval.ms = 5000
>         auto.offset.reset = latest
>         bootstrap.servers = [10.20.1.87:9096, 10.20.1.88:9096,
> 10.20.1.89:9096, 10.20.1.90:9096, 10.20.1.91:9096, 10.20.1.92:9096]
>         check.crcs = true
>         client.id = kafka_admin_consumer-canal_metrics-test
>         connections.max.idle.ms = 540000
>         enable.auto.commit = false
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 500
>         fetch.min.bytes = 1
>         group.id = canal-metrics-test
>         heartbeat.interval.ms = 3000
>         interceptor.classes = []
>         internal.leave.group.on.close = true
>         isolation.level = read_uncommitted
>         key.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 300000
>         max.poll.records = 100
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         partition.assignment.strategy =
> [org.apache.kafka.clients.consumer.RangeAssignor]
>         receive.buffer.bytes = 65536
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 305000
>         retry.backoff.ms = 100
>         sasl.jaas.config = null
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.mechanism = GSSAPI
>         security.protocol = PLAINTEXT
>         send.buffer.bytes = 131072
>         session.timeout.ms = 10000
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.endpoint.identification.algorithm = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.location = null
>         ssl.keystore.password = null
>         ssl.keystore.type = JKS
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>         value.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> 2019-05-30 09:59:11.147 [main] ConsumerConfig [WARN] The configuration
> 'zookeeper.connect' was supplied but isn't a known config.
> 2019-05-30 09:59:11.149 [main] AppInfoParser [INFO] Kafka version : 1.1.0
> 2019-05-30 09:59:11.149 [main] AppInfoParser [INFO] Kafka commitId :
> fdcf75ea326b8e07
> 2019-05-30 09:59:11.166 [main] KafkaSystemAdmin [INFO] New admin client
> with props:{bootstrap.servers=10.20.1.87:9096,10.20.1.88:9096,
> 10.20.1.89:9096,10.20.1.90:9096,10.20.1.91:9096,10.20.1.92:9096,
> zookeeper.connect=p-zk1:7183,p-zk2:7183,p-zk3:7183}
> 2019-05-30 09:59:11.170 [main] AdminClientConfig [INFO] AdminClientConfig
> values:
>         bootstrap.servers = [10.20.1.87:9096, 10.20.1.88:9096,
> 10.20.1.89:9096, 10.20.1.90:9096, 10.20.1.91:9096, 10.20.1.92:9096]
>         client.id =
>         connections.max.idle.ms = 300000
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         receive.buffer.bytes = 65536
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 120000
>         retries = 5
>         retry.backoff.ms = 100
>         sasl.jaas.config = null
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.mechanism = GSSAPI
>         security.protocol = PLAINTEXT
>         send.buffer.bytes = 131072
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.endpoint.identification.algorithm = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.location = null
>         ssl.keystore.password = null
>         ssl.keystore.type = JKS
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>
> 2019-05-30 09:59:11.191 [main] AdminClientConfig [WARN] The configuration
> 'zookeeper.connect' was supplied but isn't a known config.
> 2019-05-30 09:59:11.191 [main] AppInfoParser [INFO] Kafka version : 1.1.0
> 2019-05-30 09:59:11.191 [main] AppInfoParser [INFO] Kafka commitId :
> fdcf75ea326b8e07
> 2019-05-30 09:59:11.208 [main] Log4jControllerRegistration$ [INFO]
> Registered kafka:type=kafka.Log4jController MBean
> 2019-05-30 09:59:11.223 [main] KafkaSystemAdmin [INFO] Created
> KafkaSystemAdmin for system kafka
> Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
>         at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
>         at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
>         at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
> Caused by: java.lang.NullPointerException
>         at java.util.HashMap.merge(HashMap.java:1225)
>         at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>         at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>         at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>         at org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:84)
>         at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
>         at org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
>         at org.apache.samza.execution.JobPlanner.buildAndStartStreamManager(JobPlanner.java:64)
>         at org.apache.samza.execution.JobPlanner.getExecutionPlan(JobPlanner.java:94)
>         at org.apache.samza.execution.RemoteJobPlanner.prepareJobs(RemoteJobPlanner.java:57)
>         at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:67)
>         ... 2 more
>
>
>         Thanks.
>
> Qi Shu
>
> > On May 30, 2019, at 3:41 AM, Bharath Kumara Subramanian <codin.mart...@gmail.com>
> > wrote:
> >
> > Hi Qi,
> >
> > Can you share your application configuration? Especially the systems your
> > application consumes from and produces to, and their related configuration.
> > Also, would it be possible for you to attach the entire log?
> >
> > Thanks,
> > Bharath
> >
> > On Tue, May 28, 2019 at 7:07 PM QiShu <sh...@eefung.com> wrote:
> >
> >> Hi,
> >>
> >>        Below is the running environment:
> >> Hadoop version 3.1.0
> >> java version "1.8.0_151"
> >> samza-api-1.1.0.jar
> >> samza-core_2.12-1.1.0.jar
> >> samza-kafka_2.12-1.1.0.jar
> >> samza-kv_2.12-1.1.0.jar
> >> samza-kv-inmemory_2.12-1.1.0.jar
> >> samza-kv-rocksdb_2.12-1.1.0.jar
> >> samza-log4j_2.12-1.1.0.jar
> >> samza-shell-1.1.0-dist.tgz
> >> samza-yarn_2.12-1.1.0.jar
> >> scala-compiler-2.12.1.jar
> >> scala-library-2.12.1.jar
> >> scala-logging_2.12-3.7.2.jar
> >> scala-parser-combinators_2.12-1.0.4.jar
> >> scala-reflect-2.12.4.jar
> >> scalate-core_2.12-1.8.0.jar
> >> scalate-util_2.12-1.8.0.jar
> >> scalatra_2.12-2.5.0.jar
> >> scalatra-common_2.12-2.5.0.jar
> >> scalatra-scalate_2.12-2.5.0.jar
> >> scala-xml_2.12-1.0.6.jar
> >> kafka_2.12-1.1.0.jar
> >> kafka-clients-1.1.0.jar
> >>
> >>        Below is the exception when starting app in Yarn:
> >> 2019-05-29 09:52:47.851 [main] AppInfoParser [INFO] Kafka version : 1.1.0
> >> 2019-05-29 09:52:47.851 [main] AppInfoParser [INFO] Kafka commitId : fdcf75ea326b8e07
> >> 2019-05-29 09:52:47.862 [main] Log4jControllerRegistration$ [INFO] Registered kafka:type=kafka.Log4jController MBean
> >> 2019-05-29 09:52:47.877 [main] KafkaSystemAdmin [INFO] Created KafkaSystemAdmin for system kafka
> >> Exception in thread "main" org.apache.samza.SamzaException: Failed to run application
> >>        at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:79)
> >>        at org.apache.samza.runtime.ApplicationRunnerUtil.invoke(ApplicationRunnerUtil.java:49)
> >>        at org.apache.samza.runtime.ApplicationRunnerMain.main(ApplicationRunnerMain.java:53)
> >> Caused by: java.lang.NullPointerException
> >>        at java.util.HashMap.merge(HashMap.java:1225)
> >>        at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
> >>        at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> >>        at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
> >>        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> >>        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> >>        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> >>        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> >>        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> >>        at org.apache.samza.config.JavaSystemConfig.getSystemAdmins(JavaSystemConfig.java:84)
> >>        at org.apache.samza.system.SystemAdmins.<init>(SystemAdmins.java:38)
> >>        at org.apache.samza.execution.StreamManager.<init>(StreamManager.java:55)
> >>        at org.apache.samza.execution.JobPlanner.buildAndStartStreamManager(JobPlanner.java:64)
> >>        at org.apache.samza.execution.JobPlanner.getExecutionPlan(JobPlanner.java:94)
> >>        at org.apache.samza.execution.RemoteJobPlanner.prepareJobs(RemoteJobPlanner.java:57)
> >>        at org.apache.samza.runtime.RemoteApplicationRunner.run(RemoteApplicationRunner.java:67)
> >>        ... 2 more
> >>
> >>
> >>        Thanks for your help!
> >>
> >> Qi Shu
> >>
> >>
> >>
>
>
