caishunfeng commented on a change in pull request #7624:
URL: https://github.com/apache/dolphinscheduler/pull/7624#discussion_r775241949
##########
File path:
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
##########
@@ -17,57 +17,153 @@
package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
-import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
-import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
-import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
-import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import
org.apache.dolphinscheduler.plugin.datasource.api.exception.DataSourceException;
+import
org.apache.dolphinscheduler.plugin.datasource.api.utils.ClassLoaderUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.api.utils.ThreadContextClassLoader;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
+import org.apache.dolphinscheduler.spi.datasource.JdbcConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import java.sql.Connection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.io.FilenameFilter;
+import java.net.MalformedURLException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
public class DataSourceClientProvider {
private static final Logger logger =
LoggerFactory.getLogger(DataSourceClientProvider.class);
- private static final Map<String, DataSourceClient>
uniqueId2dataSourceClientMap = new ConcurrentHashMap<>();
+ private static final JdbcDriverManager jdbcDriverManagerInstance =
JdbcDriverManager.getInstance();
- private DataSourcePluginManager dataSourcePluginManager;
+ public static DataSourceClient createDataSourceClient(JdbcConnectionParam
connectionParam) {
+ logger.info("Creating the createDataSourceClient. JdbcUrl: {} ",
connectionParam.getJdbcUrl());
- private DataSourceClientProvider() {
- initDataSourcePlugin();
- }
+ //Check jdbc driver location
+ checkDriverLocation(connectionParam);
+
+ logger.info("Creating the ClassLoader for the jdbc driver and
plugin.");
+ ClassLoader driverClassLoader = getDriverClassLoader(connectionParam);
- private static class DataSourceClientProviderHolder {
- private static final DataSourceClientProvider INSTANCE = new
DataSourceClientProvider();
+ try (ThreadContextClassLoader ignored = new
ThreadContextClassLoader(driverClassLoader)) {
+ return createDataSourceClientWithClassLoader(connectionParam);
+ }
}
- public static DataSourceClientProvider getInstance() {
- return DataSourceClientProviderHolder.INSTANCE;
+ protected static void checkDriverLocation(JdbcConnectionParam
connectionParam) {
+ final String driverLocation = connectionParam.getDriverLocation();
+ if (StringUtils.isBlank(driverLocation)) {
+ logger.warn("No jdbc driver provide,will use randomly driver jar
for {}.", connectionParam.getDbType().getDescp());
+
connectionParam.setDriverLocation(jdbcDriverManagerInstance.getDefaultDriverPluginPath(connectionParam.getDbType().getDescp()));
+ }
}
- public Connection getConnection(DbType dbType, ConnectionParam
connectionParam) {
- BaseConnectionParam baseConnectionParam = (BaseConnectionParam)
connectionParam;
- String datasourceUniqueId =
DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
- logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId);
+ protected static ClassLoader getDriverClassLoader(JdbcConnectionParam
connectionParam) {
+ FilenameFilter filenameFilter = (dir, name) -> name != null &&
name.endsWith(".jar");
+ ClassLoader threadClassLoader =
Thread.currentThread().getContextClassLoader();
+ ClassLoader classLoader;
+
+ HashSet<String> paths = Sets.newHashSet();
+ if (StringUtils.isNotBlank(connectionParam.getDriverLocation())) {
+ logger.info("Driver location: {}",
connectionParam.getDriverLocation());
+ paths.add(connectionParam.getDriverLocation());
+ }
+ try {
+ classLoader = ClassLoaderUtils.getCustomClassLoader(paths,
threadClassLoader, filenameFilter);
+ } catch (final MalformedURLException e) {
+ throw DataSourceException.getInstance("Invalid jdbc driver
location.", e);
+ }
+
+ //try loading jdbc driver
+ loadJdbcDriver(classLoader, connectionParam);
+
+ DbType dbType = connectionParam.getDbType();
+ String pluginPath =
JdbcDriverManager.getInstance().getPluginPath(dbType);
+ logger.info("Plugin location: {}", pluginPath);
+ paths.add(pluginPath);
+
+ if (dbType == DbType.HIVE || dbType == DbType.SPARK) {
+ try {
+ Class.forName("org.apache.hadoop.conf.Configuration", true,
classLoader);
+
Class.forName("org.apache.hadoop.security.UserGroupInformation", true,
classLoader);
+ Class.forName("org.apache.hadoop.fs.FileSystem", true,
classLoader);
+ } catch (ClassNotFoundException cnf) {
+ cnf.printStackTrace();
Review comment:
Would it be better to log this with logger.error instead of calling printStackTrace()?
##########
File path:
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/ThreadContextClassLoader.java
##########
@@ -15,33 +15,21 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.utils;
+package org.apache.dolphinscheduler.plugin.datasource.api.utils;
-import org.junit.Assert;
-import org.junit.Test;
+import java.io.Closeable;
-/**
- * hive conf utils test
- */
-public class HiveConfUtilsTest {
-
- /**
- * test is hive conf var
- */
- @Test
- public void testIsHiveConfVar() {
-
- String conf = "hive.exec.script.wrapper=123";
- boolean hiveConfVar = HiveConfUtils.isHiveConfVar(conf);
- Assert.assertTrue(hiveConfVar);
+public class ThreadContextClassLoader
+ implements Closeable {
+ private final ClassLoader threadContextClassLoader;
- conf = "hive.test.v1=v1";
- hiveConfVar = HiveConfUtils.isHiveConfVar(conf);
- Assert.assertFalse(hiveConfVar);
-
- conf = "tez.queue.name=tezQueue";
- hiveConfVar = HiveConfUtils.isHiveConfVar(conf);
- Assert.assertTrue(hiveConfVar);
+ public ThreadContextClassLoader(ClassLoader newThreadContextClassLoader) {
Review comment:
It seems that the class loader is set at the thread level?
What happens if a datasource thread is created without the class loader
being set on it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]