DRILL-4963: Fix issues with dynamically loaded overloaded functions close #701
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/dcbcb94f Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/dcbcb94f Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/dcbcb94f Branch: refs/heads/master Commit: dcbcb94fd2695edd4bbca63b2759292e99695d47 Parents: 79811db Author: Arina Ielchiieva <arina.yelchiy...@gmail.com> Authored: Tue Dec 20 16:57:15 2016 +0000 Committer: Jinfeng Ni <j...@apache.org> Committed: Wed Mar 1 23:46:19 2017 -0800 ---------------------------------------------------------------------- .../hbase/config/HBasePersistentStore.java | 16 +- .../drill/hbase/TestHBaseTableProvider.java | 7 +- .../mongo/config/MongoPersistentStore.java | 14 +- .../src/resources/drill-override-example.conf | 3 +- .../drill/exec/coord/zk/ZookeeperClient.java | 45 +++- .../exception/FunctionNotFoundException.java | 27 --- .../expr/fn/FunctionImplementationRegistry.java | 219 ++++++++++++------- .../fn/registry/FunctionRegistryHolder.java | 27 +-- .../expr/fn/registry/LocalFunctionRegistry.java | 31 ++- .../fn/registry/RemoteFunctionRegistry.java | 24 +- .../org/apache/drill/exec/ops/QueryContext.java | 11 +- .../exec/planner/sql/DrillOperatorTable.java | 29 ++- .../drill/exec/planner/sql/DrillSqlWorker.java | 70 +++--- .../drill/exec/planner/sql/SqlConverter.java | 9 - .../sql/handlers/CreateFunctionHandler.java | 3 +- .../sql/handlers/DropFunctionHandler.java | 3 +- .../exec/store/sys/BasePersistentStore.java | 8 +- .../drill/exec/store/sys/PersistentStore.java | 20 +- .../store/sys/store/LocalPersistentStore.java | 188 ++++++++++------ .../sys/store/ZookeeperPersistentStore.java | 18 +- .../exec/testing/store/NoWriteLocalStore.java | 33 ++- .../org/apache/drill/TestDynamicUDFSupport.java | 101 ++++++--- .../exec/coord/zk/TestZookeeperClient.java | 30 ++- .../fn/registry/FunctionRegistryHolderTest.java | 45 ++-- .../record/ExpressionTreeMaterializerTest.java | 7 +- 
.../jars/DrillUDF-overloading-1.0-sources.jar | Bin 0 -> 3473 bytes .../resources/jars/DrillUDF-overloading-1.0.jar | Bin 0 -> 5779 bytes 27 files changed, 639 insertions(+), 349 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java index 2d329a8..ef6bbfe 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -67,6 +67,20 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { } @Override + public boolean contains(String key) { + try { + Get get = new Get(row(key)); + get.addColumn(FAMILY, QUALIFIER); + return hbaseTable.exists(get); + } catch (IOException e) { + throw UserException + .dataReadError(e) + .message("Caught error while checking row existence '%s' for table '%s'", key, hbaseTableName) + .build(logger); + } + } + + @Override public V get(String key) { return get(key, FAMILY); } http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java index 6b73283..f278359 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,6 +18,8 @@ package org.apache.drill.hbase; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.util.Map.Entry; @@ -57,6 +59,9 @@ public class TestHBaseTableProvider extends BaseHBaseTest { assertEquals("v0", hbaseStore.get("")); assertEquals("testValue", hbaseStore.get(".test")); + assertTrue(hbaseStore.contains("")); + assertFalse(hbaseStore.contains("unknown_key")); + int rowCount = 0; for (Entry<String, String> entry : Lists.newArrayList(hbaseStore.getAll())) { rowCount++; http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java index b5cc3ee..73ff31d 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -61,6 +61,18 @@ public class MongoPersistentStore<V> extends BasePersistentStore<V> { } @Override + public boolean contains(String key) { + try { + Bson query = Filters.eq(DrillMongoConstants.ID, key); + Document document = collection.find(query).first(); + return document != null && document.containsKey(pKey); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new DrillRuntimeException(e.getMessage(), e); + } + } + + @Override public V get(String key) { try { Bson query = Filters.eq(DrillMongoConstants.ID, key); http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/distribution/src/resources/drill-override-example.conf ---------------------------------------------------------------------- diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf index 43f9942..b9d09a8 100644 --- a/distribution/src/resources/drill-override-example.conf +++ b/distribution/src/resources/drill-override-example.conf @@ -171,8 +171,7 @@ drill.exec: { decode_threadpool_size: 1 }, debug.error_on_leak: true, - # Settings for Dynamic UDFs. - # See https://gist.github.com/arina-ielchiieva/a1c4cfa3890145c5ecb1b70a39cbff55#file-dynamicudfssupport-md. + # Settings for Dynamic UDFs (see https://issues.apache.org/jira/browse/DRILL-4726 for details). 
udf: { # number of retry attempts to update remote function registry # if registry version was changed during update http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java index 610a2b9..17cb6cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -85,32 +85,53 @@ public class ZookeeperClient implements AutoCloseable { /** * Returns true if path exists in the cache, false otherwise. - * * Note that calls to this method are eventually consistent. * - * @param path path to check + * @param path path to check + * @return true if path exists, false otherwise */ public boolean hasPath(final String path) { - return hasPath(path, false); + return hasPath(path, false, null); + } + + /** + * Returns true if path exists, false otherwise. + * If consistent flag is set to true, check is made against Zookeeper directly, + * else check is done against local cache. + * + * @param path path to check + * @param consistent whether the check should be consistent + * @return true if path exists, false otherwise + */ + public boolean hasPath(final String path, final boolean consistent) { + return hasPath(path, consistent, null); } /** * Checks if the given path exists. + * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly. 
+ * Otherwise, the check is eventually consistent. * - * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly. Otherwise, - * the check is eventually consistent. + * If consistency flag is set to true and version holder is not null, passes version holder to get data change version. + * Data change version is retrieved from {@link Stat} object, it increases each time znode data change is performed. + * Link to Zookeeper documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes * - * @param path path to check - * @param consistent whether the check should be consistent - * @return + * @param path path to check + * @param consistent whether the check should be consistent + * @param version version holder + * @return true if path exists, false otherwise */ - public boolean hasPath(final String path, final boolean consistent) { + public boolean hasPath(final String path, final boolean consistent, final DataChangeVersion version) { Preconditions.checkNotNull(path, "path is required"); final String target = PathUtils.join(root, path); try { if (consistent) { - return curator.checkExists().forPath(target) != null; + Stat stat = curator.checkExists().forPath(target); + if (version != null && stat != null) { + version.setVersion(stat.getVersion()); + } + return stat != null; } else { return getCache().getCurrentData(target) != null; } @@ -153,7 +174,7 @@ public class ZookeeperClient implements AutoCloseable { * @param path target path * @param version version holder */ - public byte[] get(final String path, DataChangeVersion version) { + public byte[] get(final String path, final DataChangeVersion version) { return get(path, true, version); } http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java deleted file mode 100644 index 0d59cc8..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.exception; - -import org.apache.drill.common.exceptions.DrillRuntimeException; - -public class FunctionNotFoundException extends DrillRuntimeException { - - public FunctionNotFoundException(String message, Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java index ce0d68b..5c7bfb4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -58,11 +58,13 @@ import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry; import org.apache.drill.exec.planner.sql.DrillOperatorTable; import org.apache.drill.exec.proto.UserBitShared.Jar; import org.apache.drill.exec.resolver.FunctionResolver; +import org.apache.drill.exec.resolver.FunctionResolverFactory; import org.apache.drill.exec.server.options.OptionManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import org.apache.drill.exec.store.sys.store.DataChangeVersion; import org.apache.drill.exec.util.JarUtil; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -128,7 +130,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au /** * Register functions in given operator table. - * @param operatorTable + * @param operatorTable operator table */ public void register(DrillOperatorTable operatorTable) { // Register Drill functions first and move to pluggable function registries. @@ -140,27 +142,39 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au } /** - * Using the given <code>functionResolver</code> - * finds Drill function implementation for given <code>functionCall</code>. - * If function implementation was not found, - * loads all missing remote functions and tries to find Drill implementation one more time. + * First attempts to finds the Drill function implementation that matches the name, arg types and return type. + * If exact function implementation was not found, + * syncs local function registry with remote function registry if needed + * and tries to find function implementation one more time + * but this time using given <code>functionResolver</code>. 
+ * + * @param functionResolver function resolver + * @param functionCall function call + * @return best matching function holder */ @Override public DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall) { - return findDrillFunction(functionResolver, functionCall, true); - } - - private DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall, boolean retry) { AtomicLong version = new AtomicLong(); - DrillFuncHolder holder = functionResolver.getBestMatch( - localFunctionRegistry.getMethods(functionReplacement(functionCall), version), functionCall); - if (holder == null && retry && loadRemoteFunctions(version.get())) { - return findDrillFunction(functionResolver, functionCall, false); + String newFunctionName = functionReplacement(functionCall); + List<DrillFuncHolder> functions = localFunctionRegistry.getMethods(newFunctionName, version); + FunctionResolver exactResolver = FunctionResolverFactory.getExactResolver(functionCall); + DrillFuncHolder holder = exactResolver.getBestMatch(functions, functionCall); + + if (holder == null) { + syncWithRemoteRegistry(version.get()); + List<DrillFuncHolder> updatedFunctions = localFunctionRegistry.getMethods(newFunctionName, version); + holder = functionResolver.getBestMatch(updatedFunctions, functionCall); } + return holder; } - // Check if this Function Replacement is needed; if yes, return a new name. otherwise, return the original name + /** + * Checks if this function replacement is needed. 
+ * + * @param functionCall function call + * @return new function name if replacement took place, otherwise original function name + */ private String functionReplacement(FunctionCall functionCall) { String funcName = functionCall.getName(); if (functionCall.args.size() > 0) { @@ -178,22 +192,41 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au } /** - * Find the Drill function implementation that matches the name, arg types and return type. - * If exact function implementation was not found, - * loads all missing remote functions and tries to find Drill implementation one more time. + * Finds the Drill function implementation that matches the name, arg types and return type. + * + * @param name function name + * @param argTypes input parameters types + * @param returnType function return type + * @return exactly matching function holder */ public DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType) { return findExactMatchingDrillFunction(name, argTypes, returnType, true); } - private DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType, boolean retry) { + /** + * Finds the Drill function implementation that matches the name, arg types and return type. + * If exact function implementation was not found, + * checks if local function registry is in sync with remote function registry. + * If not syncs them and tries to find exact function implementation one more time + * but with retry flag set to false. 
+ * + * @param name function name + * @param argTypes input parameters types + * @param returnType function return type + * @param retry retry on failure flag + * @return exactly matching function holder + */ + private DrillFuncHolder findExactMatchingDrillFunction(String name, + List<MajorType> argTypes, + MajorType returnType, + boolean retry) { AtomicLong version = new AtomicLong(); for (DrillFuncHolder h : localFunctionRegistry.getMethods(name, version)) { if (h.matches(returnType, argTypes)) { return h; } } - if (retry && loadRemoteFunctions(version.get())) { + if (retry && syncWithRemoteRegistry(version.get())) { return findExactMatchingDrillFunction(name, argTypes, returnType, false); } return null; @@ -206,8 +239,8 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au * Note: Order of searching is same as order of {@link org.apache.drill.exec.expr.fn.PluggableFunctionRegistry} * implementations found on classpath. * - * @param functionCall - * @return + * @param functionCall function call + * @return drill function holder */ @Override public AbstractFuncHolder findNonDrillFunction(FunctionCall functionCall) { @@ -260,76 +293,101 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au } /** - * Attempts to load and register functions from remote function registry. - * First checks if there is no missing jars. - * If yes, enters synchronized block to prevent other loading the same jars. - * Again re-checks if there are no missing jars in case someone has already loaded them (double-check lock). - * If there are still missing jars, first copies jars to local udf area and prepares {@link JarScan} for each jar. - * Jar registration timestamp represented in milliseconds is used as suffix. - * Then registers all jars at the same time. Returns true when finished. - * In case if any errors during jars coping or registration, logs errors and proceeds. 
+ * Purpose of this method is to synchronize remote and local function registries if needed + * and to inform if function registry was changed after given version. + * + * To make synchronization as lightweight as possible, first only versions of both registries are checked + * without any locking. If synchronization is needed, enters synchronized block to prevent others loading the same jars. + * The need of synchronization is checked again (double-check lock) before comparing jars. + * If any missing jars are found, they are downloaded to local udf area, each is wrapped into {@link JarScan}. + * Once jar download is finished, all missing jars are registered in one batch. + * If any errors occur during jars download / registration, these errors are logged. * - * If no missing jars are found, checks current local registry version. - * Returns false if versions match, true otherwise. + * During registration local function registry is updated with remote function registry version it is synced with. + * When at least one jar of the missing jars failed to download / register, + * local function registry version is not updated but jars that were successfully downloaded / registered + * are added to local function registry. * - * @param version local function registry version - * @return true if new jars were registered or local function registry version is different, false otherwise + * If synchronization between remote and local function registry was not needed, + * checks if given registry version matches latest sync version + * to inform if function registry was changed after given version. 
+ * + * @param version remote function registry local function registry was based on + * @return true if remote and local function registries were synchronized after given version */ - public boolean loadRemoteFunctions(long version) { - List<String> missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry); - if (!missingJars.isEmpty()) { + public boolean syncWithRemoteRegistry(long version) { + if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion())) { synchronized (this) { - missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry); - if (!missingJars.isEmpty()) { - logger.info("Starting dynamic UDFs lazy-init process.\n" + - "The following jars are going to be downloaded and registered locally: " + missingJars); + long localRegistryVersion = localFunctionRegistry.getVersion(); + if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localRegistryVersion)) { + DataChangeVersion remoteVersion = new DataChangeVersion(); + List<String> missingJars = getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, remoteVersion); List<JarScan> jars = Lists.newArrayList(); - for (String jarName : missingJars) { - Path binary = null; - Path source = null; - URLClassLoader classLoader = null; - try { - binary = copyJarToLocal(jarName, remoteFunctionRegistry); - source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry); - URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()}; - classLoader = new URLClassLoader(urls); - ScanResult scanResult = scan(classLoader, binary, urls); - localFunctionRegistry.validate(jarName, scanResult); - jars.add(new JarScan(jarName, scanResult, classLoader)); - } catch (Exception e) { - deleteQuietlyLocalJar(binary); - deleteQuietlyLocalJar(source); - if (classLoader != null) { - try { - classLoader.close(); - } catch (Exception ex) { - logger.warn("Problem during closing class loader for {}", jarName, e); + if 
(!missingJars.isEmpty()) { + logger.info("Starting dynamic UDFs lazy-init process.\n" + + "The following jars are going to be downloaded and registered locally: " + missingJars); + for (String jarName : missingJars) { + Path binary = null; + Path source = null; + URLClassLoader classLoader = null; + try { + binary = copyJarToLocal(jarName, this.remoteFunctionRegistry); + source = copyJarToLocal(JarUtil.getSourceName(jarName), this.remoteFunctionRegistry); + URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()}; + classLoader = new URLClassLoader(urls); + ScanResult scanResult = scan(classLoader, binary, urls); + localFunctionRegistry.validate(jarName, scanResult); + jars.add(new JarScan(jarName, scanResult, classLoader)); + } catch (Exception e) { + deleteQuietlyLocalJar(binary); + deleteQuietlyLocalJar(source); + if (classLoader != null) { + try { + classLoader.close(); + } catch (Exception ex) { + logger.warn("Problem during closing class loader for {}", jarName, e); + } } + logger.error("Problem during remote functions load from {}", jarName, e); } - logger.error("Problem during remote functions load from {}", jarName, e); } } - if (!jars.isEmpty()) { - localFunctionRegistry.register(jars); - return true; - } + long latestRegistryVersion = jars.size() != missingJars.size() ? + localRegistryVersion : remoteVersion.getVersion(); + localFunctionRegistry.register(jars, latestRegistryVersion); + return true; } } } + return version != localFunctionRegistry.getVersion(); } /** - * First finds path to marker file url, otherwise throws {@link JarValidationException}. - * Then scans jar classes according to list indicated in marker files. - * Additional logic is added to close {@link URL} after {@link ConfigFactory#parseURL(URL)}. - * This is extremely important for Windows users where system doesn't allow to delete file if it's being used. + * Checks if local function registry should be synchronized with remote function registry. 
+ * If remote function registry version is -1, it means that remote function registry is unreachable + * or is not configured thus we skip synchronization and return false. + * In all other cases synchronization is needed if remote and local function registries versions do not match. * - * @param classLoader unique class loader for jar - * @param path local path to jar - * @param urls urls associated with the jar (ex: binary and source) - * @return scan result of packages, classes, annotations found in jar + * @param remoteVersion remote function registry version + * @param localVersion local function registry version + * @return true if local registry should be refreshed, false otherwise */ + private boolean isRegistrySyncNeeded(long remoteVersion, long localVersion) { + return remoteVersion != -1 && remoteVersion != localVersion; + } + + /** + * First finds path to marker file url, otherwise throws {@link JarValidationException}. + * Then scans jar classes according to list indicated in marker files. + * Additional logic is added to close {@link URL} after {@link ConfigFactory#parseURL(URL)}. + * This is extremely important for Windows users where system doesn't allow to delete file if it's being used. + * + * @param classLoader unique class loader for jar + * @param path local path to jar + * @param urls urls associated with the jar (ex: binary and source) + * @return scan result of packages, classes, annotations found in jar + */ private ScanResult scan(ClassLoader classLoader, Path path, URL[] urls) throws IOException { Enumeration<URL> markerFileEnumeration = classLoader.getResources( CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME); @@ -355,14 +413,17 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au /** * Return list of jars that are missing in local function registry * but present in remote function registry. + * Also updates version holder with remote function registry version. 
* * @param remoteFunctionRegistry remote function registry * @param localFunctionRegistry local function registry + * @param version holder for remote function registry version * @return list of missing jars */ private List<String> getMissingJars(RemoteFunctionRegistry remoteFunctionRegistry, - LocalFunctionRegistry localFunctionRegistry) { - List<Jar> remoteJars = remoteFunctionRegistry.getRegistry().getJarList(); + LocalFunctionRegistry localFunctionRegistry, + DataChangeVersion version) { + List<Jar> remoteJars = remoteFunctionRegistry.getRegistry(version).getJarList(); List<String> localJars = localFunctionRegistry.getAllJarNames(); List<String> missingJars = Lists.newArrayList(); for (Jar jar : remoteJars) { @@ -384,8 +445,10 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au private Path getLocalUdfDir(DrillConfig config) { tmpDir = getTmpDir(config); File udfDir = new File(tmpDir, config.getString(ExecConstants.UDF_DIRECTORY_LOCAL)); - udfDir.mkdirs(); String udfPath = udfDir.getPath(); + if (udfDir.mkdirs()) { + logger.debug("Local udf directory [{}] was created", udfPath); + } Preconditions.checkState(udfDir.exists(), "Local udf directory [%s] must exist", udfPath); Preconditions.checkState(udfDir.isDirectory(), "Local udf directory [%s] must be a directory", udfPath); Preconditions.checkState(udfDir.canWrite(), "Local udf directory [%s] must be writable for application user", udfPath); @@ -404,6 +467,8 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au * If value is still missing, generates directory using {@link Files#createTempDir()}. * If temporary directory was generated, sets {@link #deleteTmpDir} to true * to delete directory on drillbit exit. 
+ * + * @param config drill config * @return drill temporary directory path */ private File getTmpDir(DrillConfig config) { http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java index 005c4e5..3124539 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -42,8 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * since we expect infrequent registry changes. * Holder is designed to allow concurrent reads and single writes to keep data consistent. * This is achieved by {@link ReadWriteLock} implementation usage. - * Holder has number version which changes every time new jars are added or removed. Initial version number is 0. - * Also version is used when user needs data from registry with version it is based on. + * Holder has number version which indicates remote function registry version number it is in sync with. 
* * Structure example: * @@ -86,7 +85,8 @@ public class FunctionRegistryHolder { private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock()); private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock()); - private long version = 0; + // remote function registry number, it is in sync with + private long version; // jar name, Map<function name, Queue<function signature> private final Map<String, Map<String, Queue<String>>> jars; @@ -114,13 +114,13 @@ public class FunctionRegistryHolder { * If jar with the same name already exists, it and its functions will be removed. * Then jar will be added to {@link #jars} * and each function will be added using {@link #addFunctions(Map, List)}. - * Function version registry will be incremented by 1 if at least one jar was added but not for each jar. + * Registry version is updated with passed version if all jars were added successfully. * This is write operation, so one user at a time can call perform such action, * others will wait till first user completes his action. * * @param newJars jars and list of their function holders, each contains function name, signature and holder */ - public void addJars(Map<String, List<FunctionHolder>> newJars) { + public void addJars(Map<String, List<FunctionHolder>> newJars, long version) { try (AutoCloseableLock lock = writeLock.open()) { for (Map.Entry<String, List<FunctionHolder>> newJar : newJars.entrySet()) { String jarName = newJar.getKey(); @@ -129,15 +129,12 @@ public class FunctionRegistryHolder { jars.put(jarName, jar); addFunctions(jar, newJar.getValue()); } - if (!newJars.isEmpty()) { - version++; - } + this.version = version; } } /** * Removes jar from {@link #jars} and all associated with jar functions from {@link #functions} - * If jar was removed, function registry version will be incremented by 1. 
* This is write operation, so one user at a time can call perform such action, * others will wait till first user completes his action. * @@ -145,9 +142,7 @@ public class FunctionRegistryHolder { */ public void removeJar(String jarName) { try (AutoCloseableLock lock = writeLock.open()) { - if (removeAllByJar(jarName)) { - version++; - } + removeAllByJar(jarName); } } @@ -341,12 +336,11 @@ public class FunctionRegistryHolder { * All jar functions have the same class loader, so we need to close only one time. * * @param jarName jar name to be removed - * @return true if jar was removed, false otherwise */ - private boolean removeAllByJar(String jarName) { + private void removeAllByJar(String jarName) { Map<String, Queue<String>> jar = jars.remove(jarName); if (jar == null) { - return false; + return; } for (Map.Entry<String, Queue<String>> functionEntry : jar.entrySet()) { @@ -372,6 +366,5 @@ public class FunctionRegistryHolder { functions.remove(function); } } - return true; } } http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java index 2a3f167..1318f72 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -73,12 +73,14 @@ public class LocalFunctionRegistry { private final FunctionRegistryHolder registryHolder; - /** Registers all functions present in Drill classpath on start-up. All functions will be marked as built-in. - * Built-in functions are not allowed to be unregistered. */ + /** + * Registers all functions present in Drill classpath on start-up. All functions will be marked as built-in. + * Built-in functions are not allowed to be unregistered. Initially sync registry version will be set to 0. + */ public LocalFunctionRegistry(ScanResult classpathScan) { registryHolder = new FunctionRegistryHolder(); validate(BUILT_IN, classpathScan); - register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader()))); + register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader())), 0); if (logger.isTraceEnabled()) { StringBuilder allFunctions = new StringBuilder(); for (DrillFuncHolder method: registryHolder.getAllFunctionsWithHolders().values()) { @@ -89,7 +91,7 @@ public class LocalFunctionRegistry { } /** - * @return local function registry version number + * @return remote function registry version number with which local function registry is synced */ public long getVersion() { return registryHolder.getVersion(); @@ -147,14 +149,15 @@ public class LocalFunctionRegistry { } /** - * Registers all functions present in jar. + * Registers all functions present in jar and updates registry version. * If jar name is already registered, all jar related functions will be overridden. * To prevent classpath collisions during loading and unloading jars, * each jar is shipped with its own class loader. 
* * @param jars list of jars to be registered + * @param version remote function registry version number with which local function registry is synced */ - public void register(List<JarScan> jars) { + public void register(List<JarScan> jars, long version) { Map<String, List<FunctionHolder>> newJars = Maps.newHashMap(); for (JarScan jarScan : jars) { FunctionConverter converter = new FunctionConverter(); @@ -174,7 +177,7 @@ public class LocalFunctionRegistry { } } } - registryHolder.addJars(newJars); + registryHolder.addJars(newJars, version); } /** @@ -217,25 +220,31 @@ public class LocalFunctionRegistry { return registryHolder.getHoldersByFunctionName(name.toLowerCase(), version); } + /** + * @param name function name + * @return all function holders associated with the function name. Function name is case insensitive. + */ public List<DrillFuncHolder> getMethods(String name) { return registryHolder.getHoldersByFunctionName(name.toLowerCase()); } /** * Registers all functions present in {@link DrillOperatorTable}, - * also sets local registry version used at the moment of registering. + * also sets sync registry version used at the moment of function registration. 
* * @param operatorTable drill operator table */ public void register(DrillOperatorTable operatorTable) { AtomicLong versionHolder = new AtomicLong(); - final Map<String, Collection<DrillFuncHolder>> registeredFunctions = registryHolder.getAllFunctionsWithHolders(versionHolder).asMap(); + final Map<String, Collection<DrillFuncHolder>> registeredFunctions = + registryHolder.getAllFunctionsWithHolders(versionHolder).asMap(); operatorTable.setFunctionRegistryVersion(versionHolder.get()); registerOperatorsWithInference(operatorTable, registeredFunctions); registerOperatorsWithoutInference(operatorTable, registeredFunctions); } - private void registerOperatorsWithInference(DrillOperatorTable operatorTable, Map<String, Collection<DrillFuncHolder>> registeredFunctions) { + private void registerOperatorsWithInference(DrillOperatorTable operatorTable, Map<String, + Collection<DrillFuncHolder>> registeredFunctions) { final Map<String, DrillSqlOperator.DrillSqlOperatorBuilder> map = Maps.newHashMap(); final Map<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> mapAgg = Maps.newHashMap(); for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.entrySet()) { http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java index fe79583..2e5eda2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -106,8 +106,26 @@ public class RemoteFunctionRegistry implements AutoCloseable { this.retryAttempts = config.getInt(ExecConstants.UDF_RETRY_ATTEMPTS); } - public Registry getRegistry() { - return registry.get(registry_path, null); + /** + * Returns current remote function registry version. + * If remote function registry is not found or unreachable, logs error and returns -1. + * + * @return remote function registry version if any, -1 otherwise + */ + public long getRegistryVersion() { + DataChangeVersion version = new DataChangeVersion(); + boolean contains = false; + try { + contains = registry.contains(registry_path, version); + } catch (Exception e) { + logger.error("Problem during trying to access remote function registry [{}]", registry_path, e); + } + if (contains) { + return version.getVersion(); + } else { + logger.error("Remote function registry [{}] is unreachable", registry_path); + return -1; + } } public Registry getRegistry(DataChangeVersion version) { http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index 264af29..707815a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -66,7 +66,6 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem private final UserSession session; private final OptionManager queryOptions; private final PlannerSettings plannerSettings; - private final DrillOperatorTable table; private final ExecutionControls executionControls; private final BufferAllocator allocator; @@ -83,6 +82,7 @@ public 
class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem * time this is set to true and the close method becomes a no-op. */ private boolean closed = false; + private DrillOperatorTable table; public QueryContext(final UserSession session, final DrillbitContext drillbitContext, QueryId queryId) { this.drillbitContext = drillbitContext; @@ -229,6 +229,15 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem return table; } + /** + * Re-creates drill operator table to refresh functions list from local function registry. + */ + public void reloadDrillOperatorTable() { + table = new DrillOperatorTable( + drillbitContext.getFunctionImplementationRegistry(), + drillbitContext.getOptionManager()); + } + public QueryContextInformation getQueryContextInfo() { return queryContextInfo; } http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java index 6e5c72b..5102ae8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -36,7 +36,7 @@ import org.apache.drill.exec.server.options.OptionManager; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; + /** * Implementation of {@link SqlOperatorTable} that contains standard operators and functions provided through @@ -52,8 +52,8 @@ public class DrillOperatorTable extends SqlStdOperatorTable { private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithoutInferenceMap = ArrayListMultimap.create(); private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithInferenceMap = ArrayListMultimap.create(); - // indicates local function registry version based on which drill operator were loaded - // is used to define if we need to reload operator table in case when function signature was not found + // indicates remote function registry version based on which drill operator were loaded + // is used to define if we need to reload operator table in case remote function registry version has changed private long functionRegistryVersion; private final OptionManager systemOptionManager; @@ -65,19 +65,18 @@ public class DrillOperatorTable extends SqlStdOperatorTable { this.systemOptionManager = systemOptionManager; } - /** Cleans up all operator holders and reloads operators */ - public void reloadOperators(FunctionImplementationRegistry registry) { - drillOperatorsWithoutInference.clear(); - drillOperatorsWithInference.clear(); - drillOperatorsWithoutInferenceMap.clear(); - drillOperatorsWithInferenceMap.clear(); - registry.register(this); - } - - public long setFunctionRegistryVersion(long version) { - return functionRegistryVersion = version; + /** + * Set function registry version based on which operator table was loaded. 
+ * + * @param version registry version + */ + public void setFunctionRegistryVersion(long version) { + functionRegistryVersion = version; } + /** + * @return function registry version based on which operator table was loaded + */ public long getFunctionRegistryVersion() { return functionRegistryVersion; } http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index 0ad3944..3bc0922 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -24,10 +24,7 @@ import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.tools.RelConversionException; import org.apache.calcite.tools.ValidationException; import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.exec.exception.FunctionNotFoundException; -import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.ops.QueryContext; -import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler; @@ -50,13 +47,56 @@ public class DrillSqlWorker { private DrillSqlWorker() { } + /** + * Converts sql query string into query physical plan. 
+ * + * @param context query context + * @param sql sql query + * @return query physical plan + */ public static PhysicalPlan getPlan(QueryContext context, String sql) throws SqlParseException, ValidationException, ForemanSetupException { return getPlan(context, sql, null); } + /** + * Converts sql query string into query physical plan. + * In case of any errors (that might occur due to missing function implementation), + * checks if local function registry should be synchronized with remote function registry. + * If sync took place, reloads drill operator table + * (since functions were added to / removed from local function registry) + * and attempts to converts sql query string into query physical plan one more time. + * + * @param context query context + * @param sql sql query + * @param textPlan text plan + * @return query physical plan + */ public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan) throws ForemanSetupException { + Pointer<String> textPlanCopy = textPlan == null ? null : new Pointer<>(textPlan.value); + try { + return getQueryPlan(context, sql, textPlan); + } catch (Exception e) { + if (context.getFunctionRegistry().syncWithRemoteRegistry( + context.getDrillOperatorTable().getFunctionRegistryVersion())) { + context.reloadDrillOperatorTable(); + return getQueryPlan(context, sql, textPlanCopy); + } + throw e; + } + } + + /** + * Converts sql query string into query physical plan. 
+ * + * @param context query context + * @param sql sql query + * @param textPlan text plan + * @return query physical plan + */ + private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Pointer<String> textPlan) + throws ForemanSetupException { final SqlConverter parser = new SqlConverter(context); @@ -88,7 +128,7 @@ public class DrillSqlWorker { } try { - return getPhysicalPlan(handler, sqlNode, context); + return handler.getPlan(sqlNode); } catch(ValidationException e) { String errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage(); throw UserException.validationError(e) @@ -104,26 +144,4 @@ public class DrillSqlWorker { throw new QueryInputException("Failure handling SQL.", e); } } - - /** - * Returns query physical plan. - * In case of {@link FunctionNotFoundException} attempts to load remote functions. - * If at least one function was loaded or local function function registry version has changed, - * makes one more attempt to get query physical plan. 
- */ - private static PhysicalPlan getPhysicalPlan(AbstractSqlHandler handler, SqlNode sqlNode, QueryContext context) - throws RelConversionException, IOException, ForemanSetupException, ValidationException { - try { - return handler.getPlan(sqlNode); - } catch (FunctionNotFoundException e) { - DrillOperatorTable drillOperatorTable = context.getDrillOperatorTable(); - FunctionImplementationRegistry functionRegistry = context.getFunctionRegistry(); - if (functionRegistry.loadRemoteFunctions(drillOperatorTable.getFunctionRegistryVersion())) { - drillOperatorTable.reloadOperators(functionRegistry); - return handler.getPlan(sqlNode); - } - throw e; - } - } - } http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java index e9085f7..845848c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java @@ -49,17 +49,13 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.util.ChainedSqlOperatorTable; import org.apache.calcite.sql.validate.SqlConformance; import org.apache.calcite.sql.validate.SqlValidatorCatalogReader; -import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.calcite.sql.validate.SqlValidatorImpl; import org.apache.calcite.sql.validate.SqlValidatorScope; import org.apache.calcite.sql2rel.RelDecorrelator; import org.apache.calcite.sql2rel.SqlToRelConverter; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.drill.common.config.DrillConfig; import 
org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.exception.FunctionNotFoundException; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.ops.UdfUtilities; @@ -173,11 +169,6 @@ public class SqlConverter { SqlNode validatedNode = validator.validate(parsedNode); return validatedNode; } catch (RuntimeException e) { - final Throwable rootCause = ExceptionUtils.getRootCause(e); - if (rootCause instanceof SqlValidatorException - && StringUtils.contains(rootCause.getMessage(), "No match found for function signature")) { - throw new FunctionNotFoundException(rootCause.getMessage(), e); - } UserException.Builder builder = UserException .validationError(e) .addContext("SQL Query", sql); http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java index 48bfd8b..0902fb7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -186,6 +186,7 @@ public class CreateFunctionHandler extends DefaultSqlHandler { remoteRegistry.updateRegistry(updatedRegistry, version); return; } catch (VersionMismatchException ex) { + logger.debug("Failed to update function registry during registration, version mismatch was detected.", ex); retryAttempts--; } } http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java index 6e2801a..b5d0b23 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -142,6 +142,7 @@ public class DropFunctionHandler extends DefaultSqlHandler { remoteFunctionRegistry.updateRegistry(updatedRegistry, version); return jarToBeDeleted; } catch (VersionMismatchException ex) { + logger.debug("Failed to update function registry during unregistration, version mismatch was detected.", ex); retryAttempts--; } } http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java index ea38278..0640407 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,6 +29,12 @@ public abstract class BasePersistentStore<V> implements PersistentStore<V> { return getRange(0, Integer.MAX_VALUE); } + /** By default contains with version will behave the same way as without version. + * Override this method to add version support. */ + public boolean contains(String key, DataChangeVersion version) { + return contains(key); + } + /** By default get with version will behave the same way as without version. * Override this method to add version support. 
*/ @Override http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java index bb23752..206642a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -34,6 +34,24 @@ public interface PersistentStore<V> extends AutoCloseable { PersistentStoreMode getMode(); /** + * Checks if lookup key is present in store. + * + * @param key lookup key + * @return true if store contains lookup key, false otherwise + */ + boolean contains(String key); + + /** + * Checks if lookup key is present in store. + * Sets data change version number. + * + * @param key lookup key + * @param version version holder + * @return true if store contains lookup key, false otherwise + */ + boolean contains(String key, DataChangeVersion version); + + /** * Returns the value for the given key if exists, null otherwise. 
* @param key lookup key */ http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java index b9a4b59..ef855e2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,12 +28,16 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.Nullable; import org.apache.commons.io.IOUtils; import org.apache.drill.common.collections.ImmutableEntry; +import org.apache.drill.common.concurrent.AutoCloseableLock; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.exception.VersionMismatchException; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.sys.BasePersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; @@ -47,13 +51,20 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LocalPersistentStore<V> extends BasePersistentStore<V> { -// private static final Logger logger = 
LoggerFactory.getLogger(LocalPersistentStore.class); + private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class); + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock()); + private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock()); private final Path basePath; private final PersistentStoreConfig<V> config; private final DrillFileSystem fs; + private int version = -1; public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig<V> config) { super(); @@ -62,7 +73,9 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> { this.fs = fs; try { - mkdirs(basePath); + if (!fs.mkdirs(basePath)) { + version++; + } } catch (IOException e) { throw new RuntimeException("Failure setting pstore configuration path."); } @@ -73,11 +86,7 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> { return PersistentStoreMode.PERSISTENT; } - private void mkdirs(Path path) throws IOException{ - fs.mkdirs(path); - } - - public static Path getLogDir(){ + public static Path getLogDir() { String drillLogDir = System.getenv("DRILL_LOG_DIR"); if (drillLogDir == null) { drillLogDir = "/var/log/drill"; @@ -85,10 +94,10 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> { return new Path(new File(drillLogDir).getAbsoluteFile().toURI()); } - public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException{ + public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException { Path blobRoot = root == null ? 
getLogDir() : root; Configuration fsConf = new Configuration(); - if(blobRoot.toUri().getScheme() != null){ + if (blobRoot.toUri().getScheme() != null) { fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString()); } @@ -100,93 +109,142 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> { @Override public Iterator<Map.Entry<String, V>> getRange(int skip, int take) { - try{ - List<FileStatus> f = fs.list(false, basePath); - if (f == null || f.isEmpty()) { - return Collections.emptyIterator(); - } - List<String> files = Lists.newArrayList(); - - for (FileStatus stat : f) { - String s = stat.getPath().getName(); - if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) { - files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); + try (AutoCloseableLock lock = readLock.open()) { + try { + List<FileStatus> f = fs.list(false, basePath); + if (f == null || f.isEmpty()) { + return Collections.emptyIterator(); } - } + List<String> files = Lists.newArrayList(); - Collections.sort(files); - return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() { - @Nullable - @Override - public Entry<String, V> apply(String key) { - return new ImmutableEntry<>(key, get(key)); + for (FileStatus stat : f) { + String s = stat.getPath().getName(); + if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) { + files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); + } } - }).iterator(); - }catch(IOException e){ - throw new RuntimeException(e); + + Collections.sort(files); + return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() { + @Nullable + @Override + public Entry<String, V> apply(String key) { + return new ImmutableEntry<>(key, get(key)); + } + }).iterator(); + } catch (IOException e) { + throw new RuntimeException(e); + } } } private Path makePath(String name) { Preconditions.checkArgument( !name.contains("/") && - 
!name.contains(":") && - !name.contains("..")); + !name.contains(":") && + !name.contains("..")); + return new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); + } - final Path path = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); - // do this to check file name. - return path; + @Override + public boolean contains(String key) { + return contains(key, null); } @Override - public V get(String key) { - try{ - Path path = makePath(key); - if(!fs.exists(path)){ - return null; + public boolean contains(String key, DataChangeVersion dataChangeVersion) { + try (AutoCloseableLock lock = readLock.open()) { + try { + Path path = makePath(key); + boolean exists = fs.exists(path); + if (exists && dataChangeVersion != null) { + dataChangeVersion.setVersion(version); + } + return exists; + } catch (IOException e) { + throw new RuntimeException(e); } - }catch(IOException e){ - throw new RuntimeException(e); } + } - final Path path = makePath(key); - try (InputStream is = fs.open(path)) { - return config.getSerializer().deserialize(IOUtils.toByteArray(is)); - } catch (IOException e) { - throw new RuntimeException("Unable to deserialize \"" + path + "\"", e); + @Override + public V get(String key) { + return get(key, null); + } + + @Override + public V get(String key, DataChangeVersion dataChangeVersion) { + try (AutoCloseableLock lock = readLock.open()) { + try { + if (dataChangeVersion != null) { + dataChangeVersion.setVersion(version); + } + Path path = makePath(key); + if (!fs.exists(path)) { + return null; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + final Path path = makePath(key); + try (InputStream is = fs.open(path)) { + return config.getSerializer().deserialize(IOUtils.toByteArray(is)); + } catch (IOException e) { + throw new RuntimeException("Unable to deserialize \"" + path + "\"", e); + } } } @Override public void put(String key, V value) { - try (OutputStream os = fs.create(makePath(key))) { - 
IOUtils.write(config.getSerializer().serialize(value), os); - } catch (IOException e) { - throw new RuntimeException(e); + put(key, value, null); + } + + @Override + public void put(String key, V value, DataChangeVersion dataChangeVersion) { + try (AutoCloseableLock lock = writeLock.open()) { + if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) { + throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion()); + } + try (OutputStream os = fs.create(makePath(key))) { + IOUtils.write(config.getSerializer().serialize(value), os); + version++; + } catch (IOException e) { + throw new RuntimeException(e); + } } } @Override public boolean putIfAbsent(String key, V value) { - try { - Path p = makePath(key); - if (fs.exists(p)) { - return false; - } else { - put(key, value); - return true; + try (AutoCloseableLock lock = writeLock.open()) { + try { + Path p = makePath(key); + if (fs.exists(p)) { + return false; + } else { + try (OutputStream os = fs.create(makePath(key))) { + IOUtils.write(config.getSerializer().serialize(value), os); + version++; + } + return true; + } + } catch (IOException e) { + throw new RuntimeException(e); } - } catch (IOException e) { - throw new RuntimeException(e); } } @Override public void delete(String key) { - try { - fs.delete(makePath(key), false); - } catch (IOException e) { - throw new RuntimeException(e); + try (AutoCloseableLock lock = writeLock.open()) { + try { + fs.delete(makePath(key), false); + version++; + } catch (IOException e) { + logger.error("Unable to delete data from storage.", e); + throw new RuntimeException(e); + } } } http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java index 55f72c9..a3ee58e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -62,16 +62,26 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> { } @Override + public boolean contains(final String key) { + return contains(key, null); + } + + @Override + public boolean contains(final String key, final DataChangeVersion version) { + return client.hasPath(key, true, version); + } + + @Override public V get(final String key) { return get(key, false, null); } @Override - public V get(final String key, DataChangeVersion version) { + public V get(final String key, final DataChangeVersion version) { return get(key, true, version); } - public V get(final String key, boolean consistencyFlag, DataChangeVersion version) { + public V get(final String key, final boolean consistencyFlag, final DataChangeVersion version) { byte[] bytes = client.get(key, consistencyFlag, version); if (bytes == null) { @@ -90,7 +100,7 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> { } @Override - public void put(final String key, final V value, DataChangeVersion version) { + public void put(final String key, final V value, final DataChangeVersion version) { final InstanceSerializer<V> serializer = config.getSerializer(); try { final byte[] bytes = serializer.serialize(value); http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java ---------------------------------------------------------------------- diff 
--git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java index 58ec3ea..e36dc83 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,8 +20,6 @@ package org.apache.drill.exec.testing.store; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -38,13 +36,13 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock()); private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock()); private final ConcurrentMap<String, V> store = Maps.newConcurrentMap(); - private final AtomicInteger version = new AtomicInteger(); + private int version = -1; @Override public void delete(final String key) { try (AutoCloseableLock lock = writeLock.open()) { store.remove(key); - version.incrementAndGet(); + version++; } } @@ -54,6 +52,21 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { } @Override + public boolean contains(final String key) { + return contains(key, null); + } + + @Override + public boolean contains(final String key, final DataChangeVersion dataChangeVersion) { + try (AutoCloseableLock lock = readLock.open()) { + if (dataChangeVersion != null) { + dataChangeVersion.setVersion(version); + } + return 
store.containsKey(key); + } + } + + @Override public V get(final String key) { return get(key, null); } @@ -62,7 +75,7 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { public V get(final String key, final DataChangeVersion dataChangeVersion) { try (AutoCloseableLock lock = readLock.open()) { if (dataChangeVersion != null) { - dataChangeVersion.setVersion(version.get()); + dataChangeVersion.setVersion(version); } return store.get(key); } @@ -76,11 +89,11 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { @Override public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) { try (AutoCloseableLock lock = writeLock.open()) { - if (dataChangeVersion != null && dataChangeVersion.getVersion() != version.get()) { + if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) { throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion()); } store.put(key, value); - version.incrementAndGet(); + version++; } } @@ -89,7 +102,7 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { try (AutoCloseableLock lock = writeLock.open()) { final V old = store.putIfAbsent(key, value); if (old == null) { - version.incrementAndGet(); + version++; return true; } return false; @@ -107,7 +120,7 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { public void close() throws Exception { try (AutoCloseableLock lock = writeLock.open()) { store.clear(); - version.set(0); + version = -1; } } }