This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b11431db95 Use single thread to refresh kerberos (#13456)
b11431db95 is described below
commit b11431db9565093a766e27dc12240a27e44f2f4c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Feb 1 20:41:05 2023 +0800
Use single thread to refresh kerberos (#13456)
---
.../common/thread/ThreadUtils.java | 9 ++
.../datasource/hive/HiveDataSourceClient.java | 87 ++------------
.../hive/security/UserGroupInformationFactory.java | 129 +++++++++++++++++++++
.../plugin/datasource/hive/utils/CommonUtil.java | 67 -----------
4 files changed, 149 insertions(+), 143 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 6eca0e783f..f73c40b960 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import lombok.experimental.UtilityClass;
@@ -45,6 +46,14 @@ public class ThreadUtils {
return Executors.newFixedThreadPool(threadsNum, threadFactory);
}
+ public static ScheduledExecutorService
newSingleDaemonScheduledExecutorService(String threadName) {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(threadName)
+ .setDaemon(true)
+ .build();
+ return Executors.newSingleThreadScheduledExecutor(threadFactory);
+ }
+
/**
* Sleep in given mills, this is not accuracy.
*/
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
index f2539c7992..c7eef482da 100644
---
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
@@ -21,44 +21,29 @@ import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_S
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH;
-import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import
org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
import
org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
-import org.apache.dolphinscheduler.plugin.datasource.hive.utils.CommonUtil;
+import
org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import sun.security.krb5.Config;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
public class HiveDataSourceClient extends CommonDataSourceClient {
private static final Logger logger =
LoggerFactory.getLogger(HiveDataSourceClient.class);
- private ScheduledExecutorService kerberosRenewalService;
-
- private Configuration hadoopConf;
- private UserGroupInformation ugi;
- private boolean retryGetConnection = true;
-
public HiveDataSourceClient(BaseConnectionParam baseConnectionParam,
DbType dbType) {
super(baseConnectionParam, dbType);
}
@@ -66,18 +51,12 @@ public class HiveDataSourceClient extends
CommonDataSourceClient {
@Override
protected void preInit() {
logger.info("PreInit in {}", getClass().getName());
- this.kerberosRenewalService =
Executors.newSingleThreadScheduledExecutor(
- new
ThreadFactoryBuilder().setNameFormat("Hive-Kerberos-Renewal-Thread-").setDaemon(true).build());
}
@Override
protected void initClient(BaseConnectionParam baseConnectionParam, DbType
dbType) {
- logger.info("Create Configuration for hive configuration.");
- this.hadoopConf = createHadoopConf();
- logger.info("Create Configuration success.");
-
logger.info("Create UserGroupInformation.");
- this.ugi = createUserGroupInformation(baseConnectionParam.getUser());
+ UserGroupInformationFactory.login(baseConnectionParam.getUser());
logger.info("Create ugi success.");
this.dataSource =
JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam,
dbType);
@@ -108,61 +87,18 @@ public class HiveDataSourceClient extends
CommonDataSourceClient {
}
}
- private UserGroupInformation createUserGroupInformation(String username) {
- String krb5File =
PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH);
- String keytab =
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH);
- String principal =
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME);
-
- try {
- UserGroupInformation ugi = CommonUtil.createUGI(getHadoopConf(),
principal, keytab, krb5File, username);
- try {
- Field isKeytabField =
ugi.getClass().getDeclaredField("isKeytab");
- isKeytabField.setAccessible(true);
- isKeytabField.set(ugi, true);
- } catch (NoSuchFieldException | IllegalAccessException e) {
- logger.warn(e.getMessage());
- }
-
- kerberosRenewalService.scheduleWithFixedDelay(() -> {
- try {
- ugi.checkTGTAndReloginFromKeytab();
- } catch (IOException e) {
- logger.error("Check TGT and Renewal from Keytab error", e);
- }
- }, 5, 5, TimeUnit.MINUTES);
- return ugi;
- } catch (IOException e) {
- throw new RuntimeException("createUserGroupInformation fail. ", e);
- }
- }
-
- protected Configuration createHadoopConf() {
- Configuration hadoopConf = new Configuration();
- hadoopConf.setBoolean("ipc.client.fallback-to-simple-auth-allowed",
true);
- return hadoopConf;
- }
-
- protected Configuration getHadoopConf() {
- return this.hadoopConf;
- }
-
@Override
public Connection getConnection() {
- try {
- return dataSource.getConnection();
- } catch (SQLException e) {
- boolean kerberosStartupState =
-
PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
- if (retryGetConnection && kerberosStartupState) {
- retryGetConnection = false;
- createUserGroupInformation(baseConnectionParam.getUser());
- Connection connection = getConnection();
- retryGetConnection = true;
- return connection;
+ Connection connection = null;
+ while (connection == null) {
+ try {
+ connection = dataSource.getConnection();
+ } catch (SQLException e) {
+
UserGroupInformationFactory.logout(baseConnectionParam.getUser());
+
UserGroupInformationFactory.login(baseConnectionParam.getUser());
}
- logger.error("get oneSessionDataSource Connection fail
SQLException: {}", e.getMessage(), e);
- return null;
}
+ return connection;
}
@Override
@@ -170,8 +106,7 @@ public class HiveDataSourceClient extends
CommonDataSourceClient {
try {
super.close();
} finally {
- kerberosRenewalService.shutdown();
- this.ugi = null;
+ UserGroupInformationFactory.logout(baseConnectionParam.getUser());
}
logger.info("Closed Hive datasource client.");
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/security/UserGroupInformationFactory.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/security/UserGroupInformationFactory.java
new file mode 100644
index 0000000000..168ff3bdca
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/security/UserGroupInformationFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.hive.security;
+
+import static
org.apache.dolphinscheduler.common.constants.Constants.JAVA_SECURITY_KRB5_CONF;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.enums.ResUploadType;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class UserGroupInformationFactory {
+
+ private static final Map<String, Integer> currentLoginTimesMap = new
HashMap<>();
+
+ private static final Map<String, UserGroupInformation>
userGroupInformationMap = new HashMap<>();
+
+ private static final ScheduledExecutorService kerberosRenewalService =
+
ThreadUtils.newSingleDaemonScheduledExecutorService("Hive-Kerberos-Renewal-Thread-");
+
+ static {
+ kerberosRenewalService.scheduleWithFixedDelay(() -> {
+ if (userGroupInformationMap.isEmpty()) {
+ return;
+ }
+ userGroupInformationMap.forEach((key, ugi) -> {
+ try {
+ if (ugi.isFromKeytab()) {
+ ugi.checkTGTAndReloginFromKeytab();
+ }
+ log.info("Relogin from keytab success, user: {}", key);
+ } catch (Exception e) {
+ log.error("Relogin from keytab failed, user: {}", key, e);
+ }
+ });
+ }, 0, 5, TimeUnit.MINUTES);
+ }
+
+ public synchronized static UserGroupInformation login(String userName) {
+ UserGroupInformation userGroupInformation =
userGroupInformationMap.get(userName);
+ if (userGroupInformation == null) {
+ if (!openKerberos()) {
+ userGroupInformation = createRemoteUser(userName);
+ } else {
+ userGroupInformation = createKerberosUser();
+ }
+ userGroupInformationMap.put(userName, userGroupInformation);
+ }
+ currentLoginTimesMap.compute(userName, (k, v) -> v == null ? 1 : v +
1);
+ return userGroupInformation;
+ }
+
+ public synchronized static void logout(String userName) {
+ Integer currentLoginTimes = currentLoginTimesMap.get(userName);
+ if (currentLoginTimes == null) {
+ return;
+ }
+ if (currentLoginTimes <= 1) {
+ currentLoginTimesMap.remove(userName);
+ userGroupInformationMap.remove(userName);
+ } else {
+ currentLoginTimesMap.put(userName, currentLoginTimes - 1);
+ }
+ }
+
+ private static UserGroupInformation createRemoteUser(String userName) {
+ return UserGroupInformation.createRemoteUser(userName);
+ }
+
+ private static UserGroupInformation createKerberosUser() {
+ String krb5File =
PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH);
+ String keytab =
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH);
+ String principal =
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME);
+ if (StringUtils.isNotBlank(krb5File)) {
+ System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File);
+ }
+
+ Configuration hadoopConf = new Configuration();
+ hadoopConf.setBoolean("ipc.client.fallback-to-simple-auth-allowed",
true);
+ hadoopConf.set(Constants.HADOOP_SECURITY_AUTHENTICATION,
Constants.KERBEROS);
+
+ try {
+ UserGroupInformation.setConfiguration(hadoopConf);
+ UserGroupInformation userGroupInformation =
+
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(),
keytab.trim());
+ UserGroupInformation.setLoginUser(userGroupInformation);
+ return userGroupInformation;
+ } catch (IOException e) {
+ throw new RuntimeException("createUserGroupInformation fail. ", e);
+ }
+ }
+
+ public static boolean openKerberos() {
+ String resUploadStartupType =
PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
+ ResUploadType resUploadType =
ResUploadType.valueOf(resUploadStartupType);
+ Boolean kerberosStartupState =
+
PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,
false);
+ return resUploadType == ResUploadType.HDFS && kerberosStartupState;
+ }
+
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/utils/CommonUtil.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/utils/CommonUtil.java
deleted file mode 100644
index be9ef1ea1f..0000000000
---
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/utils/CommonUtil.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.datasource.hive.utils;
-
-import static
org.apache.dolphinscheduler.common.constants.Constants.JAVA_SECURITY_KRB5_CONF;
-
-import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.enums.ResUploadType;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import java.io.IOException;
-import java.util.Objects;
-
-import lombok.experimental.UtilityClass;
-
-@UtilityClass
-public class CommonUtil {
-
- public static boolean getKerberosStartupState() {
- String resUploadStartupType =
PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
- ResUploadType resUploadType =
ResUploadType.valueOf(resUploadStartupType);
- Boolean kerberosStartupState =
-
PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,
false);
- return resUploadType == ResUploadType.HDFS && kerberosStartupState;
- }
-
- public static synchronized UserGroupInformation createUGI(Configuration
configuration, String principal,
- String keyTab,
String krb5File,
- String username)
throws IOException {
- if (getKerberosStartupState()) {
- Objects.requireNonNull(keyTab);
- if (StringUtils.isNotBlank(krb5File)) {
- System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File);
- }
- return loginKerberos(configuration, principal, keyTab);
- }
- return UserGroupInformation.createRemoteUser(username);
- }
-
- public static synchronized UserGroupInformation loginKerberos(final
Configuration config, final String principal,
- final String
keyTab) throws IOException {
- config.set(Constants.HADOOP_SECURITY_AUTHENTICATION,
Constants.KERBEROS);
- UserGroupInformation.setConfiguration(config);
- UserGroupInformation.loginUserFromKeytab(principal.trim(),
keyTab.trim());
- return UserGroupInformation.getCurrentUser();
- }
-
-}