[
https://issues.apache.org/jira/browse/FLUME-3340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ODa updated FLUME-3340:
-----------------------
Description:
Hello,
I'm a big data administrator working with the Cloudera distribution. We use Flume to collect external data and push it into HDFS (Hadoop). We have an issue with the HDFS sink, which fails with these messages:
2019-07-11 09:22:12,147 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating hdfs:/app/2019-07-11/apps_flume01_2019-07-11.1562829732122.json.tmp
2019-07-11 09:22:12,441 ERROR org.apache.flume.sink.hdfs.HDFSEventSink: process failed
java.lang.NullPointerException
    at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:165)
    at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
    at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
    at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
    at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
    at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
    at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:763)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:694)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:378)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:260)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:252)
    at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
    at org.apache.flume.auth.UGIExecutor.execute(UGIExecutor.java:46)
    at org.apache.flume.auth.KerberosAuthenticator.execute(KerberosAuthenticator.java:64)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2019-07-11 09:22:12,443 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:165)
    at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
    at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
    at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
    at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
    at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
    at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:763)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:694)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:378)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:260)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:252)
    at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
    at org.apache.flume.auth.UGIExecutor.execute(UGIExecutor.java:46)
    at org.apache.flume.auth.KerberosAuthenticator.execute(KerberosAuthenticator.java:64)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
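For context on what the trace shows: Hadoop's SaslRpcServer$FastSaslServerFactory constructor enumerates every SaslServerFactory registered in the JVM and asks each for its mechanism names with a null props map; Kafka's PlainSaslServerFactory, pulled onto the classpath by the Kafka source, appears to dereference that map without a null check, hence the NullPointerException at PlainSaslServer.java:165. Below is a minimal, self-contained Java sketch of that interaction; the class name SaslNpeSketch is made up, and the factory body is an assumption reconstructed from the stack trace, not the actual Kafka source.

import java.util.Map;
import javax.security.sasl.Sasl;

// Hypothetical reconstruction of the failure above, not real Kafka/Hadoop code.
// Hadoop's SaslRpcServer$FastSaslServerFactory.<init> probes every registered
// SaslServerFactory via getMechanismNames(null) to build a mechanism cache;
// a factory that reads the props map unconditionally then throws.
public class SaslNpeSketch {

    // Stand-in for org.apache.kafka.common.security.plain.PlainSaslServer
    // $PlainSaslServerFactory.getMechanismNames (PlainSaslServer.java:165).
    // Reading Sasl.POLICY_NOPLAINTEXT here is an assumption.
    static String[] getMechanismNames(Map<String, ?> props) {
        // props.get(...) throws NullPointerException when props is null
        String noPlainText = (String) props.get(Sasl.POLICY_NOPLAINTEXT);
        return "true".equals(noPlainText) ? new String[0] : new String[] { "PLAIN" };
    }

    public static void main(String[] args) {
        // Hadoop passes a null props map while enumerating factories, which the
        // SaslServerFactory contract permits; this reproduces the NPE pattern.
        getMechanismNames(null);
    }
}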
The Kafka source (Confluent Community) is secured with SASL_PLAINTEXT.
The Flume configuration is:
agent.sources = apps
agent.channels = appsChannel
agent.sinks = appsSink
###
### Source definition
###
agent.sources.apps.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.apps.kafka.bootstrap.servers =kafka1:9092,kafka2:9092,kafka3:9092
agent.sources.apps.kafka.topics = mytopic01
agent.sources.apps.kafka.consumer.client.id=ClientIDapps
agent.sources.apps.kafka.consumer.group.id=GroupIDapps
agent.sources.apps.channels = appsChannel
agent.sources.apps.batchSize=500
agent.sources.apps.interceptors = i1 hostint
agent.sources.apps.interceptors.i1.type = timestamp
agent.sources.apps.interceptors.hostint.type =
org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.apps.interceptors.hostint.preserveExisting = true
agent.sources.apps.interceptors.hostint.useIP = false
agent.sources.apps.kafka.consumer.security.protocol=SASL_PLAINTEXT
agent.sources.apps.kafka.consumer.sasl.mechanism=PLAIN
agent.sources.apps.kafka.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=username password=password ;
###
### Channel definition
###
agent.channels.appsChannel.type = memory
agent.channels.appsChannel.capacity = 500000
agent.channels.appsChannel.transactionCapacity = 1000
###
### Sink definition
agent.sinks.appsSink.type = hdfs
agent.sinks.appsSink.hdfs.kerberosPrincipal = $KERBEROS_PRINCIPAL
agent.sinks.appsSink.hdfs.kerberosKeytab = $KERBEROS_KEYTAB
agent.sinks.appsSink.maxOpenFiles = 100
agent.sinks.appsSink.hdfs.path = hdfs:/apps/%Y-%m-%d
agent.sinks.appsSink.hdfs.filePrefix=apps_%{host}_%Y-%m-%d
agent.sinks.appsSink.hdfs.fileSuffix=.json
agent.sinks.appsSink.hdfs.rollInterval=60
agent.sinks.appsSink.hdfs.rollSize=0
agent.sinks.appsSink.hdfs.rollCount=100000
agent.sinks.appsSink.hdfs.idleTimeout=60
agent.sinks.appsSink.hdfs.callTimeout=60000
agent.sinks.appsSink.hdfs.batchSize=1000
agent.sinks.appsSink.hdfs.fileType=DataStream
agent.sinks.appsSink.hdfs.writeFormat=Writable
agent.sinks.appsSink.hdfs.useLocalTimeStamp=false
agent.sinks.appsSink.hdfs.serializer=TEXT
agent.sinks.appsSink.channel = appsChannel
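A note on the sasl.jaas.config line in the source definition above: Kafka's documented format quotes the option values and puts the semicolon directly after the last option, so the unquoted "username=username password=password ;" form is worth double-checking. A sketch of the usual form, keeping the placeholder credentials from the original:

agent.sources.apps.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";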
Best Regards,
ODa
> Error with kerberized HDFS sink
> -------------------------------
>
> Key: FLUME-3340
> URL: https://issues.apache.org/jira/browse/FLUME-3340
> Project: Flume
> Issue Type: Choose from below ...
> Components: Configuration
> Affects Versions: 1.6.0
> Environment: Flume 1.6 (embedded release in CDH 5.16.1)
> Kafka 2.11-2.1.1cp1-1 (Confluent Community)
> 3 secured brokers with SASL_PLAINTEXT (mechanism PLAIN)
> Hadoop 2.6.0 (CDH 5.6.1), kerberized
> RHEL 6.9
> Reporter: ODa
> Priority: Blocker
>
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)