caishunfeng commented on a change in pull request #7624:
URL: https://github.com/apache/dolphinscheduler/pull/7624#discussion_r775241949



##########
File path: 
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
##########
@@ -17,57 +17,153 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
 
-import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
-import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
-import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
-import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import 
org.apache.dolphinscheduler.plugin.datasource.api.exception.DataSourceException;
+import 
org.apache.dolphinscheduler.plugin.datasource.api.utils.ClassLoaderUtils;
+import 
org.apache.dolphinscheduler.plugin.datasource.api.utils.ThreadContextClassLoader;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
 import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
+import org.apache.dolphinscheduler.spi.datasource.JdbcConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import java.sql.Connection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.io.FilenameFilter;
+import java.net.MalformedURLException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
 public class DataSourceClientProvider {
     private static final Logger logger = 
LoggerFactory.getLogger(DataSourceClientProvider.class);
 
-    private static final Map<String, DataSourceClient> 
uniqueId2dataSourceClientMap = new ConcurrentHashMap<>();
+    private static final JdbcDriverManager jdbcDriverManagerInstance = 
JdbcDriverManager.getInstance();
 
-    private DataSourcePluginManager dataSourcePluginManager;
+    public static DataSourceClient createDataSourceClient(JdbcConnectionParam 
connectionParam) {
+        logger.info("Creating the createDataSourceClient. JdbcUrl: {} ", 
connectionParam.getJdbcUrl());
 
-    private DataSourceClientProvider() {
-        initDataSourcePlugin();
-    }
+        //Check jdbc driver location
+        checkDriverLocation(connectionParam);
+
+        logger.info("Creating the ClassLoader for the jdbc driver and 
plugin.");
+        ClassLoader driverClassLoader = getDriverClassLoader(connectionParam);
 
-    private static class DataSourceClientProviderHolder {
-        private static final DataSourceClientProvider INSTANCE = new 
DataSourceClientProvider();
+        try (ThreadContextClassLoader ignored = new 
ThreadContextClassLoader(driverClassLoader)) {
+            return createDataSourceClientWithClassLoader(connectionParam);
+        }
     }
 
-    public static DataSourceClientProvider getInstance() {
-        return DataSourceClientProviderHolder.INSTANCE;
+    protected static void checkDriverLocation(JdbcConnectionParam 
connectionParam) {
+        final String driverLocation = connectionParam.getDriverLocation();
+        if (StringUtils.isBlank(driverLocation)) {
+            logger.warn("No jdbc driver provide,will use randomly driver jar 
for {}.", connectionParam.getDbType().getDescp());
+            
connectionParam.setDriverLocation(jdbcDriverManagerInstance.getDefaultDriverPluginPath(connectionParam.getDbType().getDescp()));
+        }
     }
 
-    public Connection getConnection(DbType dbType, ConnectionParam 
connectionParam) {
-        BaseConnectionParam baseConnectionParam = (BaseConnectionParam) 
connectionParam;
-        String datasourceUniqueId = 
DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
-        logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId);
+    protected static ClassLoader getDriverClassLoader(JdbcConnectionParam 
connectionParam) {
+        FilenameFilter filenameFilter = (dir, name) -> name != null && 
name.endsWith(".jar");
+        ClassLoader threadClassLoader = 
Thread.currentThread().getContextClassLoader();
+        ClassLoader classLoader;
+
+        HashSet<String> paths = Sets.newHashSet();
+        if (StringUtils.isNotBlank(connectionParam.getDriverLocation())) {
+            logger.info("Driver location: {}", 
connectionParam.getDriverLocation());
+            paths.add(connectionParam.getDriverLocation());
+        }
+        try {
+            classLoader = ClassLoaderUtils.getCustomClassLoader(paths, 
threadClassLoader, filenameFilter);
+        } catch (final MalformedURLException e) {
+            throw DataSourceException.getInstance("Invalid jdbc driver 
location.", e);
+        }
+
+        //try loading jdbc driver
+        loadJdbcDriver(classLoader, connectionParam);
+
+        DbType dbType = connectionParam.getDbType();
+        String pluginPath = 
JdbcDriverManager.getInstance().getPluginPath(dbType);
+        logger.info("Plugin location: {}", pluginPath);
+        paths.add(pluginPath);
+
+        if (dbType == DbType.HIVE || dbType == DbType.SPARK) {
+            try {
+                Class.forName("org.apache.hadoop.conf.Configuration", true, 
classLoader);
+                
Class.forName("org.apache.hadoop.security.UserGroupInformation", true, 
classLoader);
+                Class.forName("org.apache.hadoop.fs.FileSystem", true, 
classLoader);
+            } catch (ClassNotFoundException cnf) {
+                cnf.printStackTrace();

Review comment:
       Would it be better to log this exception with logger.error(...) instead of calling printStackTrace()?

##########
File path: 
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/ThreadContextClassLoader.java
##########
@@ -15,33 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.utils;
+package org.apache.dolphinscheduler.plugin.datasource.api.utils;
 
-import org.junit.Assert;
-import org.junit.Test;
+import java.io.Closeable;
 
-/**
- * hive conf utils test
- */
-public class HiveConfUtilsTest {
-
-    /**
-     * test is hive conf var
-     */
-    @Test
-    public void testIsHiveConfVar() {
-
-        String conf = "hive.exec.script.wrapper=123";
-        boolean hiveConfVar = HiveConfUtils.isHiveConfVar(conf);
-        Assert.assertTrue(hiveConfVar);
+public class ThreadContextClassLoader
+        implements Closeable {
+    private final ClassLoader threadContextClassLoader;
 
-        conf = "hive.test.v1=v1";
-        hiveConfVar = HiveConfUtils.isHiveConfVar(conf);
-        Assert.assertFalse(hiveConfVar);
-
-        conf = "tez.queue.name=tezQueue";
-        hiveConfVar = HiveConfUtils.isHiveConfVar(conf);
-        Assert.assertTrue(hiveConfVar);
+    public ThreadContextClassLoader(ClassLoader newThreadContextClassLoader) {

Review comment:
       It seems that the class loader is set at the thread level. 
   What happens if a datasource thread is created without the class loader 
being set on it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to