[
https://issues.apache.org/jira/browse/DRILL-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641787#comment-16641787
]
ASF GitHub Bot commented on DRILL-6762:
---------------------------------------
asfgit closed pull request #1484: DRILL-6762: Fix dynamic UDFs versioning issue
URL: https://github.com/apache/drill/pull/1484
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java b/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
index 2a331bb9107..d2dc0f8a750 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
@@ -181,4 +181,17 @@ static UserException findWrappedUserException(Throwable ex) {
return (UserException) cause;
}
+ /**
+ * Helps to hide checked exception from the compiler but then actually throw it.
+ * Is useful when implementing functional interfaces that allow checked exceptions.
+ *
+ * @param e original exception instance
+ * @param <E> exception type
+ * @throws E exception instance
+ */
+ @SuppressWarnings("unchecked")
+ public static <E extends Throwable> void sneakyThrow(Throwable e) throws E {
+ throw (E) e;
+ }
+
}
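A minimal sketch of how the `sneakyThrow` helper added above behaves at a call site. The method body mirrors the diff; the class name `SneakyThrowSketch` and the helper `describeFailure` are hypothetical and exist only for illustration. Because `E` appears only in the `throws` clause, the compiler infers it as `RuntimeException`, so a lambda for `Supplier` can let a checked `IOException` escape without declaring it.

```java
import java.io.IOException;
import java.util.function.Supplier;

class SneakyThrowSketch {

  // Same shape as the method in the diff: the unchecked cast is erased at
  // runtime, so the original exception instance is rethrown unchanged.
  @SuppressWarnings("unchecked")
  static <E extends Throwable> void sneakyThrow(Throwable e) throws E {
    throw (E) e;
  }

  // Hypothetical helper: Supplier.get() declares no checked exceptions,
  // yet an IOException leaves the lambda.
  static String describeFailure() {
    Supplier<String> supplier = () -> {
      sneakyThrow(new IOException("disk unavailable"));
      return "unreachable"; // the compiler still requires a return here
    };
    try {
      return supplier.get();
    } catch (Exception e) {
      return e.getClass().getSimpleName() + ": " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(describeFailure()); // prints "IOException: disk unavailable"
  }
}
```

The caller catches `Exception` rather than `IOException`: the compiler rejects a `catch (IOException e)` clause when nothing in the `try` body is declared to throw it.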
diff --git
a/common/src/main/java/org/apache/drill/common/util/function/CheckedFunction.java
b/common/src/main/java/org/apache/drill/common/util/function/CheckedFunction.java
new file mode 100644
index 00000000000..050565e7a59
--- /dev/null
+++
b/common/src/main/java/org/apache/drill/common/util/function/CheckedFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.util.function;
+
+import java.util.function.Function;
+
+import static org.apache.drill.common.exceptions.ErrorHelper.sneakyThrow;
+
+/**
+ * Extension of {@link Function} that allows to throw checked exception.
+ *
+ * @param <T> function argument type
+ * @param <R> function result type
+ * @param <E> exception type
+ */
+@FunctionalInterface
+public interface CheckedFunction<T, R, E extends Throwable> extends Function<T, R> {
+
+ /**
+ * Overrides {@link Function#apply(Object)} method to allow calling functions that throw checked exceptions.
+ * Is useful when used in methods that accept {@link Function}.
+ * For example: {@link java.util.Map#computeIfAbsent(Object, Function)}.
+ *
+ * @param t the function argument
+ * @return the function result
+ */
+ @Override
+ default R apply(T t) {
+ try {
+ return applyAndThrow(t);
+ } catch (Throwable e) {
+ sneakyThrow(e);
+ }
+ // should never happen
+ throw new RuntimeException();
+ }
+
+ /**
+ * Applies function to the given argument.
+ *
+ * @param t the function argument
+ * @return the function result
+ * @throws E exception in case of errors
+ */
+ R applyAndThrow(T t) throws E;
+
+}
+
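A self-contained sketch of the `CheckedFunction` pattern used with `Map#computeIfAbsent`. The nested interface is a local stand-in for the one in the diff so that the example compiles on its own. The names `CheckedFunctionSketch`, `sneak`, `load` and `lookup` are hypothetical. As a variation that is not in the PR, `sneak` returns `E`, so the call site can write `throw sneak(e)` and needs no trailing "should never happen" statement.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class CheckedFunctionSketch {

  // Local stand-in for the interface in the diff.
  @FunctionalInterface
  interface CheckedFunction<T, R, E extends Throwable> extends Function<T, R> {
    @Override
    default R apply(T t) {
      try {
        return applyAndThrow(t);
      } catch (Throwable e) {
        // Variation on the diff: throwing the call's result tells the compiler
        // that control never continues past this point.
        throw CheckedFunctionSketch.<RuntimeException>sneak(e);
      }
    }

    R applyAndThrow(T t) throws E;
  }

  @SuppressWarnings("unchecked")
  static <E extends Throwable> E sneak(Throwable e) throws E {
    throw (E) e;
  }

  // Hypothetical loader that may fail with a checked exception.
  static String load(String key) throws IOException {
    if (key.isEmpty()) {
      throw new IOException("empty key");
    }
    return key.toUpperCase();
  }

  static String lookup(Map<String, String> cache, String key) {
    CheckedFunction<String, String, IOException> loader = CheckedFunctionSketch::load;
    try {
      return cache.computeIfAbsent(key, loader);
    } catch (Exception e) {
      return "failed: " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    Map<String, String> cache = new HashMap<>();
    System.out.println(lookup(cache, "abc")); // prints "ABC"
    System.out.println(lookup(cache, ""));    // prints "failed: empty key"
  }
}
```

When the loader throws, `computeIfAbsent` propagates the exception and records no mapping, so a later call with the same key runs the loader again.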
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java
b/common/src/main/java/org/apache/drill/common/util/function/CheckedSupplier.java
similarity index 92%
rename from
exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java
rename to
common/src/main/java/org/apache/drill/common/util/function/CheckedSupplier.java
index b744ac8c033..60633839945 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java
+++
b/common/src/main/java/org/apache/drill/common/util/function/CheckedSupplier.java
@@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.util;
+package org.apache.drill.common.util.function;
/**
- * The java standard library does not provide a lambda function interface for
funtions that take no arguments,
+ * The java standard library does not provide a lambda function interface for
functions that take no arguments,
* but that throw an exception. So, we have to define our own here.
* @param <T> The return type of the lambda function.
* @param <E> The type of exception thrown by the lambda function.
diff --git
a/common/src/test/java/org/apache/drill/common/util/function/TestCheckedFunction.java
b/common/src/test/java/org/apache/drill/common/util/function/TestCheckedFunction.java
new file mode 100644
index 00000000000..a1ab389836e
--- /dev/null
+++
b/common/src/test/java/org/apache/drill/common/util/function/TestCheckedFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.util.function;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestCheckedFunction {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testComputeIfAbsentWithCheckedFunction() {
+ ExceptionProducer producer = new ExceptionProducer();
+ Map<String, String> map = new HashMap<>();
+ String message = "Exception message";
+ CheckedFunction<String, String, Exception> function = producer::failWithMessage;
+
+ thrown.expect(Exception.class);
+ thrown.expectMessage(message);
+
+ map.computeIfAbsent(message, function);
+ }
+
+ private class ExceptionProducer {
+
+ String failWithMessage(String message) throws Exception {
+ throw new Exception(message);
+ }
+
+ }
+
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
index 135ccd4e1bb..35feaa9f282 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
@@ -25,7 +25,7 @@
/**
* Returns a {@link TransientStore transient store} instance for the given
configuration.
*
- * Note that implementors have liberty to cache previous {@link
PersistentStore store} instances.
+ * Note that implementors have liberty to cache previous {@link
TransientStore store} instances.
*
* @param config store configuration
* @param <V> store value type
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index f24f9aaef58..f4b83736739 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -25,11 +25,12 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.net.URLConnection;
+import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
@@ -64,7 +65,6 @@
import
org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.drill.exec.util.JarUtil;
import org.apache.hadoop.fs.FileSystem;
@@ -83,7 +83,7 @@
private final Path localUdfDir;
private boolean deleteTmpDir = false;
private File tmpDir;
- private List<PluggableFunctionRegistry> pluggableFuncRegistries =
Lists.newArrayList();
+ private List<PluggableFunctionRegistry> pluggableFuncRegistries = new
ArrayList<>();
private OptionSet optionManager;
private final boolean useDynamicUdfs;
@@ -168,7 +168,7 @@ public void register(DrillOperatorTable operatorTable) {
*/
@Override
public DrillFuncHolder findDrillFunction(FunctionResolver functionResolver,
FunctionCall functionCall) {
- AtomicLong version = new AtomicLong();
+ AtomicInteger version = new AtomicInteger();
String newFunctionName = functionReplacement(functionCall);
// Dynamic UDFS: First try with exact match. If not found, we may need to
@@ -246,7 +246,7 @@ private DrillFuncHolder
findExactMatchingDrillFunction(String name,
List<MajorType>
argTypes,
MajorType returnType,
boolean retry) {
- AtomicLong version = new AtomicLong();
+ AtomicInteger version = new AtomicInteger();
for (DrillFuncHolder h : localFunctionRegistry.getMethods(name, version)) {
if (h.matches(returnType, argTypes)) {
return h;
@@ -321,19 +321,19 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry()
{
/**
* Purpose of this method is to synchronize remote and local function
registries if needed
* and to inform if function registry was changed after given version.
- *
+ * <p/>
* To make synchronization as much light-weigh as possible, first only
versions of both registries are checked
* without any locking. If synchronization is needed, enters synchronized
block to prevent others loading the same jars.
* The need of synchronization is checked again (double-check lock) before
comparing jars.
* If any missing jars are found, they are downloaded to local udf area,
each is wrapped into {@link JarScan}.
* Once jar download is finished, all missing jars are registered in one
batch.
* In case if any errors during jars download / registration, these errors
are logged.
- *
+ * <p/>
* During registration local function registry is updated with remote
function registry version it is synced with.
* When at least one jar of the missing jars failed to download / register,
* local function registry version are not updated but jars that where
successfully downloaded / registered
* are added to local function registry.
- *
+ * <p/>
* If synchronization between remote and local function registry was not
needed,
* checks if given registry version matches latest sync version
* to inform if function registry was changed after given version.
@@ -342,16 +342,16 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry()
{
* @return true if remote and local function registries were synchronized
after given version
*/
@SuppressWarnings("resource")
- public boolean syncWithRemoteRegistry(long version) {
+ public boolean syncWithRemoteRegistry(int version) {
// Do the version check only if a remote registry exists. It does
// not exist for some JMockit-based unit tests.
if (isRegistrySyncNeeded()) {
synchronized (this) {
- long localRegistryVersion = localFunctionRegistry.getVersion();
+ int localRegistryVersion = localFunctionRegistry.getVersion();
if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(),
localRegistryVersion)) {
DataChangeVersion remoteVersion = new DataChangeVersion();
List<String> missingJars =
getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry,
remoteVersion);
- List<JarScan> jars = Lists.newArrayList();
+ List<JarScan> jars = new ArrayList<>();
if (!missingJars.isEmpty()) {
logger.info("Starting dynamic UDFs lazy-init process.\n" +
"The following jars are going to be downloaded and registered
locally: " + missingJars);
@@ -381,7 +381,7 @@ public boolean syncWithRemoteRegistry(long version) {
}
}
}
- long latestRegistryVersion = jars.size() != missingJars.size() ?
+ int latestRegistryVersion = jars.size() != missingJars.size() ?
localRegistryVersion : remoteVersion.getVersion();
localFunctionRegistry.register(jars, latestRegistryVersion);
return true;
@@ -392,23 +392,38 @@ public boolean syncWithRemoteRegistry(long version) {
return version != localFunctionRegistry.getVersion();
}
+ /**
+ * Checks if remote and local registries should be synchronized.
+ * Before comparing versions, checks if remote function registry is actually
exists.
+ *
+ * @return true is local registry should be refreshed, false otherwise
+ */
private boolean isRegistrySyncNeeded() {
+ logger.trace("Has remote function registry: {}",
remoteFunctionRegistry.hasRegistry());
return remoteFunctionRegistry.hasRegistry() &&
isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(),
localFunctionRegistry.getVersion());
}
/**
* Checks if local function registry should be synchronized with remote
function registry.
- * If remote function registry version is -1, it means that remote function
registry is unreachable
- * or is not configured thus we skip synchronization and return false.
- * In all other cases synchronization is needed if remote and local function
registries versions do not match.
+ *
+ * <ul>If remote function registry version is {@link
DataChangeVersion#UNDEFINED},
+ * it means that remote function registry does not support versioning
+ * thus we need to synchronize both registries.</ul>
+ * <ul>If remote function registry version is {@link
DataChangeVersion#NOT_AVAILABLE},
+ * it means that remote function registry is unreachable
+ * or is not configured thus we skip synchronization and return false.</ul>
+ * <ul>For all other cases synchronization is needed if remote
+ * and local function registries versions do not match.</ul>
*
* @param remoteVersion remote function registry version
* @param localVersion local function registry version
* @return true is local registry should be refreshed, false otherwise
*/
- private boolean isRegistrySyncNeeded(long remoteVersion, long localVersion) {
- return remoteVersion != -1 && remoteVersion != localVersion;
+ private boolean isRegistrySyncNeeded(int remoteVersion, int localVersion) {
+ logger.trace("Compare remote [{}] and local [{}] registry versions.",
remoteVersion, localVersion);
+ return remoteVersion == DataChangeVersion.UNDEFINED ||
+ (remoteVersion != DataChangeVersion.NOT_AVAILABLE && remoteVersion !=
localVersion);
}
/**
@@ -459,7 +474,7 @@ private ScanResult scan(ClassLoader classLoader, Path path,
URL[] urls) throws I
DataChangeVersion version) {
List<Jar> remoteJars =
remoteFunctionRegistry.getRegistry(version).getJarList();
List<String> localJars = localFunctionRegistry.getAllJarNames();
- List<String> missingJars = Lists.newArrayList();
+ List<String> missingJars = new ArrayList<>();
for (Jar jar : remoteJars) {
if (!localJars.contains(jar.getName())) {
missingJars.add(jar.getName());
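The decision rule in the new `isRegistrySyncNeeded(int, int)` can be exercised in isolation, as sketched below. The numeric values of the two sentinels are assumptions made for this example: this diff does not show the definition of `DataChangeVersion`, only that `NOT_AVAILABLE` replaces the former literal `-1`. The class name `RegistrySyncSketch` is hypothetical.

```java
class RegistrySyncSketch {

  // Assumed placeholder values; the real constants live in DataChangeVersion.
  static final int NOT_AVAILABLE = -1; // remote store unreachable or not configured
  static final int UNDEFINED = -2;     // remote store does not support versioning

  // Same boolean expression as in the diff, without logging.
  static boolean isRegistrySyncNeeded(int remoteVersion, int localVersion) {
    return remoteVersion == UNDEFINED ||
        (remoteVersion != NOT_AVAILABLE && remoteVersion != localVersion);
  }

  public static void main(String[] args) {
    // Unversioned remote store: always synchronize, even if local is UNDEFINED too.
    System.out.println(isRegistrySyncNeeded(UNDEFINED, UNDEFINED)); // true
    // Unreachable remote store: skip synchronization.
    System.out.println(isRegistrySyncNeeded(NOT_AVAILABLE, 5));     // false
    // Versioned store: synchronize only when the versions differ.
    System.out.println(isRegistrySyncNeeded(7, 7));                 // false
    System.out.println(isRegistrySyncNeeded(8, 7));                 // true
  }
}
```

The first case is the one the old `remoteVersion != -1 && remoteVersion != localVersion` check could not express: it had no way to distinguish a store without versioning from one whose version simply matched the local registry.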
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
index dc8fd74ffbc..d1d4fc94dfd 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
@@ -19,18 +19,18 @@
import
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
import org.apache.drill.common.AutoCloseables.Closeable;
import org.apache.drill.common.concurrent.AutoCloseableLock;
import org.apache.drill.exec.expr.fn.DrillFuncHolder;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -45,9 +45,10 @@
* Holder is designed to allow concurrent reads and single writes to keep data
consistent.
* This is achieved by {@link ReadWriteLock} implementation usage.
* Holder has number version which indicates remote function registry version
number it is in sync with.
- *
+ * <p/>
* Structure example:
*
+ * <pre>
* JARS
* built-in -> upper -> upper(VARCHAR-REQUIRED)
* -> lower -> lower(VARCHAR-REQUIRED)
@@ -72,12 +73,12 @@
*
* custom_lower -> custom_lower(VARCHAR-REQUIRED) -> function holder for
custom_lower(VARCHAR-REQUIRED)
* -> custom_lower(VARCHAR-OPTIONAL) -> function holder for
custom_lower(VARCHAR-OPTIONAL)
- *
+ * </pre>
* where
- * First.jar is jar name represented by String
- * upper is function name represented by String
- * upper(VARCHAR-REQUIRED) is signature name represented by String which
consist of function name, list of input parameters
- * function holder for upper(VARCHAR-REQUIRED) is {@link DrillFuncHolder}
initiated for each function.
+ * <li><b>First.jar</b> is jar name represented by {@link String}.</li>
+ * <li><b>upper</b> is function name represented by {@link String}.</li>
+ * <li><b>upper(VARCHAR-REQUIRED)</b> is signature name represented by String
which consist of function name, list of input parameters.</li>
+ * <li><b>function holder for upper(VARCHAR-REQUIRED)</b> is {@link
DrillFuncHolder} initiated for each function.</li>
*
*/
public class FunctionRegistryHolder {
@@ -88,7 +89,7 @@
private final AutoCloseableLock readLock = new
AutoCloseableLock(readWriteLock.readLock());
private final AutoCloseableLock writeLock = new
AutoCloseableLock(readWriteLock.writeLock());
// remote function registry number, it is in sync with
- private long version;
+ private int version;
// jar name, Map<function name, Queue<function signature>
private final Map<String, Map<String, Queue<String>>> jars;
@@ -97,15 +98,15 @@
private final Map<String, Map<String, DrillFuncHolder>> functions;
public FunctionRegistryHolder() {
- this.functions = Maps.newConcurrentMap();
- this.jars = Maps.newConcurrentMap();
+ this.functions = new ConcurrentHashMap<>();
+ this.jars = new ConcurrentHashMap<>();
}
/**
* This is read operation, so several users at a time can get this data.
* @return local function registry version number
*/
- public long getVersion() {
+ public int getVersion() {
try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
return version;
}
@@ -122,12 +123,12 @@ public long getVersion() {
*
* @param newJars jars and list of their function holders, each contains
function name, signature and holder
*/
- public void addJars(Map<String, List<FunctionHolder>> newJars, long version)
{
+ public void addJars(Map<String, List<FunctionHolder>> newJars, int version) {
try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
for (Map.Entry<String, List<FunctionHolder>> newJar :
newJars.entrySet()) {
String jarName = newJar.getKey();
removeAllByJar(jarName);
- Map<String, Queue<String>> jar = Maps.newConcurrentMap();
+ Map<String, Queue<String>> jar = new ConcurrentHashMap<>();
jars.put(jarName, jar);
addFunctions(jar, newJar.getValue());
}
@@ -156,7 +157,7 @@ public void removeJar(String jarName) {
*/
public List<String> getAllJarNames() {
try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
- return Lists.newArrayList(jars.keySet());
+ return new ArrayList<>(jars.keySet());
}
}
@@ -171,7 +172,7 @@ public void removeJar(String jarName) {
public List<String> getFunctionNamesByJar(String jarName) {
try (@SuppressWarnings("unused") Closeable lock = readLock.open()){
Map<String, Queue<String>> functions = jars.get(jarName);
- return functions == null ? Lists.<String>newArrayList() :
Lists.newArrayList(functions.keySet());
+ return functions == null ? new ArrayList<>() : new
ArrayList<>(functions.keySet());
}
}
@@ -185,14 +186,14 @@ public void removeJar(String jarName) {
* @param version version holder
* @return all functions which their holders
*/
- public ListMultimap<String, DrillFuncHolder>
getAllFunctionsWithHolders(AtomicLong version) {
+ public ListMultimap<String, DrillFuncHolder>
getAllFunctionsWithHolders(AtomicInteger version) {
try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
if (version != null) {
version.set(this.version);
}
ListMultimap<String, DrillFuncHolder> functionsWithHolders =
ArrayListMultimap.create();
for (Map.Entry<String, Map<String, DrillFuncHolder>> function :
functions.entrySet()) {
- functionsWithHolders.putAll(function.getKey(),
Lists.newArrayList(function.getValue().values()));
+ functionsWithHolders.putAll(function.getKey(), new
ArrayList<>(function.getValue().values()));
}
return functionsWithHolders;
}
@@ -220,7 +221,7 @@ public void removeJar(String jarName) {
try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
ListMultimap<String, String> functionsWithSignatures =
ArrayListMultimap.create();
for (Map.Entry<String, Map<String, DrillFuncHolder>> function :
functions.entrySet()) {
- functionsWithSignatures.putAll(function.getKey(),
Lists.newArrayList(function.getValue().keySet()));
+ functionsWithSignatures.putAll(function.getKey(), new
ArrayList<>(function.getValue().keySet()));
}
return functionsWithSignatures;
}
@@ -236,13 +237,13 @@ public void removeJar(String jarName) {
* @param version version holder
* @return list of function holders
*/
- public List<DrillFuncHolder> getHoldersByFunctionName(String functionName,
AtomicLong version) {
+ public List<DrillFuncHolder> getHoldersByFunctionName(String functionName,
AtomicInteger version) {
try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
if (version != null) {
version.set(this.version);
}
Map<String, DrillFuncHolder> holders = functions.get(functionName);
- return holders == null ? Lists.<DrillFuncHolder>newArrayList() :
Lists.newArrayList(holders.values());
+ return holders == null ? new ArrayList<>() : new
ArrayList<>(holders.values());
}
}
@@ -316,17 +317,13 @@ private void addFunctions(Map<String, Queue<String>> jar,
List<FunctionHolder> n
final String functionName = function.getName();
Queue<String> jarFunctions = jar.get(functionName);
if (jarFunctions == null) {
- jarFunctions = Queues.newConcurrentLinkedQueue();
+ jarFunctions = new ConcurrentLinkedQueue<>();
jar.put(functionName, jarFunctions);
}
final String functionSignature = function.getSignature();
jarFunctions.add(functionSignature);
- Map<String, DrillFuncHolder> signatures = functions.get(functionName);
- if (signatures == null) {
- signatures = Maps.newConcurrentMap();
- functions.put(functionName, signatures);
- }
+ Map<String, DrillFuncHolder> signatures =
functions.computeIfAbsent(functionName, k -> new ConcurrentHashMap<>());
signatures.put(functionSignature, function.getHolder());
}
}
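The last hunk above replaces a get / null-check / put sequence with `Map#computeIfAbsent`. A minimal sketch of that two-level index follows, with `String` standing in for `DrillFuncHolder`. The class `SignatureIndexSketch` and its methods are hypothetical and only illustrate the idiom.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SignatureIndexSketch {

  // function name -> (function signature -> holder)
  private final Map<String, Map<String, String>> functions = new ConcurrentHashMap<>();

  void add(String functionName, String signature, String holder) {
    // Creates the inner map on first use, then reuses it for later signatures.
    functions.computeIfAbsent(functionName, k -> new ConcurrentHashMap<>())
        .put(signature, holder);
  }

  int signatureCount(String functionName) {
    Map<String, String> signatures = functions.get(functionName);
    return signatures == null ? 0 : signatures.size();
  }

  public static void main(String[] args) {
    SignatureIndexSketch index = new SignatureIndexSketch();
    index.add("custom_lower", "custom_lower(VARCHAR-REQUIRED)", "holder-1");
    index.add("custom_lower", "custom_lower(VARCHAR-OPTIONAL)", "holder-2");
    System.out.println(index.signatureCount("custom_lower")); // prints 2
    System.out.println(index.signatureCount("missing"));      // prints 0
  }
}
```

On a `ConcurrentHashMap`, `computeIfAbsent` performs the lookup and insertion atomically, which the three-step form did not guarantee on its own. In the PR the surrounding write lock already serializes writers, so the change mainly shortens the code.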
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
index 3740a6cc85d..cefbd8cf388 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
@@ -24,8 +24,9 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -77,13 +78,16 @@
private final FunctionRegistryHolder registryHolder;
/**
- * Registers all functions present in Drill classpath on start-up. All
functions will be marked as built-in.
- * Built-in functions are not allowed to be unregistered. Initially sync
registry version will be set to 0.
+ * Registers all functions present in Drill classpath on start-up.
+ * All functions will be marked as built-in. Built-in functions are not
allowed to be unregistered.
+ * Since local function registry version is based on remote function
registry version,
+ * initially sync version will be set to {@link DataChangeVersion#UNDEFINED}
+ * to ensure that upon first check both registries would be synchronized.
*/
public LocalFunctionRegistry(ScanResult classpathScan) {
registryHolder = new FunctionRegistryHolder();
validate(BUILT_IN, classpathScan);
- register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan,
this.getClass().getClassLoader())), 0);
+ register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan,
this.getClass().getClassLoader())), DataChangeVersion.UNDEFINED);
if (logger.isTraceEnabled()) {
StringBuilder allFunctions = new StringBuilder();
for (DrillFuncHolder method:
registryHolder.getAllFunctionsWithHolders().values()) {
@@ -96,7 +100,7 @@ public LocalFunctionRegistry(ScanResult classpathScan) {
/**
* @return remote function registry version number with which local function
registry is synced
*/
- public long getVersion() {
+ public int getVersion() {
return registryHolder.getVersion();
}
@@ -160,7 +164,7 @@ public long getVersion() {
* @param jars list of jars to be registered
* @param version remote function registry version number with which local
function registry is synced
*/
- public void register(List<JarScan> jars, long version) {
+ public void register(List<JarScan> jars, int version) {
Map<String, List<FunctionHolder>> newJars = new HashMap<>();
for (JarScan jarScan : jars) {
FunctionConverter converter = new FunctionConverter();
@@ -219,7 +223,7 @@ public int size(){
* @param name function name
* @return all function holders associated with the function name. Function
name is case insensitive.
*/
- public List<DrillFuncHolder> getMethods(String name, AtomicLong version) {
+ public List<DrillFuncHolder> getMethods(String name, AtomicInteger version) {
return registryHolder.getHoldersByFunctionName(name.toLowerCase(),
version);
}
@@ -238,7 +242,7 @@ public int size(){
* @param operatorTable drill operator table
*/
public void register(DrillOperatorTable operatorTable) {
- AtomicLong versionHolder = new AtomicLong();
+ AtomicInteger versionHolder = new AtomicInteger();
final Map<String, Collection<DrillFuncHolder>> registeredFunctions =
registryHolder.getAllFunctionsWithHolders(versionHolder).asMap();
operatorTable.setFunctionRegistryVersion(versionHolder.get());
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
index 4e947656f3a..f727a9374eb 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
@@ -54,34 +54,36 @@
* Creates all remote registry areas at startup and validates them,
* during init establishes connections with three udf related stores.
* Provides tools to work with three udf related stores, gives access to
remote registry areas.
- *
+ * <p/>
* There are three udf stores:
- * REGISTRY - persistent store, stores remote function registry {@link
Registry} under udf path
+ *
+ * <li><b>REGISTRY</b> - persistent store, stores remote function registry
{@link Registry} under udf path
* which contains information about all dynamically registered jars and their
function signatures.
- * If connection is created for the first time, puts empty remote registry.
+ * If connection is created for the first time, puts empty remote
registry.</li>
*
- * UNREGISTRATION - transient store, stores information under udf/unregister
path.
+ * <li><b>UNREGISTRATION</b> - transient store, stores information under
udf/unregister path.
* udf/unregister path is persistent by itself but any child created will be
transient.
* Whenever user submits request to unregister jar, child path with jar name
is created under this store.
* This store also holds unregistration listener, which notifies all drill
bits when child path is created,
- * so they can start local unregistration process.
+ * so they can start local unregistration process.</li>
*
- * JARS - transient store, stores information under udf/jars path.
+ * <li><b>JARS</b> - transient store, stores information under udf/jars path.
* udf/jars path is persistent by itself but any child created will be
transient.
* Servers as lock, not allowing to perform any action on the same time.
* There two types of actions: {@link Action#REGISTRATION} and {@link
Action#UNREGISTRATION}.
* Before starting any action, users tries to create child path with jar name
under this store
* and if such path already exists, receives action being performed on that
very jar.
- * When user finishes its action, he deletes child path with jar name.
- *
+ * When user finishes its action, he deletes child path with jar name.</li>
+ * <p/>
* There are three udf areas:
- * STAGING - area where user copies binary and source jars before starting
registration process.
- * REGISTRY - area where registered jars are stored.
- * TMP - area where source and binary jars are backed up in unique folder
during registration process.
+ *
+ * <li><b>STAGING</b> - area where user copies binary and source jars before
starting registration process.</li>
+ * <li><b>REGISTRY</b> - area where registered jars are stored.</li>
+ * <li><b>TMP</b> - area where source and binary jars are backed up in unique
folder during registration process.</li>
*/
public class RemoteFunctionRegistry implements AutoCloseable {
- private static final String registry_path = "registry";
+ private static final String REGISTRY_PATH = "registry";
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(RemoteFunctionRegistry.class);
private static final ObjectMapper mapper = new
ObjectMapper().enable(INDENT_OUTPUT);
@@ -112,19 +114,19 @@ public void init(DrillConfig config,
PersistentStoreProvider storeProvider, Clus
*
* @return remote function registry version if any, -1 otherwise
*/
- public long getRegistryVersion() {
+ public int getRegistryVersion() {
DataChangeVersion version = new DataChangeVersion();
boolean contains = false;
try {
- contains = registry.contains(registry_path, version);
+ contains = registry.contains(REGISTRY_PATH, version);
} catch (Exception e) {
- logger.error("Problem during trying to access remote function registry
[{}]", registry_path, e);
+ logger.error("Problem during trying to access remote function registry
[{}]", REGISTRY_PATH, e);
}
if (contains) {
return version.getVersion();
} else {
- logger.error("Remote function registry [{}] is unreachable",
registry_path);
- return -1;
+ logger.error("Remote function registry [{}] is unreachable",
REGISTRY_PATH);
+ return DataChangeVersion.NOT_AVAILABLE;
}
}
@@ -137,11 +139,11 @@ public long getRegistryVersion() {
public boolean hasRegistry() { return registry != null; }
public Registry getRegistry(DataChangeVersion version) {
- return registry.get(registry_path, version);
+ return registry.get(REGISTRY_PATH, version);
}
public void updateRegistry(Registry registryContent, DataChangeVersion
version) throws VersionMismatchException {
- registry.put(registry_path, registryContent, version);
+ registry.put(REGISTRY_PATH, registryContent, version);
}
public void submitForUnregistration(String jar) {
@@ -193,7 +195,8 @@ private void prepareStores(PersistentStoreProvider
storeProvider, ClusterCoordin
.persist()
.build();
registry = storeProvider.getOrCreateVersionedStore(registrationConfig);
- registry.putIfAbsent(registry_path, Registry.getDefaultInstance());
+ logger.trace("Remote function registry type: {}.", registry.getClass());
+ registry.putIfAbsent(REGISTRY_PATH, Registry.getDefaultInstance());
} catch (StoreException e) {
throw new DrillRuntimeException("Failure while loading remote
registry.", e);
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
index e1e33098aae..eb79a5a65af 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
@@ -54,7 +54,7 @@
private final ArrayListMultimap<String, SqlOperator>
drillOperatorsWithInferenceMap = ArrayListMultimap.create();
// indicates remote function registry version based on which drill operator
were loaded
// is used to define if we need to reload operator table in case remote
function registry version has changed
- private long functionRegistryVersion;
+ private int functionRegistryVersion;
private final OptionManager systemOptionManager;
@@ -70,14 +70,14 @@ public DrillOperatorTable(FunctionImplementationRegistry
registry, OptionManager
*
* @param version registry version
*/
- public void setFunctionRegistryVersion(long version) {
+ public void setFunctionRegistryVersion(int version) {
functionRegistryVersion = version;
}
/**
* @return function registry version based on which operator table was loaded
*/
- public long getFunctionRegistryVersion() {
+ public int getFunctionRegistryVersion() {
return functionRegistryVersion;
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index e3cd7e460db..41faea96c13 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -21,7 +21,6 @@
import org.apache.calcite.sql.SqlDescribeSchema;
import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.drill.common.exceptions.UserException;
@@ -58,8 +57,7 @@ private DrillSqlWorker() {
* @param sql sql query
* @return query physical plan
*/
- public static PhysicalPlan getPlan(QueryContext context, String sql) throws
SqlParseException, ValidationException,
- ForemanSetupException {
+ public static PhysicalPlan getPlan(QueryContext context, String sql) throws
ForemanSetupException {
return getPlan(context, sql, null);
}
@@ -76,15 +74,18 @@ public static PhysicalPlan getPlan(QueryContext context,
String sql) throws SqlP
* @param textPlan text plan
* @return query physical plan
*/
- public static PhysicalPlan getPlan(QueryContext context, String sql,
Pointer<String> textPlan)
- throws ForemanSetupException {
+ public static PhysicalPlan getPlan(QueryContext context, String sql,
Pointer<String> textPlan) throws ForemanSetupException {
Pointer<String> textPlanCopy = textPlan == null ? null : new
Pointer<>(textPlan.value);
try {
return getQueryPlan(context, sql, textPlan);
} catch (Exception e) {
+ logger.trace("There was an error during conversion into physical plan. "
+
+ "Will sync remote and local function registries if needed and retry
" +
+ "in case if issue was due to missing function implementation.");
if (context.getFunctionRegistry().syncWithRemoteRegistry(
context.getDrillOperatorTable().getFunctionRegistryVersion())) {
context.reloadDrillOperatorTable();
+ logger.trace("Local function registry was synchronized with remote.
Trying to find function one more time.");
return getQueryPlan(context, sql, textPlanCopy);
}
throw e;
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
index 4311f48bd45..ca26a243326 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
@@ -18,24 +18,37 @@
package org.apache.drill.exec.store.sys;
import org.apache.drill.exec.exception.StoreException;
-import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
+import org.apache.drill.exec.store.sys.store.UndefinedVersionDelegatingStore;
/**
* A factory used to create {@link PersistentStore store} instances.
- *
*/
public interface PersistentStoreProvider extends AutoCloseable {
+
/**
* Gets or creates a {@link PersistentStore persistent store} for the given
configuration.
*
* Note that implementors have liberty to cache previous {@link
PersistentStore store} instances.
*
- * @param config store configuration
- * @param <V> store value type
+ * @param config store configuration
+ * @param <V> store value type
+ * @return persistent store instance
+ * @throws StoreException in case when unable to create store
*/
<V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config)
throws StoreException;
+
+ /**
+ * Override this method if store supports versioning and return versioning
instance.
+ * By default, undefined version wrapper will be used.
+ *
+ * @param config store configuration
+ * @param <V> store value type
+ * @return versioned persistent store instance
+ * @throws StoreException in case when unable to create store
+ */
default <V> VersionedPersistentStore<V>
getOrCreateVersionedStore(PersistentStoreConfig<V> config) throws
StoreException {
- return new VersionedDelegatingStore<>(getOrCreateStore(config));
+ // for those stores that do not support versioning
+ return new UndefinedVersionDelegatingStore<>(getOrCreateStore(config));
}
/**
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
index d182de33174..76e5610def4 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
@@ -17,9 +17,17 @@
*/
package org.apache.drill.exec.store.sys.store;
+/**
+ * Holder for store version. By default version is {@link
DataChangeVersion#UNDEFINED}.
+ */
public class DataChangeVersion {
- private int version;
+ // is used when store in unreachable
+ public static final int NOT_AVAILABLE = -1;
+ // is used when store does not support versioning
+ public static final int UNDEFINED = -2;
+
+ private int version = UNDEFINED;
public void setVersion(int version) {
this.version = version;
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java
new file mode 100644
index 00000000000..5873ec0ac92
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store;
+
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreMode;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Wrapper store that delegates operations to PersistentStore.
+ * Does not keep versioning and returns {@link DataChangeVersion#UNDEFINED}
when version is required.
+ *
+ * @param <V> store value type
+ */
+public class UndefinedVersionDelegatingStore<V> implements
VersionedPersistentStore<V> {
+
+ private final PersistentStore<V> store;
+
+ public UndefinedVersionDelegatingStore(PersistentStore<V> store) {
+ this.store = store;
+ }
+
+ @Override
+ public boolean contains(String key, DataChangeVersion version) {
+ version.setVersion(DataChangeVersion.UNDEFINED);
+ return store.contains(key);
+ }
+
+ @Override
+ public V get(String key, DataChangeVersion version) {
+ version.setVersion(DataChangeVersion.UNDEFINED);
+ return store.get(key);
+ }
+
+ @Override
+ public void put(String key, V value, DataChangeVersion version) {
+ store.put(key, value);
+ }
+
+ @Override
+ public PersistentStoreMode getMode() {
+ return store.getMode();
+ }
+
+ @Override
+ public void delete(String key) {
+ store.delete(key);
+ }
+
+ @Override
+ public boolean putIfAbsent(String key, V value) {
+ return store.putIfAbsent(key, value);
+ }
+
+ @Override
+ public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
+ return store.getRange(skip, take);
+ }
+
+ @Override
+ public void close() throws Exception {
+ store.close();
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
index 18e0b826291..40576d55c71 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
@@ -30,22 +30,24 @@
import org.apache.drill.exec.store.sys.VersionedPersistentStore;
/**
- * Versioned Store that delegates operations to PersistentStore
- * @param <V>
+ * Versioned store that delegates operations to PersistentStore and keeps
versioning,
+ * incrementing version each time write / delete operation is triggered.
+ * Once created initial version is 0. Can be used only for local versioning,
not distributed.
+ *
+ * @param <V> store value type
*/
public class VersionedDelegatingStore<V> implements
VersionedPersistentStore<V> {
private final PersistentStore<V> store;
- private final ReadWriteLock readWriteLock;
private final AutoCloseableLock readLock;
private final AutoCloseableLock writeLock;
private int version;
public VersionedDelegatingStore(PersistentStore<V> store) {
this.store = store;
- readWriteLock = new ReentrantReadWriteLock();
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readLock = new AutoCloseableLock(readWriteLock.readLock());
writeLock = new AutoCloseableLock(readWriteLock.writeLock());
- version = -1;
+ version = 0;
}
@Override
@@ -113,7 +115,7 @@ public void close() throws Exception
{
try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
store.close();
- version = -1;
+ version = DataChangeVersion.NOT_AVAILABLE;
}
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
index aa6ee9d1717..75cef2f472e 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
@@ -17,21 +17,23 @@
*/
package org.apache.drill.exec.store.sys.store.provider;
+import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.common.util.function.CheckedFunction;
public class CachingPersistentStoreProvider extends
BasePersistentStoreProvider {
-// private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(CachingPersistentStoreProvider.class);
- private final ConcurrentMap<PersistentStoreConfig<?>, PersistentStore<?>>
storeCache = Maps.newConcurrentMap();
+ private final Map<PersistentStoreConfig<?>, PersistentStore<?>> storeCache =
new ConcurrentHashMap<>();
+ private final Map<PersistentStoreConfig<?>, VersionedPersistentStore<?>>
versionedStoreCache = new ConcurrentHashMap<>();
private final PersistentStoreProvider provider;
public CachingPersistentStoreProvider(PersistentStoreProvider provider) {
@@ -41,21 +43,15 @@ public
CachingPersistentStoreProvider(PersistentStoreProvider provider) {
@Override
@SuppressWarnings("unchecked")
public <V> PersistentStore<V> getOrCreateStore(final
PersistentStoreConfig<V> config) throws StoreException {
- final PersistentStore<?> store = storeCache.get(config);
- if (store == null) {
- final PersistentStore<?> newStore = provider.getOrCreateStore(config);
- final PersistentStore<?> finalStore = storeCache.putIfAbsent(config,
newStore);
- if (finalStore == null) {
- return (PersistentStore<V>)newStore;
- }
- try {
- newStore.close();
- } catch (Exception ex) {
- throw new StoreException(ex);
- }
- }
+ CheckedFunction<PersistentStoreConfig<?>, PersistentStore<?>,
StoreException> function = provider::getOrCreateStore;
+ return (PersistentStore<V>) storeCache.computeIfAbsent(config, function);
+ }
- return (PersistentStore<V>) store;
+ @Override
+ @SuppressWarnings("unchecked")
+ public <V> VersionedPersistentStore<V>
getOrCreateVersionedStore(PersistentStoreConfig<V> config) throws
StoreException {
+ CheckedFunction<PersistentStoreConfig<?>, VersionedPersistentStore<?>,
StoreException> function = provider::getOrCreateVersionedStore;
+ return (VersionedPersistentStore<V>)
versionedStoreCache.computeIfAbsent(config, function);
}
@Override
@@ -65,12 +61,19 @@ public void start() throws Exception {
@Override
public void close() throws Exception {
- final List<AutoCloseable> closeables = Lists.newArrayList();
- for (final AutoCloseable store : storeCache.values()) {
- closeables.add(store);
- }
- closeables.add(provider);
+ List<AutoCloseable> closeables = new ArrayList<>();
+
+ // add un-versioned stores
+ closeables.addAll(storeCache.values());
storeCache.clear();
+
+ // add versioned stores
+ closeables.addAll(versionedStoreCache.values());
+ versionedStoreCache.clear();
+
+ // add provider
+ closeables.add(provider);
+
AutoCloseables.close(closeables);
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
index 3ab85ec291e..6a70df775dc 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
@@ -17,11 +17,12 @@
*/
package org.apache.drill.exec.store.sys.store.provider;
-import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
import org.apache.drill.exec.store.sys.store.InMemoryStore;
+import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
public class InMemoryStoreProvider implements PersistentStoreProvider {
@@ -35,10 +36,15 @@ public InMemoryStoreProvider(int capacity) {
public void close() throws Exception { }
@Override
- public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V>
config) throws StoreException {
+ public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V>
config) {
return new InMemoryStore<>(capacity);
}
@Override
- public void start() throws Exception { }
+ public <V> VersionedPersistentStore<V>
getOrCreateVersionedStore(PersistentStoreConfig<V> config) {
+ return new VersionedDelegatingStore<>(getOrCreateStore(config));
+ }
+
+ @Override
+ public void start() { }
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
index af6777111b8..2dae62def52 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
@@ -26,7 +26,9 @@
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
import org.apache.drill.exec.store.sys.store.LocalPersistentStore;
+import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
import org.apache.drill.exec.testing.store.NoWriteLocalStore;
import org.apache.hadoop.fs.Path;
@@ -70,6 +72,10 @@ public LocalPersistentStoreProvider(final DrillConfig
config) throws StoreExcept
}
}
+ @Override
+ public <V> VersionedPersistentStore<V>
getOrCreateVersionedStore(PersistentStoreConfig<V> config) {
+ return new VersionedDelegatingStore<>(getOrCreateStore(config));
+ }
@Override
public void close() throws Exception {
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java
index 8f5252d1e25..978231575ae 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java
@@ -20,7 +20,6 @@
import
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.categories.SqlFunctionTest;
import org.apache.drill.exec.expr.fn.DrillFuncHolder;
import org.junit.Before;
@@ -30,9 +29,11 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -51,7 +52,7 @@
@BeforeClass
public static void init() {
- newJars = Maps.newHashMap();
+ newJars = new HashMap<>();
FunctionHolder lower = new FunctionHolder("lower",
"lower(VARCHAR-REQUIRED)", mock(DrillFuncHolder.class));
FunctionHolder upper = new FunctionHolder("upper",
"upper(VARCHAR-REQUIRED)", mock(DrillFuncHolder.class));
newJars.put(built_in, Lists.newArrayList(lower, upper));
@@ -69,9 +70,9 @@ public void setup() {
@Test
public void testVersion() {
resetRegistry();
- long expectedVersion = 0;
+ int expectedVersion = 0;
assertEquals("Initial version should be 0", expectedVersion,
registryHolder.getVersion());
- registryHolder.addJars(Maps.<String, List<FunctionHolder>>newHashMap(),
++expectedVersion);
+ registryHolder.addJars(new HashMap<>(), ++expectedVersion);
assertEquals("Version can change if no jars were added.", expectedVersion,
registryHolder.getVersion());
fillInRegistry(++expectedVersion);
assertEquals("Version should have incremented by 1", expectedVersion,
registryHolder.getVersion());
@@ -87,7 +88,7 @@ public void testVersion() {
public void testAddJars() {
resetRegistry();
int functionsSize = 0;
- List<String> jars = Lists.newArrayList();
+ List<String> jars = new ArrayList<>();
ListMultimap<String, DrillFuncHolder> functionsWithHolders =
ArrayListMultimap.create();
ListMultimap<String, String> functionsWithSignatures =
ArrayListMultimap.create();
for (Map.Entry<String, List<FunctionHolder>> jar : newJars.entrySet()) {
@@ -99,7 +100,7 @@ public void testAddJars() {
}
}
- long expectedVersion = 0;
+ int expectedVersion = 0;
registryHolder.addJars(newJars, ++expectedVersion);
assertEquals("Version number should match", expectedVersion,
registryHolder.getVersion());
compareTwoLists(jars, registryHolder.getAllJarNames());
@@ -112,7 +113,7 @@ public void testAddJars() {
public void testAddTheSameJars() {
resetRegistry();
int functionsSize = 0;
- List<String> jars = Lists.newArrayList();
+ List<String> jars = new ArrayList<>();
ListMultimap<String, DrillFuncHolder> functionsWithHolders =
ArrayListMultimap.create();
ListMultimap<String, String> functionsWithSignatures =
ArrayListMultimap.create();
for (Map.Entry<String, List<FunctionHolder>> jar : newJars.entrySet()) {
@@ -123,7 +124,7 @@ public void testAddTheSameJars() {
functionsSize++;
}
}
- long expectedVersion = 0;
+ int expectedVersion = 0;
registryHolder.addJars(newJars, ++expectedVersion);
assertEquals("Version number should match", expectedVersion,
registryHolder.getVersion());
compareTwoLists(jars, registryHolder.getAllJarNames());
@@ -150,16 +151,15 @@ public void testRemoveJar() {
@Test
public void testGetAllJarNames() {
- ArrayList<String> expectedResult = Lists.newArrayList(newJars.keySet());
+ List<String> expectedResult = new ArrayList<>(newJars.keySet());
compareTwoLists(expectedResult, registryHolder.getAllJarNames());
}
@Test
public void testGetFunctionNamesByJar() {
- ArrayList<String> expectedResult = Lists.newArrayList();
- for (FunctionHolder functionHolder : newJars.get(built_in)) {
- expectedResult.add(functionHolder.getName());
- }
+ List<String> expectedResult = newJars.get(built_in).stream()
+ .map(FunctionHolder::getName)
+ .collect(Collectors.toList());
compareTwoLists(expectedResult,
registryHolder.getFunctionNamesByJar(built_in));
}
@@ -171,7 +171,7 @@ public void testGetAllFunctionsWithHoldersWithVersion() {
expectedResult.put(functionHolder.getName(),
functionHolder.getHolder());
}
}
- AtomicLong version = new AtomicLong();
+ AtomicInteger version = new AtomicInteger();
compareListMultimaps(expectedResult,
registryHolder.getAllFunctionsWithHolders(version));
assertEquals("Version number should match", version.get(),
registryHolder.getVersion());
}
@@ -200,30 +200,26 @@ public void testGetAllFunctionsWithSignatures() {
@Test
public void testGetHoldersByFunctionNameWithVersion() {
- List<DrillFuncHolder> expectedResult = Lists.newArrayList();
- for (List<FunctionHolder> functionHolders : newJars.values()) {
- for (FunctionHolder functionHolder : functionHolders) {
- if ("lower".equals(functionHolder.getName())) {
- expectedResult.add(functionHolder.getHolder());
- }
- }
- }
+ List<DrillFuncHolder> expectedResult = newJars.values().stream()
+ .flatMap(Collection::stream)
+ .filter(f -> "lower".equals(f.getName()))
+ .map(FunctionHolder::getHolder)
+ .collect(Collectors.toList());
+
assertFalse(expectedResult.isEmpty());
- AtomicLong version = new AtomicLong();
+ AtomicInteger version = new AtomicInteger();
compareTwoLists(expectedResult,
registryHolder.getHoldersByFunctionName("lower", version));
assertEquals("Version number should match", version.get(),
registryHolder.getVersion());
}
@Test
public void testGetHoldersByFunctionName() {
- List<DrillFuncHolder> expectedResult = Lists.newArrayList();
- for (List<FunctionHolder> functionHolders : newJars.values()) {
- for (FunctionHolder functionHolder : functionHolders) {
- if ("lower".equals(functionHolder.getName())) {
- expectedResult.add(functionHolder.getHolder());
- }
- }
- }
+ List<DrillFuncHolder> expectedResult = newJars.values().stream()
+ .flatMap(Collection::stream)
+ .filter(f -> "lower".equals(f.getName()))
+ .map(FunctionHolder::getHolder)
+ .collect(Collectors.toList());
+
assertFalse(expectedResult.isEmpty());
compareTwoLists(expectedResult,
registryHolder.getHoldersByFunctionName("lower"));
}
@@ -236,10 +232,9 @@ public void testContainsJar() {
@Test
public void testFunctionsSize() {
- int count = 0;
- for (List<FunctionHolder> functionHolders : newJars.values()) {
- count += functionHolders.size();
- }
+ int count = newJars.values().stream()
+ .mapToInt(List::size)
+ .sum();
assertEquals("Functions size should match", count,
registryHolder.functionsSize());
}
@@ -256,7 +251,7 @@ private void resetRegistry() {
registryHolder = new FunctionRegistryHolder();
}
- private void fillInRegistry(long version) {
+ private void fillInRegistry(int version) {
registryHolder.addJars(newJars, version);
}
@@ -266,7 +261,7 @@ private void fillInRegistry(long version) {
assertEquals("Multimaps size should match", m1.size(), m2.size());
for (Map.Entry<String, Collection<T>> entry : m1.entrySet()) {
try {
- compareTwoLists(Lists.newArrayList(entry.getValue()),
Lists.newArrayList(m2.get(entry.getKey())));
+ compareTwoLists(new ArrayList<>(entry.getValue()), new
ArrayList<>(m2.get(entry.getKey())));
} catch (AssertionError e) {
throw new AssertionError("Multimaps values should match", e);
}
@@ -275,9 +270,7 @@ private void fillInRegistry(long version) {
private <T> void compareTwoLists(List<T> l1, List<T> l2) {
assertEquals("Lists size should match", l1.size(), l2.size());
- for (T item : l1) {
- assertTrue("Two lists should have the same values", l2.contains(item));
- }
+ l1.forEach(i -> assertTrue("Two lists should have the same values",
l2.contains(i)));
}
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
index a5a3c5121aa..16ac42e27e9 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
@@ -64,7 +64,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
@@ -762,7 +762,7 @@ public void
testConcurrentRemoteRegistryUpdateWithDuplicates() throws Exception
DataChangeVersion version = new DataChangeVersion();
Registry registry = remoteFunctionRegistry.getRegistry(version);
- assertEquals("Remote registry version should match", 1,
version.getVersion());
+ assertEquals("Remote registry version should match", 2,
version.getVersion());
List<Jar> jarList = registry.getJarList();
assertEquals("Only one jar should be registered", 1, jarList.size());
assertEquals("Jar name should match", jar1, jarList.get(0).getName());
@@ -823,7 +823,7 @@ public void
testConcurrentRemoteRegistryUpdateForDifferentJars() throws Exceptio
DataChangeVersion version = new DataChangeVersion();
Registry registry = remoteFunctionRegistry.getRegistry(version);
- assertEquals("Remote registry version should match", 2,
version.getVersion());
+ assertEquals("Remote registry version should match", 3,
version.getVersion());
List<Jar> actualJars = registry.getJarList();
List<String> expectedJars = Lists.newArrayList(jar1, jar2);
@@ -861,7 +861,7 @@ public void testLazyInitConcurrent() throws Exception {
assertTrue("syncWithRemoteRegistry() should return true", result);
return true;
})
- .when(functionImplementationRegistry).syncWithRemoteRegistry(anyLong());
+ .when(functionImplementationRegistry).syncWithRemoteRegistry(anyInt());
SimpleQueryRunner simpleQueryRunner = new SimpleQueryRunner(query);
Thread thread1 = new Thread(simpleQueryRunner);
@@ -873,10 +873,10 @@ public void testLazyInitConcurrent() throws Exception {
thread1.join();
thread2.join();
- verify(functionImplementationRegistry,
times(2)).syncWithRemoteRegistry(anyLong());
+ verify(functionImplementationRegistry,
times(2)).syncWithRemoteRegistry(anyInt());
LocalFunctionRegistry localFunctionRegistry =
(LocalFunctionRegistry)FieldUtils.readField(
functionImplementationRegistry, "localFunctionRegistry", true);
- assertEquals("Sync function registry version should match", 1L,
localFunctionRegistry.getVersion());
+ assertEquals("Sync function registry version should match", 2,
localFunctionRegistry.getVersion());
}
@Test
@@ -895,7 +895,7 @@ public void testLazyInitNoReload() throws Exception {
assertFalse("syncWithRemoteRegistry() should return false", result);
return false;
})
- .when(functionImplementationRegistry).syncWithRemoteRegistry(anyLong());
+ .when(functionImplementationRegistry).syncWithRemoteRegistry(anyInt());
test("select custom_lower('A') from (values(1))");
@@ -906,10 +906,10 @@ public void testLazyInitNoReload() throws Exception {
assertThat(e.getMessage(), containsString("No match found for function
signature unknown_lower(<CHARACTER>)"));
}
- verify(functionImplementationRegistry,
times(2)).syncWithRemoteRegistry(anyLong());
+ verify(functionImplementationRegistry,
times(2)).syncWithRemoteRegistry(anyInt());
LocalFunctionRegistry localFunctionRegistry =
(LocalFunctionRegistry)FieldUtils.readField(
functionImplementationRegistry, "localFunctionRegistry", true);
- assertEquals("Sync function registry version should match", 1L,
localFunctionRegistry.getVersion());
+ assertEquals("Sync function registry version should match", 2,
localFunctionRegistry.getVersion());
}
private static String buildJars(String jarName, String includeFiles, String
includeResources) {
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
index 1709bdf501a..c71a2d46168 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
@@ -19,7 +19,7 @@
import ch.qos.logback.classic.Level;
import org.apache.drill.exec.client.LoggingResultsListener;
-import org.apache.drill.exec.util.CheckedSupplier;
+import org.apache.drill.common.util.function.CheckedSupplier;
import org.apache.drill.exec.util.VectorUtil;
import java.util.function.Supplier;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Dynamic UDFs registered on one Drillbit are not visible on other Drillbits
> --------------------------------------------------------------------------
>
> Key: DRILL-6762
> URL: https://issues.apache.org/jira/browse/DRILL-6762
> Project: Apache Drill
> Issue Type: Bug
> Components: Functions - Drill
> Affects Versions: 1.13.0
> Reporter: Kunal Khatua
> Assignee: Arina Ielchiieva
> Priority: Critical
> Labels: ready-to-commit
> Fix For: 1.15.0
>
> Attachments: Dynamic UDFs issue.pdf
>
>
> Originally reported:
> https://stackoverflow.com/questions/52480160/dynamic-udf-in-apache-drill-cluster
> When using a 4-node Drill 1.14 cluster, UDF jars registered on one node are
> not usable on other nodes, even though {{/registry}} and ZK show the UDFs
> as registered.
> This previously worked on 1.13.0.
> *Root cause*
> 1. {{VersionedDelegatingStore}} was starting with version -1 and the local
> function registry with version 0. This caused issues when
> {{LocalPersistentStore}} already existed on the file system. When jars were
> added to the remote registry, its version was bumped to 0 and synchronization
> did not happen, since the local registry had the same version.
> *Fix*: start {{VersionedDelegatingStore}} with version 0 and the local function
> registry with the undefined version (-2), so the first sync always happens.
> 2. {{PersistentStoreProvider.getOrCreateVersionedStore}} was wrapping stores
> in {{VersionedDelegatingStore}} for those store providers that did not
> override this method. Only the Zookeeper store was overriding it. But
> {{VersionedDelegatingStore}} only keeps versioning locally and thus can be
> applied only to local stores, i.e. HBase and Mongo stores cannot use it.
> {{CachingPersistentStoreProvider}} did not override
> {{getOrCreateVersionedStore}} either. Almost all stores in Drill are created
> using {{CachingPersistentStoreProvider}}. Thus all stores were wrapped in
> {{VersionedDelegatingStore}}, even the Zookeeper one, which caused function
> registry version synchronization issues.
> *Fix*: Add {{UndefinedVersionDelegatingStore}} for those stores that do not
> support versioning and wrap stores in it by default in
> {{PersistentStoreProvider.getOrCreateVersionedStore}} if this method is not
> overridden. {{UndefinedVersionDelegatingStore}} returns the UNDEFINED version
> (-2). During sync between the remote and local registries, if the remote
> registry has the UNDEFINED version, the sync is done immediately, in contrast
> to the NOT_AVAILABLE version (-1), which indicates that the remote function
> registry is not accessible.
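The version rules described in the issue above can be sketched roughly as follows. This is an illustrative sketch only, not the code from the pull request: the class name `RegistrySyncSketch` and the method `needsSync` are hypothetical, and skipping the sync when the remote version is NOT_AVAILABLE is an assumption drawn from the statement that this value means the remote registry is not accessible.

```java
// Hypothetical sketch of the sync decision described in DRILL-6762.
// Names below are illustrative and are not Drill's actual API.
class RegistrySyncSketch {

  // Sentinel values quoted in the issue description.
  static final int NOT_AVAILABLE = -1; // remote function registry is not accessible
  static final int UNDEFINED = -2;     // store does not support versioning

  /**
   * Decides whether the local registry should sync with the remote one.
   *
   * @param localVersion  version currently held by the local registry
   * @param remoteVersion version reported by the remote registry
   * @return true if a sync should be performed
   */
  static boolean needsSync(int localVersion, int remoteVersion) {
    if (remoteVersion == NOT_AVAILABLE) {
      // Assumption: nothing to sync against when the remote registry is unreachable.
      return false;
    }
    if (remoteVersion == UNDEFINED) {
      // Stores without versioning always trigger a sync.
      return true;
    }
    // Versioned stores: sync only when the versions differ.
    return localVersion != remoteVersion;
  }

  public static void main(String[] args) {
    // Before the fix: local registry started at 0, the remote store was bumped
    // from -1 to 0 after the first jar was added, so the versions matched.
    System.out.println("old defaults, first jar: " + needsSync(0, 0));
    // After the fix: local registry starts at UNDEFINED, remote store at 0.
    System.out.println("new defaults, first sync: " + needsSync(UNDEFINED, 0));
    // Non-versioned remote store (for example one wrapped by default).
    System.out.println("undefined remote: " + needsSync(2, UNDEFINED));
    // Remote registry unreachable.
    System.out.println("remote not available: " + needsSync(2, NOT_AVAILABLE));
  }
}
```

With the old defaults the first comparison is between equal versions, which is why the sync was skipped. Starting the local registry at the UNDEFINED value guarantees a mismatch against any versioned remote store on the first check.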
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)