[ https://issues.apache.org/jira/browse/FLINK-3418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15157026#comment-15157026 ]

ASF GitHub Bot commented on FLINK-3418:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1687#discussion_r53628814
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java ---
    @@ -26,32 +25,46 @@
     import java.io.File;
     import java.io.FileInputStream;
     import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.List;
     
     /**
    - * Utility for copying from local file system to a HDFS {@link FileSystem} in an external process.
    - * This is required since {@code FileSystem.copyFromLocalFile} does not like being interrupted.
    + * Utility for copying from local file system to a HDFS {@link FileSystem}.
      */
     public class HDFSCopyFromLocal {
    -   public static void main(String[] args) throws Exception {
    -           String hadoopConfPath = args[0];
    -           String localBackupPath = args[1];
    -           String backupUri = args[2];
    -
    -           Configuration hadoopConf = new Configuration();
    -           try (DataInputStream in = new DataInputStream(new FileInputStream(hadoopConfPath))) {
    -                   hadoopConf.readFields(in);
    -           }
     
    -           FileSystem fs = FileSystem.get(new URI(backupUri), hadoopConf);
    +   public static void copyFromLocal(final File hadoopConfPath, final File localPath, final URI remotePath) throws Exception {
    +           // Do it in another Thread because HDFS can deadlock if being interrupted while copying
     
    -           fs.copyFromLocalFile(new Path(localBackupPath), new Path(backupUri));
    -   }
    +           String threadName = "HDFS Copy from " + localPath + " to " + remotePath;
    +
    +           final List<Exception> asyncException = new ArrayList<>();
    +
    +           Thread copyThread = new Thread(threadName) {
    +                   @Override
    +                   public void run() {
    +                           try {
    +                                   Configuration hadoopConf = new Configuration();
    --- End diff --
    
    This only loads the Hadoop configuration from the classpath, not from the Flink configuration or from environment variables.
    
    I think our filesystem code has a method that also tries the environment variables and the Flink config.
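
    A rough sketch of the kind of lookup meant here, using only the JDK. The class and method names are hypothetical and this is not the existing Flink method; the lookup order and the `configuredDir` parameter (standing in for a directory taken from the Flink configuration) are assumptions. `HADOOP_CONF_DIR` and `HADOOP_HOME` are the usual Hadoop environment variables.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: finds Hadoop config files outside the classpath. */
public class HadoopConfLocator {

    /** File names Hadoop clients usually read their settings from. */
    private static final String[] CONF_FILES = {"core-site.xml", "hdfs-site.xml"};

    /**
     * Returns candidate config directories in lookup order (an assumed order):
     * the explicitly configured directory, then HADOOP_CONF_DIR,
     * then the two common locations below HADOOP_HOME.
     */
    public static List<File> candidateDirs(String configuredDir, Map<String, String> env) {
        List<File> dirs = new ArrayList<>();
        if (configuredDir != null && !configuredDir.isEmpty()) {
            dirs.add(new File(configuredDir));
        }
        String confDir = env.get("HADOOP_CONF_DIR");
        if (confDir != null && !confDir.isEmpty()) {
            dirs.add(new File(confDir));
        }
        String home = env.get("HADOOP_HOME");
        if (home != null && !home.isEmpty()) {
            dirs.add(new File(home, "etc/hadoop")); // Hadoop 2.x layout
            dirs.add(new File(home, "conf"));       // Hadoop 1.x layout
        }
        return dirs;
    }

    /** Returns the config files that actually exist in the candidate directories. */
    public static List<File> existingConfFiles(String configuredDir, Map<String, String> env) {
        List<File> found = new ArrayList<>();
        for (File dir : candidateDirs(configuredDir, env)) {
            for (String name : CONF_FILES) {
                File file = new File(dir, name);
                if (file.isFile()) {
                    found.add(file);
                }
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // No explicitly configured directory here; only the environment is consulted.
        for (File file : existingConfFiles(null, System.getenv())) {
            System.out.println(file);
        }
    }
}
```

    Each file found this way could then be added to the `Configuration` created in the copy thread, for example via `hadoopConf.addResource(new Path(file.getAbsolutePath()))`, before `FileSystem.get` is called.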


> RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security configuration
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-3418
>                 URL: https://issues.apache.org/jira/browse/FLINK-3418
>             Project: Flink
>          Issue Type: Bug
>          Components: state backends
>            Reporter: Robert Metzger
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>
> As you can see, for example, in the {{YARNTaskManagerRunner}}, our TaskManagers run inside a special UserGroupInformation.doAs() call.
> With that call, we manually change the user from the one who started the YARN NodeManager (our containers are part of that process tree) to the user who submitted the job.
> For example, on my cluster the NodeManager runs as "yarn", but "robert" submits the job. For regular file access, "robert" is accessing the files in HDFS, even though "yarn" runs the process.
> The {{HDFSCopyFromLocal}} util does not properly initialize these settings, hence "yarn" tries to access the files, leading to the following exception:
> {code}
> Caused by: java.lang.RuntimeException: Error while copying to remote FileSystem: SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/yarn/nm/usercache/robert/appcache/application_1455632128025_0010/filecache/17/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.5-1.cdh5.4.5.p0.7/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=yarn, access=WRITE, inode="/user/robert/rocksdb/5b7ad8b04048e894ef7bf341856681bf":robert:supergroup:drwxr-xr-x
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
>       at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6599)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6581)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6533)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4337)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4307)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4280)
>       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
>       at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:321)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:601)
>       at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:415)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>       at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>       at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>       at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2755)
>       at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2724)
>       at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:870)
>       at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:866)
>       at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:866)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:859)
>       at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
>       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:351)
>       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
>       at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1905)
>       at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1873)
>       at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1838)
>       at org.apache.flink.contrib.streaming.state.HDFSCopyFromLocal.main(HDFSCopyFromLocal.java:47)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=yarn, access=WRITE, inode="/user/robert/rocksdb/5b7ad8b04048e894ef7bf341856681bf":robert:supergroup:drwxr-xr-x
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
>       at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
>       at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6599)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6581)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6533)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4337)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4307)
>       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4280)
>       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
>       at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:321)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:601)
>       at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:415)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1468)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1399)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
>       at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2753)
>       ... 13 more
>       at org.apache.flink.contrib.streaming.state.HDFSCopyFromLocal.copyFromLocal(HDFSCopyFromLocal.java:54)
>       at org.apache.flink.contrib.streaming.state.AbstractRocksDBState$AsyncRocksDBSnapshot.materialize(AbstractRocksDBState.java:454)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask$1.run(StreamTask.java:531)
> {code}
> I think we need to fix this before the 1.0.0 release.



