[ https://issues.apache.org/jira/browse/FLINK-3418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15157012#comment-15157012 ]

ASF GitHub Bot commented on FLINK-3418:
---------------------------------------

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/1687

    [FLINK-3418] Don't run RocksDB copy utils in external process

    This was causing too many problems with security tokens and YARN. The
    RocksDB backup now runs in a thread instead, and these threads are no
    longer interrupted on closing. They terminate on their own: once the
    state directories are cleaned up, the copy operation fails with a
    FileNotFoundException and the thread exits.
    
    This also removes the ExternalProcessRunner, since it is no longer
    needed and using it caused too many headaches.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink rocksdb-security-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1687.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1687
    
----
commit 421e851f0da752da6ec9d14988759b06680ced03
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-02-17T11:34:51Z

    [FLINK-3418] Don't run RocksDB copy utils in external process
    
    This was causing too many problems with security tokens and YARN. The
    RocksDB backup now runs in a thread instead, and these threads are no
    longer interrupted on closing. They terminate on their own: once the
    state directories are cleaned up, the copy operation fails with a
    FileNotFoundException and the thread exits.
    
    This also removes the ExternalProcessRunner, since it is no longer
    needed and using it caused too many headaches.

----


> RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security 
> configuration
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-3418
>                 URL: https://issues.apache.org/jira/browse/FLINK-3418
>             Project: Flink
>          Issue Type: Bug
>          Components: state backends
>            Reporter: Robert Metzger
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>
> As you can see, for example, in the {{YARNTaskManagerRunner}}, our TaskManagers 
> run inside a special {{UserGroupInformation.doAs()}} call.
> With that call, we manually change the user from the one starting the YARN 
> NodeManager (our containers are part of that process tree) to the user who 
> submitted the job.
> For example, on my cluster the NodeManager runs as "yarn", but "robert" 
> submits the job. For regular file access, "robert" accesses the files in 
> HDFS, even though "yarn" runs the process.
> The {{HDFSCopyFromLocal}} utility does not properly initialize these settings, 
> hence "yarn" tries to access the files, leading to the following exception:
> {code}
> Caused by: java.lang.RuntimeException: Error while copying to remote FileSystem: SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/yarn/nm/usercache/robert/appcache/application_1455632128025_0010/filecache/17/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.5-1.cdh5.4.5.p0.7/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=yarn, access=WRITE, inode="/user/robert/rocksdb/5b7ad8b04048e894ef7bf341856681bf":robert:supergroup:drwxr-xr-x
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
>       at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6599)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6581)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6533)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4337)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4307)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4280)
>       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
>       at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:321)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:601)
>       at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:415)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>       at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>       at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>       at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2755)
>       at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2724)
>       at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:870)
>       at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:866)
>       at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:866)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:859)
>       at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
>       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:351)
>       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
>       at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1905)
>       at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1873)
>       at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1838)
>       at org.apache.flink.contrib.streaming.state.HDFSCopyFromLocal.main(HDFSCopyFromLocal.java:47)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=yarn, access=WRITE, inode="/user/robert/rocksdb/5b7ad8b04048e894ef7bf341856681bf":robert:supergroup:drwxr-xr-x
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
>       at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6599)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6581)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6533)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4337)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4307)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4280)
>       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
>       at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:321)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:601)
>       at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:415)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1468)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1399)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
>       at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2753)
>       ... 13 more
>       at org.apache.flink.contrib.streaming.state.HDFSCopyFromLocal.copyFromLocal(HDFSCopyFromLocal.java:54)
>       at org.apache.flink.contrib.streaming.state.AbstractRocksDBState$AsyncRocksDBSnapshot.materialize(AbstractRocksDBState.java:454)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask$1.run(StreamTask.java:531)
> {code}
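> For illustration, the mechanism described above looks roughly like this (a 
> minimal sketch; the class name and user names are examples, this is not 
> Flink code):
> {code}
> import java.security.PrivilegedExceptionAction;
> 
> import org.apache.hadoop.security.UserGroupInformation;
> 
> public class DoAsSketch {
>     public static void main(String[] args) throws Exception {
>         // The TaskManager code runs wrapped in doAs(), so HDFS sees "robert".
>         UserGroupInformation ugi = UserGroupInformation.createRemoteUser("robert");
>         ugi.doAs(new PrivilegedExceptionAction<Void>() {
>             @Override
>             public Void run() throws Exception {
>                 // FileSystem calls made here are authorized as "robert".
>                 // An external process spawned from here starts a fresh JVM
>                 // that runs as the OS user ("yarn") and loses this context.
>                 return null;
>             }
>         });
>     }
> }
> {code}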
> I think we need to fix this before the 1.0.0 release.


