This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6ed1a28cb196042c5ba229821cd63cdf71633c22
Author: Kezhu Wang <[email protected]>
AuthorDate: Thu Dec 10 20:40:40 2020 +0800

    [FLINK-19435][connectors/jdbc] Add hang test case to reveal deadlock when 
loading different sql driver classes concurrently using Class.forName
    
    This closes #14361
---
 flink-connectors/flink-connector-jdbc/pom.xml      |  13 +
 .../flink/connector/jdbc/fakedb/FakeDBUtils.java   |  39 +++
 .../jdbc/fakedb/driver/FakeConnection.java         | 318 +++++++++++++++++++++
 .../jdbc/fakedb/driver/FakeConnection1.java        |  27 ++
 .../jdbc/fakedb/driver/FakeConnection2.java        |  27 ++
 .../connector/jdbc/fakedb/driver/FakeDriver1.java  |  82 ++++++
 .../connector/jdbc/fakedb/driver/FakeDriver2.java  |  82 ++++++
 ...onProviderDriverClassConcurrentLoadingTest.java | 102 +++++++
 .../resources/META-INF/services/java.sql.Driver    |  17 ++
 9 files changed, 707 insertions(+)

diff --git a/flink-connectors/flink-connector-jdbc/pom.xml 
b/flink-connectors/flink-connector-jdbc/pom.xml
index ce4bdeb..cea4c98 100644
--- a/flink-connectors/flink-connector-jdbc/pom.xml
+++ b/flink-connectors/flink-connector-jdbc/pom.xml
@@ -150,4 +150,17 @@ under the License.
                </dependency>
 
        </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <!-- Disable jvm process reuse to test 
driver class loading issues -->
+                                       <reuseForks>false</reuseForks>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
 </project>
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/FakeDBUtils.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/FakeDBUtils.java
new file mode 100644
index 0000000..25c0eae
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/FakeDBUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.fakedb;
+
+/**
+ * Utilities and constants for FakeDB.
+ */
+public class FakeDBUtils {
+       public static final String URL_PREFIX = "jdbc:fake:";
+
+       public static final String TEST_DB_URL = composeDBUrl("test");
+
+       public static final String DRIVER1_CLASS_NAME = 
"org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver1";
+       public static final String DRIVER2_CLASS_NAME = 
"org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver2";
+
+       public static String composeDBUrl(String db) {
+               return URL_PREFIX + db;
+       }
+
+       public static boolean acceptsUrl(String url) {
+               return url.startsWith(URL_PREFIX);
+       }
+}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection.java
new file mode 100644
index 0000000..35a1753
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.fakedb.driver;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * A fake sql connection implementation which throws {@link SQLException} in 
most of its methods.
+ */
+public abstract class FakeConnection implements Connection {
+       private boolean closed = false;
+
+       @Override
+       public Statement createStatement() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public PreparedStatement prepareStatement(String sql) throws 
SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CallableStatement prepareCall(String sql) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public String nativeSQL(String sql) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void setAutoCommit(boolean autoCommit) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean getAutoCommit() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void commit() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void rollback() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void close() throws SQLException {
+               closed = true;
+       }
+
+       @Override
+       public boolean isClosed() throws SQLException {
+               return closed;
+       }
+
+       @Override
+       public DatabaseMetaData getMetaData() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void setReadOnly(boolean readOnly) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean isReadOnly() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void setCatalog(String catalog) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public String getCatalog() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void setTransactionIsolation(int level) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int getTransactionIsolation() throws SQLException {
+               return TRANSACTION_NONE;
+       }
+
+       @Override
+       public SQLWarning getWarnings() throws SQLException {
+               return null;
+       }
+
+       @Override
+       public void clearWarnings() throws SQLException {
+       }
+
+       @Override
+       public Statement createStatement(int resultSetType, int 
resultSetConcurrency) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public PreparedStatement prepareStatement(String sql, int 
resultSetType, int resultSetConcurrency) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CallableStatement prepareCall(String sql, int resultSetType, int 
resultSetConcurrency) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Map<String, Class<?>> getTypeMap() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void setHoldability(int holdability) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int getHoldability() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Savepoint setSavepoint() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Savepoint setSavepoint(String name) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void rollback(Savepoint savepoint) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Statement createStatement(int resultSetType, int 
resultSetConcurrency, int resultSetHoldability) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public PreparedStatement prepareStatement(String sql, int 
resultSetType, int resultSetConcurrency, int resultSetHoldability) throws 
SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CallableStatement prepareCall(String sql, int resultSetType, int 
resultSetConcurrency, int resultSetHoldability) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public PreparedStatement prepareStatement(String sql, int 
autoGeneratedKeys) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public PreparedStatement prepareStatement(String sql, int[] 
columnIndexes) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public PreparedStatement prepareStatement(String sql, String[] 
columnNames) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Clob createClob() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Blob createBlob() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public NClob createNClob() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public SQLXML createSQLXML() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean isValid(int timeout) throws SQLException {
+               return !isClosed();
+       }
+
+       @Override
+       public void setClientInfo(String name, String value) throws 
SQLClientInfoException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void setClientInfo(Properties properties) throws 
SQLClientInfoException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public String getClientInfo(String name) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Properties getClientInfo() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Array createArrayOf(String typeName, Object[] elements) throws 
SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Struct createStruct(String typeName, Object[] attributes) throws 
SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void setSchema(String schema) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public String getSchema() throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void abort(Executor executor) throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void setNetworkTimeout(Executor executor, int milliseconds) 
throws SQLException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int getNetworkTimeout() throws SQLException {
+               return 0;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public <T> T unwrap(Class<T> iface) throws SQLException {
+               if (iface.isInstance(this)) {
+                       return (T) this;
+               }
+               throw new SQLException(getClass() + " does not implement " + 
iface);
+       }
+
+       @Override
+       public boolean isWrapperFor(Class<?> iface) throws SQLException {
+               return iface.isInstance(this);
+       }
+}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection1.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection1.java
new file mode 100644
index 0000000..56983d3
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection1.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.fakedb.driver;
+
+import java.util.Properties;
+
+/**
+ * Sql connection created by {@link FakeDriver1#connect(String, Properties)}.
+ */
+public class FakeConnection1 extends FakeConnection {
+}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection2.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection2.java
new file mode 100644
index 0000000..3f95a51
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection2.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.fakedb.driver;
+
+import java.util.Properties;
+
+/**
+ * Sql connection created by {@link FakeDriver2#connect(String, Properties)}.
+ */
+public class FakeConnection2 extends FakeConnection {
+}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeDriver1.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeDriver1.java
new file mode 100644
index 0000000..01c1bfb
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeDriver1.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.fakedb.driver;
+
+import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+/**
+ * A {@link Driver} for FakeDB.
+ */
+public class FakeDriver1 implements Driver {
+
+       static {
+               try {
+                       DriverManager.registerDriver(new FakeDriver1());
+               } catch (SQLException ex) {
+                       throw new ExceptionInInitializerError(ex);
+               }
+       }
+
+       @Override
+       public Connection connect(String url, Properties info) throws 
SQLException {
+               if (!acceptsURL(url)) {
+                       return null;
+               }
+               return new FakeConnection1();
+       }
+
+       @Override
+       public boolean acceptsURL(String url) throws SQLException {
+               return FakeDBUtils.acceptsUrl(url);
+       }
+
+       @Override
+       public DriverPropertyInfo[] getPropertyInfo(String url, Properties 
info) throws SQLException {
+               return new DriverPropertyInfo[0];
+       }
+
+       @Override
+       public int getMajorVersion() {
+               return 0;
+       }
+
+       @Override
+       public int getMinorVersion() {
+               return 0;
+       }
+
+       @Override
+       public boolean jdbcCompliant() {
+               return false;
+       }
+
+       @Override
+       public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+               throw new SQLFeatureNotSupportedException();
+       }
+}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeDriver2.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeDriver2.java
new file mode 100644
index 0000000..4d74957
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeDriver2.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.fakedb.driver;
+
+import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+/**
+ * Another {@link Driver} for FakeDB.
+ */
+public class FakeDriver2 implements Driver {
+
+       static {
+               try {
+                       DriverManager.registerDriver(new FakeDriver2());
+               } catch (SQLException ex) {
+                       throw new ExceptionInInitializerError(ex);
+               }
+       }
+
+       @Override
+       public Connection connect(String url, Properties info) throws 
SQLException {
+               if (!acceptsURL(url)) {
+                       return null;
+               }
+               return new FakeConnection2();
+       }
+
+       @Override
+       public boolean acceptsURL(String url) throws SQLException {
+               return FakeDBUtils.acceptsUrl(url);
+       }
+
+       @Override
+       public DriverPropertyInfo[] getPropertyInfo(String url, Properties 
info) throws SQLException {
+               return new DriverPropertyInfo[0];
+       }
+
+       @Override
+       public int getMajorVersion() {
+               return 0;
+       }
+
+       @Override
+       public int getMinorVersion() {
+               return 0;
+       }
+
+       @Override
+       public boolean jdbcCompliant() {
+               return false;
+       }
+
+       @Override
+       public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+               throw new SQLFeatureNotSupportedException();
+       }
+}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingTest.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingTest.java
new file mode 100644
index 0000000..5321202
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.internal.connection;
+
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils;
+import org.apache.flink.core.testutils.CheckedThread;
+
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test deals with sql driver class loading issues, write it alone so it 
won't be
+ * interfered by other tests.
+ */
+public class SimpleJdbcConnectionProviderDriverClassConcurrentLoadingTest {
+       private static boolean isClassLoaded(ClassLoader classLoader, String 
className) throws Exception {
+               do {
+                       Method m = 
ClassLoader.class.getDeclaredMethod("findLoadedClass", String.class);
+                       m.setAccessible(true);
+                       Object loadedClass = m.invoke(classLoader, className);
+                       if (loadedClass != null) {
+                               return true;
+                       }
+                       classLoader = classLoader.getParent();
+               } while (classLoader != null);
+               return false;
+       }
+
+       @Test(timeout = 5000)
+       public void testDriverClassConcurrentLoading() throws Exception {
+               ClassLoader classLoader = getClass().getClassLoader();
+
+               assertFalse(isClassLoaded(classLoader, 
FakeDBUtils.DRIVER1_CLASS_NAME));
+               assertFalse(isClassLoaded(classLoader, 
FakeDBUtils.DRIVER2_CLASS_NAME));
+
+               JdbcConnectionOptions connectionOptions1 = new 
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                       .withUrl(FakeDBUtils.TEST_DB_URL)
+                       .withDriverName(FakeDBUtils.DRIVER1_CLASS_NAME)
+                       .build();
+
+               JdbcConnectionOptions connectionOptions2 = new 
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                       .withUrl(FakeDBUtils.TEST_DB_URL)
+                       .withDriverName(FakeDBUtils.DRIVER2_CLASS_NAME)
+                       .build();
+
+               CountDownLatch startLatch = new CountDownLatch(1);
+
+               Function<JdbcConnectionOptions, CheckedThread> 
connectionThreadCreator = options -> {
+                       CheckedThread thread = new CheckedThread() {
+                               @Override
+                               public void go() throws Exception {
+                                       startLatch.await();
+                                       JdbcConnectionProvider 
connectionProvider = new SimpleJdbcConnectionProvider(options);
+                                       Connection connection = 
connectionProvider.getConnection();
+                                       connection.close();
+                               }
+                       };
+                       thread.setName("Loading " + options.getDriverName());
+                       thread.setDaemon(true);
+                       return thread;
+               };
+
+               CheckedThread connectionThread1 = 
connectionThreadCreator.apply(connectionOptions1);
+               CheckedThread connectionThread2 = 
connectionThreadCreator.apply(connectionOptions2);
+
+               connectionThread1.start();
+               connectionThread2.start();
+
+               Thread.sleep(2);
+               startLatch.countDown();
+
+               connectionThread1.sync();
+               connectionThread2.sync();
+
+               assertTrue(isClassLoaded(classLoader, 
FakeDBUtils.DRIVER1_CLASS_NAME));
+               assertTrue(isClassLoaded(classLoader, 
FakeDBUtils.DRIVER2_CLASS_NAME));
+       }
+}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/resources/META-INF/services/java.sql.Driver
 
b/flink-connectors/flink-connector-jdbc/src/test/resources/META-INF/services/java.sql.Driver
new file mode 100644
index 0000000..46ec4f1
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/resources/META-INF/services/java.sql.Driver
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver1
+org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver2

Reply via email to