This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b11431db95 Use single thread to refresh kerberos (#13456)
b11431db95 is described below

commit b11431db9565093a766e27dc12240a27e44f2f4c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Feb 1 20:41:05 2023 +0800

    Use single thread to refresh kerberos (#13456)
---
 .../common/thread/ThreadUtils.java                 |   9 ++
 .../datasource/hive/HiveDataSourceClient.java      |  87 ++------------
 .../hive/security/UserGroupInformationFactory.java | 129 +++++++++++++++++++++
 .../plugin/datasource/hive/utils/CommonUtil.java   |  67 -----------
 4 files changed, 149 insertions(+), 143 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 6eca0e783f..f73c40b960 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.thread;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 
 import lombok.experimental.UtilityClass;
@@ -45,6 +46,14 @@ public class ThreadUtils {
         return Executors.newFixedThreadPool(threadsNum, threadFactory);
     }
 
+    public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(String threadName) {
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                .setNameFormat(threadName)
+                .setDaemon(true)
+                .build();
+        return Executors.newSingleThreadScheduledExecutor(threadFactory);
+    }
+
     /**
      * Sleep in given mills, this is not accuracy.
      */
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
index f2539c7992..c7eef482da 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
@@ -21,44 +21,29 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_S
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH;
 
-import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
 import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
-import org.apache.dolphinscheduler.plugin.datasource.hive.utils.CommonUtil;
+import org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import sun.security.krb5.Config;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.core.JdbcTemplate;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 public class HiveDataSourceClient extends CommonDataSourceClient {
 
     private static final Logger logger = LoggerFactory.getLogger(HiveDataSourceClient.class);
 
-    private ScheduledExecutorService kerberosRenewalService;
-
-    private Configuration hadoopConf;
-    private UserGroupInformation ugi;
-    private boolean retryGetConnection = true;
-
     public HiveDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
         super(baseConnectionParam, dbType);
     }
@@ -66,18 +51,12 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
     @Override
     protected void preInit() {
         logger.info("PreInit in {}", getClass().getName());
-        this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor(
-                new ThreadFactoryBuilder().setNameFormat("Hive-Kerberos-Renewal-Thread-").setDaemon(true).build());
     }
 
     @Override
     protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
-        logger.info("Create Configuration for hive configuration.");
-        this.hadoopConf = createHadoopConf();
-        logger.info("Create Configuration success.");
-
         logger.info("Create UserGroupInformation.");
-        this.ugi = createUserGroupInformation(baseConnectionParam.getUser());
+        UserGroupInformationFactory.login(baseConnectionParam.getUser());
         logger.info("Create ugi success.");
 
         this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
@@ -108,61 +87,18 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
         }
     }
 
-    private UserGroupInformation createUserGroupInformation(String username) {
-        String krb5File = PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH);
-        String keytab = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH);
-        String principal = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME);
-
-        try {
-            UserGroupInformation ugi = CommonUtil.createUGI(getHadoopConf(), principal, keytab, krb5File, username);
-            try {
-                Field isKeytabField = ugi.getClass().getDeclaredField("isKeytab");
-                isKeytabField.setAccessible(true);
-                isKeytabField.set(ugi, true);
-            } catch (NoSuchFieldException | IllegalAccessException e) {
-                logger.warn(e.getMessage());
-            }
-
-            kerberosRenewalService.scheduleWithFixedDelay(() -> {
-                try {
-                    ugi.checkTGTAndReloginFromKeytab();
-                } catch (IOException e) {
-                    logger.error("Check TGT and Renewal from Keytab error", e);
-                }
-            }, 5, 5, TimeUnit.MINUTES);
-            return ugi;
-        } catch (IOException e) {
-            throw new RuntimeException("createUserGroupInformation fail. ", e);
-        }
-    }
-
-    protected Configuration createHadoopConf() {
-        Configuration hadoopConf = new Configuration();
-        hadoopConf.setBoolean("ipc.client.fallback-to-simple-auth-allowed", true);
-        return hadoopConf;
-    }
-
-    protected Configuration getHadoopConf() {
-        return this.hadoopConf;
-    }
-
     @Override
     public Connection getConnection() {
-        try {
-            return dataSource.getConnection();
-        } catch (SQLException e) {
-            boolean kerberosStartupState =
-                    PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
-            if (retryGetConnection && kerberosStartupState) {
-                retryGetConnection = false;
-                createUserGroupInformation(baseConnectionParam.getUser());
-                Connection connection = getConnection();
-                retryGetConnection = true;
-                return connection;
+        Connection connection = null;
+        while (connection == null) {
+            try {
+                connection = dataSource.getConnection();
+            } catch (SQLException e) {
+                UserGroupInformationFactory.logout(baseConnectionParam.getUser());
+                UserGroupInformationFactory.login(baseConnectionParam.getUser());
             }
-            logger.error("get oneSessionDataSource Connection fail SQLException: {}", e.getMessage(), e);
-            return null;
         }
+        return connection;
     }
 
     @Override
@@ -170,8 +106,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
         try {
             super.close();
         } finally {
-            kerberosRenewalService.shutdown();
-            this.ugi = null;
+            UserGroupInformationFactory.logout(baseConnectionParam.getUser());
         }
         logger.info("Closed Hive datasource client.");
 
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/security/UserGroupInformationFactory.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/security/UserGroupInformationFactory.java
new file mode 100644
index 0000000000..168ff3bdca
--- /dev/null
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/security/UserGroupInformationFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.hive.security;
+
+import static org.apache.dolphinscheduler.common.constants.Constants.JAVA_SECURITY_KRB5_CONF;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.enums.ResUploadType;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class UserGroupInformationFactory {
+
+    private static final Map<String, Integer> currentLoginTimesMap = new HashMap<>();
+
+    private static final Map<String, UserGroupInformation> userGroupInformationMap = new HashMap<>();
+
+    private static final ScheduledExecutorService kerberosRenewalService =
+            ThreadUtils.newSingleDaemonScheduledExecutorService("Hive-Kerberos-Renewal-Thread-");
+
+    static {
+        kerberosRenewalService.scheduleWithFixedDelay(() -> {
+            if (userGroupInformationMap.isEmpty()) {
+                return;
+            }
+            userGroupInformationMap.forEach((key, ugi) -> {
+                try {
+                    if (ugi.isFromKeytab()) {
+                        ugi.checkTGTAndReloginFromKeytab();
+                    }
+                    log.info("Relogin from keytab success, user: {}", key);
+                } catch (Exception e) {
+                    log.error("Relogin from keytab failed, user: {}", key, e);
+                }
+            });
+        }, 0, 5, TimeUnit.MINUTES);
+    }
+
+    public synchronized static UserGroupInformation login(String userName) {
+        UserGroupInformation userGroupInformation = userGroupInformationMap.get(userName);
+        if (userGroupInformation == null) {
+            if (!openKerberos()) {
+                userGroupInformation = createRemoteUser(userName);
+            } else {
+                userGroupInformation = createKerberosUser();
+            }
+            userGroupInformationMap.put(userName, userGroupInformation);
+        }
+        currentLoginTimesMap.compute(userName, (k, v) -> v == null ? 1 : v + 1);
+        return userGroupInformation;
+    }
+
+    public synchronized static void logout(String userName) {
+        Integer currentLoginTimes = currentLoginTimesMap.get(userName);
+        if (currentLoginTimes == null) {
+            return;
+        }
+        if (currentLoginTimes <= 1) {
+            currentLoginTimesMap.remove(userName);
+            userGroupInformationMap.remove(userName);
+        } else {
+            currentLoginTimesMap.put(userName, currentLoginTimes - 1);
+        }
+    }
+
+    private static UserGroupInformation createRemoteUser(String userName) {
+        return UserGroupInformation.createRemoteUser(userName);
+    }
+
+    private static UserGroupInformation createKerberosUser() {
+        String krb5File = PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH);
+        String keytab = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH);
+        String principal = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME);
+        if (StringUtils.isNotBlank(krb5File)) {
+            System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File);
+        }
+
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.setBoolean("ipc.client.fallback-to-simple-auth-allowed", true);
+        hadoopConf.set(Constants.HADOOP_SECURITY_AUTHENTICATION, Constants.KERBEROS);
+
+        try {
+            UserGroupInformation.setConfiguration(hadoopConf);
+            UserGroupInformation userGroupInformation =
+                    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), keytab.trim());
+            UserGroupInformation.setLoginUser(userGroupInformation);
+            return userGroupInformation;
+        } catch (IOException e) {
+            throw new RuntimeException("createUserGroupInformation fail. ", e);
+        }
+    }
+
+    public static boolean openKerberos() {
+        String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
+        ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
+        Boolean kerberosStartupState =
+                PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
+        return resUploadType == ResUploadType.HDFS && kerberosStartupState;
+    }
+
+}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/utils/CommonUtil.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/utils/CommonUtil.java
deleted file mode 100644
index be9ef1ea1f..0000000000
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/utils/CommonUtil.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.datasource.hive.utils;
-
-import static org.apache.dolphinscheduler.common.constants.Constants.JAVA_SECURITY_KRB5_CONF;
-
-import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.enums.ResUploadType;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import java.io.IOException;
-import java.util.Objects;
-
-import lombok.experimental.UtilityClass;
-
-@UtilityClass
-public class CommonUtil {
-
-    public static boolean getKerberosStartupState() {
-        String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
-        ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
-        Boolean kerberosStartupState =
-                PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
-        return resUploadType == ResUploadType.HDFS && kerberosStartupState;
-    }
-
-    public static synchronized UserGroupInformation createUGI(Configuration configuration, String principal,
-                                                              String keyTab, String krb5File,
-                                                              String username) throws IOException {
-        if (getKerberosStartupState()) {
-            Objects.requireNonNull(keyTab);
-            if (StringUtils.isNotBlank(krb5File)) {
-                System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File);
-            }
-            return loginKerberos(configuration, principal, keyTab);
-        }
-        return UserGroupInformation.createRemoteUser(username);
-    }
-
-    public static synchronized UserGroupInformation loginKerberos(final Configuration config, final String principal,
-                                                                  final String keyTab) throws IOException {
-        config.set(Constants.HADOOP_SECURITY_AUTHENTICATION, Constants.KERBEROS);
-        UserGroupInformation.setConfiguration(config);
-        UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim());
-        return UserGroupInformation.getCurrentUser();
-    }
-
-}

Reply via email to