JackWangCS commented on a change in pull request #14841:
URL: https://github.com/apache/flink/pull/14841#discussion_r571630224



##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopDelegationTokenProvider.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;

Review comment:
       The DT provider could also work with other deployments in the future if 
we leverage DTs for long-running jobs (instead of using a keytab to connect to 
HDFS/Hive directly), but this needs a big refactor.
   
   

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/security/HBaseDelegationTokenProvider.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements 
HadoopDelegationTokenProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    private org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public boolean delegationTokensRequired(
+            Configuration flinkConf, org.apache.hadoop.conf.Configuration 
hadoopConf) {
+        if (UserGroupInformation.isSecurityEnabled()) {
+            hbaseConf = createHBaseConfiguration(hadoopConf);
+            LOG.debug("HBase security setting: {}", 
hbaseConf.get("hbase.security.authentication"));
+
+            boolean required = 
"kerberos".equals(hbaseConf.get("hbase.security.authentication"));
+            if (!required) {
+                LOG.debug("HBase has not been configured to use Kerberos.");
+            }
+            return required;
+        } else {
+            return false;
+        }
+    }
+
+    private org.apache.hadoop.conf.Configuration createHBaseConfiguration(
+            org.apache.hadoop.conf.Configuration conf) {
+        try {
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            return (org.apache.hadoop.conf.Configuration)
+                    Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                            .getMethod("create", 
org.apache.hadoop.conf.Configuration.class)
+                            .invoke(null, conf);
+            // ----
+
+        } catch (InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | ClassNotFoundException e) {
+            LOG.info(
+                    "HBase is not available (not packaged with this 
application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+        return conf;
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(
+            Configuration flinkConf,
+            org.apache.hadoop.conf.Configuration hadoopConf,
+            Credentials credentials) {
+        Token<?> token;
+        try {
+            try {
+                LOG.info("Obtaining Kerberos security token for HBase");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(conf);
+                token =
+                        (Token<?>)
+                                
Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod(
+                                                "obtainToken",
+                                                
org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+            } catch (NoSuchMethodException e) {
+                // for HBase 2
+
+                // ----
+                // Intended call: ConnectionFactory connectionFactory =
+                // ConnectionFactory.createConnection(conf);
+                Closeable connectionFactory =
+                        (Closeable)
+                                
Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+                                        .getMethod(
+                                                "createConnection",
+                                                
org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+                // ----
+                Class<?> connectionClass =
+                        
Class.forName("org.apache.hadoop.hbase.client.Connection");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(connectionFactory);
+                token =
+                        (Token<?>)
+                                
Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod("obtainToken", 
connectionClass)
+                                        .invoke(null, connectionFactory);
+                if (null != connectionFactory) {
+                    connectionFactory.close();
+                }
+            }
+            if (token == null) {
+                LOG.error("No Kerberos security token for HBase available");

Review comment:
       For the HBase case, I think we shouldn't fail here. It's very common 
that the hbase-site.xml is on the classpath but the job does not access HBase. 
If we fail here, it will impact these kinds of jobs.
   

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopFSDelegationTokenProvider.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Master;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Delegation token provider implementation for Hadoop FileSystems. */
+public class HadoopFSDelegationTokenProvider implements 
HadoopDelegationTokenProvider {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HadoopFSDelegationTokenProvider.class);
+
+    @Override
+    public String serviceName() {
+        return "hadoopfs";
+    }
+
+    @Override
+    public boolean delegationTokensRequired(
+            Configuration flinkConf, org.apache.hadoop.conf.Configuration 
hadoopConf) {
+        return UserGroupInformation.isSecurityEnabled();
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(
+            Configuration flinkConf,
+            org.apache.hadoop.conf.Configuration hadoopConf,
+            Credentials credentials) {
+        try {
+            Set<FileSystem> fileSystemsToAccess = 
getFileSystemsToAccess(flinkConf, hadoopConf);
+
+            final String renewer = getTokenRenewer(hadoopConf);
+            fileSystemsToAccess.forEach(
+                    fs -> {
+                        try {
+                            LOG.info("Getting FS token for: {} with renewer 
{}", fs, renewer);
+                            fs.addDelegationTokens(renewer, credentials);
+                        } catch (IOException e) {
+                            LOG.warn("Failed to get token for {}.", fs);
+                        }
+                    });
+        } catch (IOException e) {
+            LOG.error("Failed to obtain tokens for Hadoop FileSystems: {}", 
e.getMessage());
+        }
+        // Flink does not support to renew the delegation token currently

Review comment:
       Thanks for pointing this out. I have changed the code so that 
`getTokenRenewer` will handle the IOException, and if it fails to get the 
renewer, it will return null to indicate that.
   
   Also, if the job fails to get the FS DTs, it's supposed to fail, so I will 
throw a runtime exception to fail fast.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopDelegationTokenProvider.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.hadoop.security.Credentials;
+
+import java.util.Optional;
+
+/** Hadoop delegation token provider. */
+public interface HadoopDelegationTokenProvider {
+
+    /** Name of the service to provide delegation tokens. This name should be 
unique. */
+    String serviceName();
+
+    /**
+     * Return true if delegation tokens are required for this service.
+     *
+     * @param flinkConf Flink configuration
+     * @param hadoopConf Hadoop configuration
+     * @return true if delegation tokens are required
+     */
+    boolean delegationTokensRequired(

Review comment:
       The `flinkConf` contains the Flink configuration; users could specify 
some additional configurations when implementing the 
`HadoopDelegationTokenProvider`. The `hadoopConf` is the Hadoop conf for the 
YARN cluster the job will be submitted to.
   As far as I know, `DelegationTokenIssuer` will automatically handle the 
existing-tokens case:  
   
https://github.com/apache/hadoop/blob/19ae0faacc5b1e8ce05f11c555922c0c025dcf3b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/DelegationTokenIssuer.java#L58
   
   The `init()` method is a good idea; I will change to that.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/security/HadoopFSDelegationTokenProviderTest.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;
+
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+import sun.security.krb5.KrbException;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Tests for {@link HadoopFSDelegationTokenProvider}. */
+public class HadoopFSDelegationTokenProviderTest {
+
+    public static final String HADOOP_SECURITY_AUTHENTICATION = 
"hadoop.security.authentication";
+    private final org.apache.flink.configuration.Configuration flinkConf =
+            new org.apache.flink.configuration.Configuration();
+
+    @Test
+    public void testShouldReturnFalseWhenSecurityIsNotEnabled() {
+        HadoopFSDelegationTokenProvider provider = new 
HadoopFSDelegationTokenProvider();
+
+        final Configuration hadoopConf = new Configuration();
+        
assumeTrue("simple".equals(hadoopConf.get(HADOOP_SECURITY_AUTHENTICATION)));
+        assertFalse(
+                "Hadoop FS delegation tokens are not required when 
authentication is simple",
+                provider.delegationTokensRequired(flinkConf, hadoopConf));
+    }
+
+    @Test
+    public void testShouldReturnTrueWhenSecurityIsEnabled() throws 
KrbException {
+        // fake the realm when kerberos is enabled
+        System.setProperty("java.security.krb5.kdc", "");
+        System.setProperty("java.security.krb5.realm", "DEFAULT.REALM");
+        System.setProperty("java.security.krb5.conf", "/dev/null");
+        sun.security.krb5.Config.refresh();
+
+        final Configuration hadoopConf = new Configuration();
+        // set new hadoop conf to UGI to re-initialize it
+        hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+        try {
+            UserGroupInformation.setConfiguration(hadoopConf);
+            HadoopFSDelegationTokenProvider provider = new 
HadoopFSDelegationTokenProvider();
+            assertTrue(
+                    "Hadoop FS delegation tokens are required when 
authentication is not simple",
+                    provider.delegationTokensRequired(flinkConf, hadoopConf));
+        } finally {
+            // restore the default UGI
+            UserGroupInformation.setConfiguration(new Configuration());

Review comment:
       Yes, it's better to clean up these properties.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to