This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 77cf7e2 DRILL-7317: Close ClassLoaders used for udf jars uploading
when closing FunctionImplementationRegistry
77cf7e2 is described below
commit 77cf7e2ee61fb40e7efd85148ac76947d13dda38
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Fri Jul 5 16:13:54 2019 +0300
DRILL-7317: Close ClassLoaders used for udf jars uploading when closing
FunctionImplementationRegistry
- Fix issue with caching DrillMergeProjectRule and
FunctionImplementationRegistry when different drillbits are started within the
same JVM
---
.../expr/fn/FunctionImplementationRegistry.java | 3 +-
.../expr/fn/registry/FunctionRegistryHolder.java | 75 ++++++++++++++++++----
.../expr/fn/registry/LocalFunctionRegistry.java | 7 +-
.../planner/logical/DrillMergeProjectRule.java | 6 +-
4 files changed, 70 insertions(+), 21 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index 30c194a..4210067 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -455,7 +455,7 @@ public class FunctionImplementationRegistry implements
FunctionLookupContext, Au
return RunTimeScan.dynamicPackageScan(drillConfig,
Sets.newHashSet(urls));
} finally {
if (markerFileConnection instanceof JarURLConnection) {
- ((JarURLConnection)
markerFile.openConnection()).getJarFile().close();
+ ((JarURLConnection) markerFileConnection).getJarFile().close();
}
}
}
@@ -592,6 +592,7 @@ public class FunctionImplementationRegistry implements
FunctionLookupContext, Au
*/
@Override
public void close() {
+ localFunctionRegistry.close();
if (deleteTmpDir) {
FileUtils.deleteQuietly(tmpDir);
} else {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
index d0383e9..f378d45 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
@@ -29,6 +29,8 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -83,7 +85,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* <li><b>function holder for upper(VARCHAR-REQUIRED)</b> is {@link
DrillFuncHolder} initiated for each function.</li>
*
*/
-public class FunctionRegistryHolder {
+public class FunctionRegistryHolder implements AutoCloseable {
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(FunctionRegistryHolder.class);
@@ -376,23 +378,13 @@ public class FunctionRegistryHolder {
return;
}
+ boolean isClosed = false;
for (Map.Entry<String, Queue<String>> functionEntry : jar.entrySet()) {
final String function = functionEntry.getKey();
Map<String, DrillFuncHolder> functionHolders = functions.get(function);
Queue<String> functionSignatures = functionEntry.getValue();
- for (Map.Entry<String, DrillFuncHolder> entry :
functionHolders.entrySet()) {
- if (functionSignatures.contains(entry.getKey())) {
- ClassLoader classLoader = entry.getValue().getClassLoader();
- if (classLoader instanceof AutoCloseable) {
- try {
- ((AutoCloseable) classLoader).close();
- } catch (Exception e) {
- logger.warn("Problem during closing class loader", e);
- }
- }
- break;
- }
- }
+      // closes class loader only one time: once a close attempt has been made, isClosed stays
+      // true and the short-circuit skips closeClassLoader for the remaining function entries
+      isClosed = isClosed || closeClassLoader(function, functionSignatures);
functionHolders.keySet().removeAll(functionSignatures);
if (functionHolders.isEmpty()) {
@@ -400,4 +392,59 @@ public class FunctionRegistryHolder {
}
}
}
+
+ @Override
+ public void close() {
+ try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
+ jars.forEach((jarName, jar) -> {
+ if (!LocalFunctionRegistry.BUILT_IN.equals(jarName)) {
+ for (Map.Entry<String, Queue<String>> functionEntry :
jar.entrySet()) {
+ if (closeClassLoader(functionEntry.getKey(),
functionEntry.getValue())) {
+ // class loader is closed, iterates to another jar
+ break;
+ }
+ }
+ }
+ });
+ }
+ }
+
+  /**
+   * Searches for a {@link DrillFuncHolder} registered for {@code functionName} under one of
+   * the given {@code functionSignatures} and, when one is found, closes its class loader.
+   * <p>
+   * The class loader is closed only if it implements {@link AutoCloseable}; any other class
+   * loader is left untouched. A failure while closing is logged and not propagated.
+   *
+   * @param functionName name of the function
+   * @param functionSignatures function signatures
+   * @return {@code true} if a {@link DrillFuncHolder} was found, regardless of whether its
+   *         class loader could be closed; {@code false} if no holder was found
+   */
+  private boolean closeClassLoader(String functionName, Queue<String> functionSignatures) {
+    return getDrillFuncHolder(functionName, functionSignatures)
+        .map(drillFuncHolder -> {
+          ClassLoader classLoader = drillFuncHolder.getClassLoader();
+          // check the type instead of relying on catch (Exception) to swallow a
+          // ClassCastException / NullPointerException from an unconditional cast
+          if (classLoader instanceof AutoCloseable) {
+            try {
+              ((AutoCloseable) classLoader).close();
+            } catch (Exception e) {
+              logger.warn("Problem during closing class loader", e);
+            }
+          }
+          return true;
+        })
+        .orElse(false);
+  }
+
+  /**
+   * Searches for a {@link DrillFuncHolder} registered for {@code functionName} under one of
+   * the given {@code functionSignatures} and returns the first one found, following the
+   * iteration order of {@code functionSignatures}.
+   *
+   * @param functionName name of the function
+   * @param functionSignatures function signatures
+   * @return {@link Optional} with the first found {@link DrillFuncHolder} instance, or an empty
+   *         {@link Optional} if the function is not registered or none of the signatures matches
+   */
+  private Optional<DrillFuncHolder> getDrillFuncHolder(String functionName, Queue<String> functionSignatures) {
+    Map<String, DrillFuncHolder> functionHolders = functions.get(functionName);
+    if (functionHolders == null) {
+      // nothing is registered under this name; report "not found" instead of failing with NPE
+      return Optional.empty();
+    }
+    return functionSignatures.stream()
+        .map(functionHolders::get)
+        .filter(Objects::nonNull)
+        .findFirst();
+  }
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
index f96b1fb..cba900f 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
@@ -53,7 +53,7 @@ import
org.apache.drill.exec.planner.sql.DrillSqlOperatorWithoutInference;
/**
* Registry of Drill functions.
*/
-public class LocalFunctionRegistry {
+public class LocalFunctionRegistry implements AutoCloseable {
public static final String BUILT_IN = "built-in";
@@ -356,4 +356,9 @@ public class LocalFunctionRegistry {
}
}
}
+
+  /**
+   * Closes this registry by closing the underlying {@code registryHolder}.
+   */
+  @Override
+  public void close() {
+    registryHolder.close();
+  }
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java
index f63759a..9801011 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java
@@ -47,15 +47,11 @@ import java.util.List;
public class DrillMergeProjectRule extends RelOptRule {
private FunctionImplementationRegistry functionRegistry;
- private static DrillMergeProjectRule INSTANCE = null;
private final boolean force;
public static DrillMergeProjectRule getInstance(boolean force,
ProjectFactory pFactory,
FunctionImplementationRegistry functionRegistry) {
- if (INSTANCE == null) {
- INSTANCE = new DrillMergeProjectRule(force, pFactory, functionRegistry);
- }
- return INSTANCE;
+ return new DrillMergeProjectRule(force, pFactory, functionRegistry);
}
private DrillMergeProjectRule(boolean force, ProjectFactory pFactory,