DRILL-4963: Fix issues with dynamically loaded overloaded functions

close #701


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/dcbcb94f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/dcbcb94f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/dcbcb94f

Branch: refs/heads/master
Commit: dcbcb94fd2695edd4bbca63b2759292e99695d47
Parents: 79811db
Author: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Authored: Tue Dec 20 16:57:15 2016 +0000
Committer: Jinfeng Ni <j...@apache.org>
Committed: Wed Mar 1 23:46:19 2017 -0800

----------------------------------------------------------------------
 .../hbase/config/HBasePersistentStore.java      |  16 +-
 .../drill/hbase/TestHBaseTableProvider.java     |   7 +-
 .../mongo/config/MongoPersistentStore.java      |  14 +-
 .../src/resources/drill-override-example.conf   |   3 +-
 .../drill/exec/coord/zk/ZookeeperClient.java    |  45 +++-
 .../exception/FunctionNotFoundException.java    |  27 ---
 .../expr/fn/FunctionImplementationRegistry.java | 219 ++++++++++++-------
 .../fn/registry/FunctionRegistryHolder.java     |  27 +--
 .../expr/fn/registry/LocalFunctionRegistry.java |  31 ++-
 .../fn/registry/RemoteFunctionRegistry.java     |  24 +-
 .../org/apache/drill/exec/ops/QueryContext.java |  11 +-
 .../exec/planner/sql/DrillOperatorTable.java    |  29 ++-
 .../drill/exec/planner/sql/DrillSqlWorker.java  |  70 +++---
 .../drill/exec/planner/sql/SqlConverter.java    |   9 -
 .../sql/handlers/CreateFunctionHandler.java     |   3 +-
 .../sql/handlers/DropFunctionHandler.java       |   3 +-
 .../exec/store/sys/BasePersistentStore.java     |   8 +-
 .../drill/exec/store/sys/PersistentStore.java   |  20 +-
 .../store/sys/store/LocalPersistentStore.java   | 188 ++++++++++------
 .../sys/store/ZookeeperPersistentStore.java     |  18 +-
 .../exec/testing/store/NoWriteLocalStore.java   |  33 ++-
 .../org/apache/drill/TestDynamicUDFSupport.java | 101 ++++++---
 .../exec/coord/zk/TestZookeeperClient.java      |  30 ++-
 .../fn/registry/FunctionRegistryHolderTest.java |  45 ++--
 .../record/ExpressionTreeMaterializerTest.java  |   7 +-
 .../jars/DrillUDF-overloading-1.0-sources.jar   | Bin 0 -> 3473 bytes
 .../resources/jars/DrillUDF-overloading-1.0.jar | Bin 0 -> 5779 bytes
 27 files changed, 639 insertions(+), 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java
index 2d329a8..ef6bbfe 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -67,6 +67,20 @@ public class HBasePersistentStore<V> extends 
BasePersistentStore<V> {
   }
 
   @Override
+  public boolean contains(String key) {
+    try {
+      Get get = new Get(row(key));
+      get.addColumn(FAMILY, QUALIFIER);
+      return hbaseTable.exists(get);
+    } catch (IOException e) {
+      throw UserException
+          .dataReadError(e)
+          .message("Caught error while checking row existence '%s' for table 
'%s'", key, hbaseTableName)
+          .build(logger);
+    }
+  }
+
+  @Override
   public V get(String key) {
     return get(key, FAMILY);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
index 6b73283..f278359 100644
--- 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
+++ 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +18,8 @@
 package org.apache.drill.hbase;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Map.Entry;
 
@@ -57,6 +59,9 @@ public class TestHBaseTableProvider extends BaseHBaseTest {
     assertEquals("v0", hbaseStore.get(""));
     assertEquals("testValue", hbaseStore.get(".test"));
 
+    assertTrue(hbaseStore.contains(""));
+    assertFalse(hbaseStore.contains("unknown_key"));
+
     int rowCount = 0;
     for (Entry<String, String> entry : 
Lists.newArrayList(hbaseStore.getAll())) {
       rowCount++;

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java
 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java
index b5cc3ee..73ff31d 100644
--- 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java
+++ 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -61,6 +61,18 @@ public class MongoPersistentStore<V> extends 
BasePersistentStore<V> {
   }
 
   @Override
+  public boolean contains(String key) {
+    try {
+      Bson query = Filters.eq(DrillMongoConstants.ID, key);
+      Document document = collection.find(query).first();
+      return document != null && document.containsKey(pKey);
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+  }
+
+  @Override
   public V get(String key) {
     try {
       Bson query = Filters.eq(DrillMongoConstants.ID, key);

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf 
b/distribution/src/resources/drill-override-example.conf
index 43f9942..b9d09a8 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -171,8 +171,7 @@ drill.exec: {
     decode_threadpool_size: 1
   },
   debug.error_on_leak: true,
-  # Settings for Dynamic UDFs.
-  # See 
https://gist.github.com/arina-ielchiieva/a1c4cfa3890145c5ecb1b70a39cbff55#file-dynamicudfssupport-md.
+  # Settings for Dynamic UDFs (see 
https://issues.apache.org/jira/browse/DRILL-4726 for details).
   udf: {
     # number of retry attempts to update remote function registry
     # if registry version was changed during update

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
index 610a2b9..17cb6cb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -85,32 +85,53 @@ public class ZookeeperClient implements AutoCloseable {
 
   /**
    * Returns true if path exists in the cache, false otherwise.
-   *
    * Note that calls to this method are eventually consistent.
    *
-   * @param path  path to check
+   * @param path path to check
+   * @return true if path exists, false otherwise
    */
   public boolean hasPath(final String path) {
-    return hasPath(path, false);
+    return hasPath(path, false, null);
+  }
+
+  /**
+   * Returns true if path exists, false otherwise.
+   * If consistent flag is set to true, check is made against 
Zookeeper directly,
+   * else check is done against local cache.
+   *
+   * @param path path to check
+   * @param consistent whether the check should be consistent
+   * @return true if path exists, false otherwise
+   */
+  public boolean hasPath(final String path, final boolean consistent) {
+    return hasPath(path, consistent, null);
   }
 
   /**
    * Checks if the given path exists.
+   * If the flag consistent is set, the check is consistent as it is made 
against Zookeeper directly.
+   * Otherwise, the check is eventually consistent.
    *
-   * If the flag consistent is set, the check is consistent as it is made 
against Zookeeper directly. Otherwise,
-   * the check is eventually consistent.
+   * If consistency flag is set to true and version holder is not null, passes 
version holder to get data change version.
+   * Data change version is retrieved from {@link Stat} object, it increases 
each time znode data change is performed.
+   * Link to Zookeeper documentation - 
https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
    *
-   * @param path  path to check
-   * @param consistent  whether the check should be consistent
-   * @return
+   * @param path path to check
+   * @param consistent whether the check should be consistent
+   * @param version version holder
+   * @return true if path exists, false otherwise
    */
-  public boolean hasPath(final String path, final boolean consistent) {
+  public boolean hasPath(final String path, final boolean consistent, final 
DataChangeVersion version) {
     Preconditions.checkNotNull(path, "path is required");
 
     final String target = PathUtils.join(root, path);
     try {
       if (consistent) {
-        return curator.checkExists().forPath(target) != null;
+        Stat stat = curator.checkExists().forPath(target);
+        if (version != null && stat != null) {
+          version.setVersion(stat.getVersion());
+        }
+        return stat != null;
       } else {
         return getCache().getCurrentData(target) != null;
       }
@@ -153,7 +174,7 @@ public class ZookeeperClient implements AutoCloseable {
    * @param path  target path
    * @param version version holder
    */
-  public byte[] get(final String path, DataChangeVersion version) {
+  public byte[] get(final String path, final DataChangeVersion version) {
     return get(path, true, version);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java
deleted file mode 100644
index 0d59cc8..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.exception;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-
-public class FunctionNotFoundException extends DrillRuntimeException {
-
-  public FunctionNotFoundException(String message, Throwable cause) {
-    super(message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index ce0d68b..5c7bfb4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -58,11 +58,13 @@ import 
org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
 import org.apache.drill.exec.planner.sql.DrillOperatorTable;
 import org.apache.drill.exec.proto.UserBitShared.Jar;
 import org.apache.drill.exec.resolver.FunctionResolver;
+import org.apache.drill.exec.resolver.FunctionResolverFactory;
 import org.apache.drill.exec.server.options.OptionManager;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
 import org.apache.drill.exec.util.JarUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -128,7 +130,7 @@ public class FunctionImplementationRegistry implements 
FunctionLookupContext, Au
 
   /**
    * Register functions in given operator table.
-   * @param operatorTable
+   * @param operatorTable operator table
    */
   public void register(DrillOperatorTable operatorTable) {
     // Register Drill functions first and move to pluggable function 
registries.
@@ -140,27 +142,39 @@ public class FunctionImplementationRegistry implements 
FunctionLookupContext, Au
   }
 
   /**
-   * Using the given <code>functionResolver</code>
-   * finds Drill function implementation for given <code>functionCall</code>.
-   * If function implementation was not found,
-   * loads all missing remote functions and tries to find Drill implementation 
one more time.
+   * First attempts to find the Drill function implementation that matches 
the name, arg types and return type.
+   * If exact function implementation was not found,
+   * syncs local function registry with remote function registry if needed
+   * and tries to find function implementation one more time
+   * but this time using given <code>functionResolver</code>.
+   *
+   * @param functionResolver function resolver
+   * @param functionCall function call
+   * @return best matching function holder
    */
   @Override
   public DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, 
FunctionCall functionCall) {
-    return findDrillFunction(functionResolver, functionCall, true);
-  }
-
-  private DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, 
FunctionCall functionCall, boolean retry) {
     AtomicLong version = new AtomicLong();
-    DrillFuncHolder holder = functionResolver.getBestMatch(
-        localFunctionRegistry.getMethods(functionReplacement(functionCall), 
version), functionCall);
-    if (holder == null && retry && loadRemoteFunctions(version.get())) {
-      return findDrillFunction(functionResolver, functionCall, false);
+    String newFunctionName = functionReplacement(functionCall);
+    List<DrillFuncHolder> functions = 
localFunctionRegistry.getMethods(newFunctionName, version);
+    FunctionResolver exactResolver = 
FunctionResolverFactory.getExactResolver(functionCall);
+    DrillFuncHolder holder = exactResolver.getBestMatch(functions, 
functionCall);
+
+    if (holder == null) {
+      syncWithRemoteRegistry(version.get());
+      List<DrillFuncHolder> updatedFunctions = 
localFunctionRegistry.getMethods(newFunctionName, version);
+      holder = functionResolver.getBestMatch(updatedFunctions, functionCall);
     }
+
     return holder;
   }
 
-  // Check if this Function Replacement is needed; if yes, return a new name. 
otherwise, return the original name
+  /**
+   * Checks if this function replacement is needed.
+   *
+   * @param functionCall function call
+   * @return new function name if replacement took place, otherwise original 
function name
+   */
   private String functionReplacement(FunctionCall functionCall) {
     String funcName = functionCall.getName();
       if (functionCall.args.size() > 0) {
@@ -178,22 +192,41 @@ public class FunctionImplementationRegistry implements 
FunctionLookupContext, Au
   }
 
   /**
-   * Find the Drill function implementation that matches the name, arg types 
and return type.
-   * If exact function implementation was not found,
-   * loads all missing remote functions and tries to find Drill implementation 
one more time.
+   * Finds the Drill function implementation that matches the name, arg types 
and return type.
+   *
+   * @param name function name
+   * @param argTypes input parameters types
+   * @param returnType function return type
+   * @return exactly matching function holder
    */
   public DrillFuncHolder findExactMatchingDrillFunction(String name, 
List<MajorType> argTypes, MajorType returnType) {
     return findExactMatchingDrillFunction(name, argTypes, returnType, true);
   }
 
-  private DrillFuncHolder findExactMatchingDrillFunction(String name, 
List<MajorType> argTypes, MajorType returnType, boolean retry) {
+  /**
+   * Finds the Drill function implementation that matches the name, arg types 
and return type.
+   * If exact function implementation was not found,
+   * checks if local function registry is in sync with remote function 
registry.
+   * If not, syncs them and tries to find exact function implementation one 
more time
+   * but with retry flag set to false.
+   *
+   * @param name function name
+   * @param argTypes input parameters types
+   * @param returnType function return type
+   * @param retry retry on failure flag
+   * @return exactly matching function holder
+   */
+  private DrillFuncHolder findExactMatchingDrillFunction(String name,
+                                                         List<MajorType> 
argTypes,
+                                                         MajorType returnType,
+                                                         boolean retry) {
     AtomicLong version = new AtomicLong();
     for (DrillFuncHolder h : localFunctionRegistry.getMethods(name, version)) {
       if (h.matches(returnType, argTypes)) {
         return h;
       }
     }
-    if (retry && loadRemoteFunctions(version.get())) {
+    if (retry && syncWithRemoteRegistry(version.get())) {
       return findExactMatchingDrillFunction(name, argTypes, returnType, false);
     }
     return null;
@@ -206,8 +239,8 @@ public class FunctionImplementationRegistry implements 
FunctionLookupContext, Au
    * Note: Order of searching is same as order of {@link 
org.apache.drill.exec.expr.fn.PluggableFunctionRegistry}
    * implementations found on classpath.
    *
-   * @param functionCall
-   * @return
+   * @param functionCall function call
+   * @return drill function holder
    */
   @Override
   public AbstractFuncHolder findNonDrillFunction(FunctionCall functionCall) {
@@ -260,76 +293,101 @@ public class FunctionImplementationRegistry implements 
FunctionLookupContext, Au
   }
 
   /**
-   * Attempts to load and register functions from remote function registry.
-   * First checks if there is no missing jars.
-   * If yes, enters synchronized block to prevent other loading the same jars.
-   * Again re-checks if there are no missing jars in case someone has already 
loaded them (double-check lock).
-   * If there are still missing jars, first copies jars to local udf area and 
prepares {@link JarScan} for each jar.
-   * Jar registration timestamp represented in milliseconds is used as suffix.
-   * Then registers all jars at the same time. Returns true when finished.
-   * In case if any errors during jars coping or registration, logs errors and 
proceeds.
+   * Purpose of this method is to synchronize remote and local function 
registries if needed
+   * and to inform if function registry was changed after given version.
+   *
+   * To make synchronization as lightweight as possible, first only 
versions of both registries are checked
+   * without any locking. If synchronization is needed, enters synchronized 
block to prevent others loading the same jars.
+   * The need of synchronization is checked again (double-check lock) before 
comparing jars.
+   * If any missing jars are found, they are downloaded to local udf area, 
each is wrapped into {@link JarScan}.
+   * Once jar download is finished, all missing jars are registered in one 
batch.
+   * If any errors occur during jars download / registration, these errors 
are logged.
    *
-   * If no missing jars are found, checks current local registry version.
-   * Returns false if versions match, true otherwise.
+   * During registration local function registry is updated with remote 
function registry version it is synced with.
+   * When at least one jar of the missing jars failed to download / register,
+   * local function registry version is not updated but jars that were 
successfully downloaded / registered
+   * are added to local function registry.
    *
-   * @param version local function registry version
-   * @return true if new jars were registered or local function registry 
version is different, false otherwise
+   * If synchronization between remote and local function registry was not 
needed,
+   * checks if given registry version matches latest sync version
+   * to inform if function registry was changed after given version.
+   *
+   * @param version remote function registry version local function registry was based 
on
+   * @return true if remote and local function registries were synchronized 
after given version
    */
-  public boolean loadRemoteFunctions(long version) {
-    List<String> missingJars = getMissingJars(remoteFunctionRegistry, 
localFunctionRegistry);
-    if (!missingJars.isEmpty()) {
+  public boolean syncWithRemoteRegistry(long version) {
+    if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), 
localFunctionRegistry.getVersion())) {
       synchronized (this) {
-        missingJars = getMissingJars(remoteFunctionRegistry, 
localFunctionRegistry);
-        if (!missingJars.isEmpty()) {
-          logger.info("Starting dynamic UDFs lazy-init process.\n" +
-              "The following jars are going to be downloaded and registered 
locally: " + missingJars);
+        long localRegistryVersion = localFunctionRegistry.getVersion();
+        if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), 
localRegistryVersion))  {
+          DataChangeVersion remoteVersion = new DataChangeVersion();
+          List<String> missingJars = 
getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, 
remoteVersion);
           List<JarScan> jars = Lists.newArrayList();
-          for (String jarName : missingJars) {
-            Path binary = null;
-            Path source = null;
-            URLClassLoader classLoader = null;
-            try {
-              binary = copyJarToLocal(jarName, remoteFunctionRegistry);
-              source = copyJarToLocal(JarUtil.getSourceName(jarName), 
remoteFunctionRegistry);
-              URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
-              classLoader = new URLClassLoader(urls);
-              ScanResult scanResult = scan(classLoader, binary, urls);
-              localFunctionRegistry.validate(jarName, scanResult);
-              jars.add(new JarScan(jarName, scanResult, classLoader));
-            } catch (Exception e) {
-              deleteQuietlyLocalJar(binary);
-              deleteQuietlyLocalJar(source);
-              if (classLoader != null) {
-                try {
-                  classLoader.close();
-                } catch (Exception ex) {
-                  logger.warn("Problem during closing class loader for {}", 
jarName, e);
+          if (!missingJars.isEmpty()) {
+            logger.info("Starting dynamic UDFs lazy-init process.\n" +
+                "The following jars are going to be downloaded and registered 
locally: " + missingJars);
+            for (String jarName : missingJars) {
+              Path binary = null;
+              Path source = null;
+              URLClassLoader classLoader = null;
+              try {
+                binary = copyJarToLocal(jarName, this.remoteFunctionRegistry);
+                source = copyJarToLocal(JarUtil.getSourceName(jarName), 
this.remoteFunctionRegistry);
+                URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
+                classLoader = new URLClassLoader(urls);
+                ScanResult scanResult = scan(classLoader, binary, urls);
+                localFunctionRegistry.validate(jarName, scanResult);
+                jars.add(new JarScan(jarName, scanResult, classLoader));
+              } catch (Exception e) {
+                deleteQuietlyLocalJar(binary);
+                deleteQuietlyLocalJar(source);
+                if (classLoader != null) {
+                  try {
+                    classLoader.close();
+                  } catch (Exception ex) {
+                    logger.warn("Problem during closing class loader for {}", 
jarName, e);
+                  }
                 }
+                logger.error("Problem during remote functions load from {}", 
jarName, e);
               }
-              logger.error("Problem during remote functions load from {}", 
jarName, e);
             }
           }
-          if (!jars.isEmpty()) {
-            localFunctionRegistry.register(jars);
-            return true;
-          }
+          long latestRegistryVersion = jars.size() != missingJars.size() ?
+              localRegistryVersion : remoteVersion.getVersion();
+          localFunctionRegistry.register(jars, latestRegistryVersion);
+          return true;
         }
       }
     }
+
     return version != localFunctionRegistry.getVersion();
   }
 
   /**
-   * First finds path to marker file url, otherwise throws {@link 
JarValidationException}.
-   * Then scans jar classes according to list indicated in marker files.
-   * Additional logic is added to close {@link URL} after {@link 
ConfigFactory#parseURL(URL)}.
-   * This is extremely important for Windows users where system doesn't allow 
to delete file if it's being used.
+   * Checks if local function registry should be synchronized with remote 
function registry.
+   * If remote function registry version is -1, it means that remote function 
registry is unreachable
+   * or is not configured thus we skip synchronization and return false.
+   * In all other cases synchronization is needed if remote and local function 
registries versions do not match.
    *
-   * @param classLoader unique class loader for jar
-   * @param path local path to jar
-   * @param urls urls associated with the jar (ex: binary and source)
-   * @return scan result of packages, classes, annotations found in jar
+   * @param remoteVersion remote function registry version
+   * @param localVersion local function registry version
+   * @return true if local registry should be refreshed, false otherwise
    */
+  private boolean isRegistrySyncNeeded(long remoteVersion, long localVersion) {
+    return remoteVersion != -1 && remoteVersion != localVersion;
+  }
+
+  /**
+  * First finds path to marker file url, otherwise throws {@link 
JarValidationException}.
+  * Then scans jar classes according to list indicated in marker files.
+  * Additional logic is added to close {@link URL} after {@link 
ConfigFactory#parseURL(URL)}.
+  * This is extremely important for Windows users where system doesn't allow 
to delete file if it's being used.
+  *
+  * @param classLoader unique class loader for jar
+  * @param path local path to jar
+  * @param urls urls associated with the jar (ex: binary and source)
+  * @return scan result of packages, classes, annotations found in jar
+  */
   private ScanResult scan(ClassLoader classLoader, Path path, URL[] urls) 
throws IOException {
     Enumeration<URL> markerFileEnumeration = classLoader.getResources(
         CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME);
@@ -355,14 +413,17 @@ public class FunctionImplementationRegistry implements 
FunctionLookupContext, Au
   /**
    * Return list of jars that are missing in local function registry
    * but present in remote function registry.
+   * Also updates version holder with remote function registry version.
    *
    * @param remoteFunctionRegistry remote function registry
    * @param localFunctionRegistry local function registry
+   * @param version holder for remote function registry version
    * @return list of missing jars
    */
   private List<String> getMissingJars(RemoteFunctionRegistry 
remoteFunctionRegistry,
-                                      LocalFunctionRegistry 
localFunctionRegistry) {
-    List<Jar> remoteJars = remoteFunctionRegistry.getRegistry().getJarList();
+                                      LocalFunctionRegistry 
localFunctionRegistry,
+                                      DataChangeVersion version) {
+    List<Jar> remoteJars = 
remoteFunctionRegistry.getRegistry(version).getJarList();
     List<String> localJars = localFunctionRegistry.getAllJarNames();
     List<String> missingJars = Lists.newArrayList();
     for (Jar jar : remoteJars) {
@@ -384,8 +445,10 @@ public class FunctionImplementationRegistry implements 
FunctionLookupContext, Au
   private Path getLocalUdfDir(DrillConfig config) {
     tmpDir = getTmpDir(config);
     File udfDir = new File(tmpDir, 
config.getString(ExecConstants.UDF_DIRECTORY_LOCAL));
-    udfDir.mkdirs();
     String udfPath = udfDir.getPath();
+    if (udfDir.mkdirs()) {
+      logger.debug("Local udf directory [{}] was created", udfPath);
+    }
     Preconditions.checkState(udfDir.exists(), "Local udf directory [%s] must 
exist", udfPath);
     Preconditions.checkState(udfDir.isDirectory(), "Local udf directory [%s] 
must be a directory", udfPath);
     Preconditions.checkState(udfDir.canWrite(), "Local udf directory [%s] must 
be writable for application user", udfPath);
@@ -404,6 +467,8 @@ public class FunctionImplementationRegistry implements 
FunctionLookupContext, Au
    * If value is still missing, generates directory using {@link 
Files#createTempDir()}.
    * If temporary directory was generated, sets {@link #deleteTmpDir} to true
    * to delete directory on drillbit exit.
+   *
+   * @param config drill config
    * @return drill temporary directory path
    */
   private File getTmpDir(DrillConfig config) {

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
index 005c4e5..3124539 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -42,8 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * since we expect infrequent registry changes.
  * Holder is designed to allow concurrent reads and single writes to keep data 
consistent.
  * This is achieved by {@link ReadWriteLock} implementation usage.
- * Holder has number version which changes every time new jars are added or 
removed. Initial version number is 0.
- * Also version is used when user needs data from registry with version it is 
based on.
+ * Holder has a version number which indicates the remote function registry 
version it is in sync with.
  *
  * Structure example:
  *
@@ -86,7 +85,8 @@ public class FunctionRegistryHolder {
   private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
   private final AutoCloseableLock readLock = new 
AutoCloseableLock(readWriteLock.readLock());
   private final AutoCloseableLock writeLock = new 
AutoCloseableLock(readWriteLock.writeLock());
-  private long version = 0;
+  // remote function registry version number this holder is in sync with
+  private long version;
 
   // jar name, Map<function name, Queue<function signature>
   private final Map<String, Map<String, Queue<String>>> jars;
@@ -114,13 +114,13 @@ public class FunctionRegistryHolder {
    * If jar with the same name already exists, it and its functions will be 
removed.
    * Then jar will be added to {@link #jars}
    * and each function will be added using {@link #addFunctions(Map, List)}.
-   * Function version registry will be incremented by 1 if at least one jar 
was added but not for each jar.
+   * Registry version is updated with passed version if all jars were added 
successfully.
    * This is write operation, so one user at a time can call perform such 
action,
    * others will wait till first user completes his action.
    *
    * @param newJars jars and list of their function holders, each contains 
function name, signature and holder
    */
-  public void addJars(Map<String, List<FunctionHolder>> newJars) {
+  public void addJars(Map<String, List<FunctionHolder>> newJars, long version) 
{
     try (AutoCloseableLock lock = writeLock.open()) {
       for (Map.Entry<String, List<FunctionHolder>> newJar : 
newJars.entrySet()) {
         String jarName = newJar.getKey();
@@ -129,15 +129,12 @@ public class FunctionRegistryHolder {
         jars.put(jarName, jar);
         addFunctions(jar, newJar.getValue());
       }
-      if (!newJars.isEmpty()) {
-        version++;
-      }
+      this.version = version;
     }
   }
 
   /**
    * Removes jar from {@link #jars} and all associated with jar functions from 
{@link #functions}
-   * If jar was removed, function registry version will be incremented by 1.
    * This is write operation, so one user at a time can call perform such 
action,
    * others will wait till first user completes his action.
    *
@@ -145,9 +142,7 @@ public class FunctionRegistryHolder {
    */
   public void removeJar(String jarName) {
     try (AutoCloseableLock lock = writeLock.open()) {
-      if (removeAllByJar(jarName)) {
-        version++;
-      }
+      removeAllByJar(jarName);
     }
   }
 
@@ -341,12 +336,11 @@ public class FunctionRegistryHolder {
    * All jar functions have the same class loader, so we need to close only 
one time.
    *
    * @param jarName jar name to be removed
-   * @return true if jar was removed, false otherwise
    */
-  private boolean removeAllByJar(String jarName) {
+  private void removeAllByJar(String jarName) {
     Map<String, Queue<String>> jar = jars.remove(jarName);
     if (jar == null) {
-      return false;
+      return;
     }
 
     for (Map.Entry<String, Queue<String>> functionEntry : jar.entrySet()) {
@@ -372,6 +366,5 @@ public class FunctionRegistryHolder {
         functions.remove(function);
       }
     }
-    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
index 2a3f167..1318f72 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -73,12 +73,14 @@ public class LocalFunctionRegistry {
 
   private final FunctionRegistryHolder registryHolder;
 
-  /** Registers all functions present in Drill classpath on start-up. All 
functions will be marked as built-in.
-   * Built-in functions are not allowed to be unregistered. */
+  /**
+   * Registers all functions present in Drill classpath on start-up. All 
functions will be marked as built-in.
+   * Built-in functions are not allowed to be unregistered. Initially sync 
registry version will be set to 0.
+   */
   public LocalFunctionRegistry(ScanResult classpathScan) {
     registryHolder = new FunctionRegistryHolder();
     validate(BUILT_IN, classpathScan);
-    register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, 
this.getClass().getClassLoader())));
+    register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, 
this.getClass().getClassLoader())), 0);
     if (logger.isTraceEnabled()) {
       StringBuilder allFunctions = new StringBuilder();
       for (DrillFuncHolder method: 
registryHolder.getAllFunctionsWithHolders().values()) {
@@ -89,7 +91,7 @@ public class LocalFunctionRegistry {
   }
 
   /**
-   * @return local function registry version number
+   * @return remote function registry version number with which local function 
registry is synced
    */
   public long getVersion() {
     return registryHolder.getVersion();
@@ -147,14 +149,15 @@ public class LocalFunctionRegistry {
   }
 
   /**
-   * Registers all functions present in jar.
+   * Registers all functions present in jar and updates registry version.
    * If jar name is already registered, all jar related functions will be 
overridden.
    * To prevent classpath collisions during loading and unloading jars,
    * each jar is shipped with its own class loader.
    *
    * @param jars list of jars to be registered
+   * @param version remote function registry version number with which local 
function registry is synced
    */
-  public void register(List<JarScan> jars) {
+  public void register(List<JarScan> jars, long version) {
     Map<String, List<FunctionHolder>> newJars = Maps.newHashMap();
     for (JarScan jarScan : jars) {
       FunctionConverter converter = new FunctionConverter();
@@ -174,7 +177,7 @@ public class LocalFunctionRegistry {
         }
       }
     }
-    registryHolder.addJars(newJars);
+    registryHolder.addJars(newJars, version);
   }
 
   /**
@@ -217,25 +220,31 @@ public class LocalFunctionRegistry {
     return registryHolder.getHoldersByFunctionName(name.toLowerCase(), 
version);
   }
 
+  /**
+   * @param name function name
+   * @return all function holders associated with the function name. Function 
name is case insensitive.
+   */
   public List<DrillFuncHolder> getMethods(String name) {
     return registryHolder.getHoldersByFunctionName(name.toLowerCase());
   }
 
   /**
    * Registers all functions present in {@link DrillOperatorTable},
-   * also sets local registry version used at the moment of registering.
+   * also sets sync registry version used at the moment of function 
registration.
    *
    * @param operatorTable drill operator table
    */
   public void register(DrillOperatorTable operatorTable) {
     AtomicLong versionHolder = new AtomicLong();
-    final Map<String, Collection<DrillFuncHolder>> registeredFunctions = 
registryHolder.getAllFunctionsWithHolders(versionHolder).asMap();
+    final Map<String, Collection<DrillFuncHolder>> registeredFunctions =
+        registryHolder.getAllFunctionsWithHolders(versionHolder).asMap();
     operatorTable.setFunctionRegistryVersion(versionHolder.get());
     registerOperatorsWithInference(operatorTable, registeredFunctions);
     registerOperatorsWithoutInference(operatorTable, registeredFunctions);
   }
 
-  private void registerOperatorsWithInference(DrillOperatorTable 
operatorTable, Map<String, Collection<DrillFuncHolder>> registeredFunctions) {
+  private void registerOperatorsWithInference(DrillOperatorTable 
operatorTable, Map<String,
+      Collection<DrillFuncHolder>> registeredFunctions) {
     final Map<String, DrillSqlOperator.DrillSqlOperatorBuilder> map = 
Maps.newHashMap();
     final Map<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> mapAgg = 
Maps.newHashMap();
     for (Entry<String, Collection<DrillFuncHolder>> function : 
registeredFunctions.entrySet()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
index fe79583..2e5eda2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -106,8 +106,26 @@ public class RemoteFunctionRegistry implements 
AutoCloseable {
     this.retryAttempts = config.getInt(ExecConstants.UDF_RETRY_ATTEMPTS);
   }
 
-  public Registry getRegistry() {
-    return registry.get(registry_path, null);
+  /**
+   * Returns current remote function registry version.
+   * If remote function registry is not found or unreachable, logs error and 
returns -1.
+   *
+   * @return remote function registry version if any, -1 otherwise
+   */
+  public long getRegistryVersion() {
+    DataChangeVersion version = new DataChangeVersion();
+    boolean contains = false;
+    try {
+      contains = registry.contains(registry_path, version);
+    } catch (Exception e) {
+      logger.error("Problem during trying to access remote function registry 
[{}]", registry_path, e);
+    }
+    if (contains) {
+      return version.getVersion();
+    } else {
+      logger.error("Remote function registry [{}] is unreachable", 
registry_path);
+      return -1;
+    }
   }
 
   public Registry getRegistry(DataChangeVersion version) {

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 264af29..707815a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -66,7 +66,6 @@ public class QueryContext implements AutoCloseable, 
OptimizerRulesContext, Schem
   private final UserSession session;
   private final OptionManager queryOptions;
   private final PlannerSettings plannerSettings;
-  private final DrillOperatorTable table;
   private final ExecutionControls executionControls;
 
   private final BufferAllocator allocator;
@@ -83,6 +82,7 @@ public class QueryContext implements AutoCloseable, 
OptimizerRulesContext, Schem
    * time this is set to true and the close method becomes a no-op.
    */
   private boolean closed = false;
+  private DrillOperatorTable table;
 
   public QueryContext(final UserSession session, final DrillbitContext 
drillbitContext, QueryId queryId) {
     this.drillbitContext = drillbitContext;
@@ -229,6 +229,15 @@ public class QueryContext implements AutoCloseable, 
OptimizerRulesContext, Schem
     return table;
   }
 
+  /**
+   * Re-creates drill operator table to refresh functions list from local 
function registry.
+   */
+  public void reloadDrillOperatorTable() {
+    table = new DrillOperatorTable(
+        drillbitContext.getFunctionImplementationRegistry(),
+        drillbitContext.getOptionManager());
+  }
+
   public QueryContextInformation getQueryContextInfo() {
     return queryContextInfo;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
index 6e5c72b..5102ae8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -36,7 +36,7 @@ import org.apache.drill.exec.server.options.OptionManager;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+
 
 /**
  * Implementation of {@link SqlOperatorTable} that contains standard operators 
and functions provided through
@@ -52,8 +52,8 @@ public class DrillOperatorTable extends SqlStdOperatorTable {
 
   private final ArrayListMultimap<String, SqlOperator> 
drillOperatorsWithoutInferenceMap = ArrayListMultimap.create();
   private final ArrayListMultimap<String, SqlOperator> 
drillOperatorsWithInferenceMap = ArrayListMultimap.create();
-  // indicates local function registry version based on which drill operator 
were loaded
-  // is used to define if we need to reload operator table in case when 
function signature was not found
+  // indicates remote function registry version based on which drill operators 
were loaded
+  // is used to determine if we need to reload operator table in case remote 
function registry version has changed
   private long functionRegistryVersion;
 
   private final OptionManager systemOptionManager;
@@ -65,19 +65,18 @@ public class DrillOperatorTable extends SqlStdOperatorTable 
{
     this.systemOptionManager = systemOptionManager;
   }
 
-  /** Cleans up all operator holders and reloads operators */
-  public void reloadOperators(FunctionImplementationRegistry registry) {
-    drillOperatorsWithoutInference.clear();
-    drillOperatorsWithInference.clear();
-    drillOperatorsWithoutInferenceMap.clear();
-    drillOperatorsWithInferenceMap.clear();
-    registry.register(this);
-  }
-
-  public long setFunctionRegistryVersion(long version) {
-    return functionRegistryVersion = version;
+  /**
+   * Set function registry version based on which operator table was loaded.
+   *
+   * @param version registry version
+   */
+  public void setFunctionRegistryVersion(long version) {
+    functionRegistryVersion = version;
   }
 
+  /**
+   * @return function registry version based on which operator table was loaded
+   */
   public long getFunctionRegistryVersion() {
     return functionRegistryVersion;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 0ad3944..3bc0922 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -24,10 +24,7 @@ import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.exception.FunctionNotFoundException;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
@@ -50,13 +47,56 @@ public class DrillSqlWorker {
   private DrillSqlWorker() {
   }
 
+  /**
+   * Converts sql query string into query physical plan.
+   *
+   * @param context query context
+   * @param sql sql query
+   * @return query physical plan
+   */
   public static PhysicalPlan getPlan(QueryContext context, String sql) throws 
SqlParseException, ValidationException,
       ForemanSetupException {
     return getPlan(context, sql, null);
   }
 
+  /**
+   * Converts sql query string into query physical plan.
+   * In case of any errors (that might occur due to missing function 
implementation),
+   * checks if local function registry should be synchronized with remote 
function registry.
+   * If sync took place, reloads drill operator table
+   * (since functions were added to / removed from local function registry)
+   * and attempts to convert sql query string into query physical plan one 
more time.
+   *
+   * @param context query context
+   * @param sql sql query
+   * @param textPlan text plan
+   * @return query physical plan
+   */
   public static PhysicalPlan getPlan(QueryContext context, String sql, 
Pointer<String> textPlan)
       throws ForemanSetupException {
+    Pointer<String> textPlanCopy = textPlan == null ? null : new 
Pointer<>(textPlan.value);
+    try {
+      return getQueryPlan(context, sql, textPlan);
+    } catch (Exception e) {
+      if (context.getFunctionRegistry().syncWithRemoteRegistry(
+          context.getDrillOperatorTable().getFunctionRegistryVersion())) {
+        context.reloadDrillOperatorTable();
+        return getQueryPlan(context, sql, textPlanCopy);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Converts sql query string into query physical plan.
+   *
+   * @param context query context
+   * @param sql sql query
+   * @param textPlan text plan
+   * @return query physical plan
+   */
+  private static PhysicalPlan getQueryPlan(QueryContext context, String sql, 
Pointer<String> textPlan)
+      throws ForemanSetupException {
 
     final SqlConverter parser = new SqlConverter(context);
 
@@ -88,7 +128,7 @@ public class DrillSqlWorker {
     }
 
     try {
-      return getPhysicalPlan(handler, sqlNode, context);
+      return handler.getPlan(sqlNode);
     } catch(ValidationException e) {
       String errorMessage = e.getCause() != null ? e.getCause().getMessage() : 
e.getMessage();
       throw UserException.validationError(e)
@@ -104,26 +144,4 @@ public class DrillSqlWorker {
       throw new QueryInputException("Failure handling SQL.", e);
     }
   }
-
-  /**
-   * Returns query physical plan.
-   * In case of {@link FunctionNotFoundException} attempts to load remote 
functions.
-   * If at least one function was loaded or local function function registry 
version has changed,
-   * makes one more attempt to get query physical plan.
-   */
-  private static PhysicalPlan getPhysicalPlan(AbstractSqlHandler handler, 
SqlNode sqlNode, QueryContext context)
-      throws RelConversionException, IOException, ForemanSetupException, 
ValidationException {
-    try {
-      return handler.getPlan(sqlNode);
-    } catch (FunctionNotFoundException e) {
-      DrillOperatorTable drillOperatorTable = context.getDrillOperatorTable();
-      FunctionImplementationRegistry functionRegistry = 
context.getFunctionRegistry();
-      if 
(functionRegistry.loadRemoteFunctions(drillOperatorTable.getFunctionRegistryVersion()))
 {
-        drillOperatorTable.reloadOperators(functionRegistry);
-        return handler.getPlan(sqlNode);
-      }
-      throw e;
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index e9085f7..845848c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -49,17 +49,13 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
 import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
-import org.apache.calcite.sql.validate.SqlValidatorException;
 import org.apache.calcite.sql.validate.SqlValidatorImpl;
 import org.apache.calcite.sql.validate.SqlValidatorScope;
 import org.apache.calcite.sql2rel.RelDecorrelator;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.FunctionNotFoundException;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.ops.UdfUtilities;
@@ -173,11 +169,6 @@ public class SqlConverter {
       SqlNode validatedNode = validator.validate(parsedNode);
       return validatedNode;
     } catch (RuntimeException e) {
-      final Throwable rootCause = ExceptionUtils.getRootCause(e);
-      if (rootCause instanceof SqlValidatorException
-          && StringUtils.contains(rootCause.getMessage(), "No match found for 
function signature")) {
-        throw new FunctionNotFoundException(rootCause.getMessage(), e);
-      }
       UserException.Builder builder = UserException
           .validationError(e)
           .addContext("SQL Query", sql);

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
index 48bfd8b..0902fb7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -186,6 +186,7 @@ public class CreateFunctionHandler extends 
DefaultSqlHandler {
           remoteRegistry.updateRegistry(updatedRegistry, version);
           return;
         } catch (VersionMismatchException ex) {
+          logger.debug("Failed to update function registry during 
registration, version mismatch was detected.", ex);
           retryAttempts--;
         }
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
index 6e2801a..b5d0b23 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -142,6 +142,7 @@ public class DropFunctionHandler extends DefaultSqlHandler {
         remoteFunctionRegistry.updateRegistry(updatedRegistry, version);
         return jarToBeDeleted;
       } catch (VersionMismatchException ex) {
+        logger.debug("Failed to update function registry during 
unregistration, version mismatch was detected.", ex);
         retryAttempts--;
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
index ea38278..0640407 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -29,6 +29,12 @@ public abstract class BasePersistentStore<V> implements 
PersistentStore<V> {
     return getRange(0, Integer.MAX_VALUE);
   }
 
+  /** By default contains with version will behave the same way as without 
version.
+   * Override this method to add version support. */
+  public boolean contains(String key, DataChangeVersion version) {
+    return contains(key);
+  }
+
   /** By default get with version will behave the same way as without version.
    * Override this method to add version support. */
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
index bb23752..206642a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -34,6 +34,24 @@ public interface PersistentStore<V> extends AutoCloseable {
   PersistentStoreMode getMode();
 
   /**
+   * Checks if lookup key is present in store.
+   *
+   * @param key lookup key
+   * @return true if store contains lookup key, false otherwise
+   */
+  boolean contains(String key);
+
+  /**
+   * Checks if lookup key is present in store.
+   * Sets data change version number.
+   *
+   * @param key lookup key
+   * @param version version holder
+   * @return true if store contains lookup key, false otherwise
+   */
+  boolean contains(String key, DataChangeVersion version);
+
+  /**
    * Returns the value for the given key if exists, null otherwise.
    * @param key  lookup key
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
index b9a4b59..ef855e2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,12 +28,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.annotation.Nullable;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.drill.common.collections.ImmutableEntry;
+import org.apache.drill.common.concurrent.AutoCloseableLock;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.exception.VersionMismatchException;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.sys.BasePersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
@@ -47,13 +51,20 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LocalPersistentStore<V> extends BasePersistentStore<V> {
-//  private static final Logger logger = 
LoggerFactory.getLogger(LocalPersistentStore.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(LocalPersistentStore.class);
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final AutoCloseableLock readLock = new 
AutoCloseableLock(readWriteLock.readLock());
+  private final AutoCloseableLock writeLock = new 
AutoCloseableLock(readWriteLock.writeLock());
 
   private final Path basePath;
   private final PersistentStoreConfig<V> config;
   private final DrillFileSystem fs;
+  private int version = -1;
 
   public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig<V> config) {
     super();
@@ -62,7 +73,9 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
     this.fs = fs;
 
     try {
-      mkdirs(basePath);
+      if (!fs.mkdirs(basePath)) {
+        version++;
+      }
     } catch (IOException e) {
       throw new RuntimeException("Failure setting pstore configuration path.");
     }
@@ -73,11 +86,7 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
     return PersistentStoreMode.PERSISTENT;
   }
 
-  private void mkdirs(Path path) throws IOException{
-    fs.mkdirs(path);
-  }
-
-  public static Path getLogDir(){
+  public static Path getLogDir() {
     String drillLogDir = System.getenv("DRILL_LOG_DIR");
     if (drillLogDir == null) {
       drillLogDir = "/var/log/drill";
@@ -85,10 +94,10 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
     return new Path(new File(drillLogDir).getAbsoluteFile().toURI());
   }
 
-  public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException{
+  public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException {
     Path blobRoot = root == null ? getLogDir() : root;
     Configuration fsConf = new Configuration();
-    if(blobRoot.toUri().getScheme() != null){
+    if (blobRoot.toUri().getScheme() != null) {
       fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString());
     }
 
@@ -100,93 +109,142 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
 
   @Override
   public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
-    try{
-      List<FileStatus> f = fs.list(false, basePath);
-      if (f == null || f.isEmpty()) {
-        return Collections.emptyIterator();
-      }
-      List<String> files = Lists.newArrayList();
-
-      for (FileStatus stat : f) {
-        String s = stat.getPath().getName();
-        if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
-          files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
+    try (AutoCloseableLock lock = readLock.open()) {
+      try {
+        List<FileStatus> f = fs.list(false, basePath);
+        if (f == null || f.isEmpty()) {
+          return Collections.emptyIterator();
         }
-      }
+        List<String> files = Lists.newArrayList();
 
-      Collections.sort(files);
-      return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() {
-        @Nullable
-        @Override
-        public Entry<String, V> apply(String key) {
-          return new ImmutableEntry<>(key, get(key));
+        for (FileStatus stat : f) {
+          String s = stat.getPath().getName();
+          if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
+            files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
+          }
         }
-      }).iterator();
-    }catch(IOException e){
-      throw new RuntimeException(e);
+
+        Collections.sort(files);
+        return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() {
+          @Nullable
+          @Override
+          public Entry<String, V> apply(String key) {
+            return new ImmutableEntry<>(key, get(key));
+          }
+        }).iterator();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 
   private Path makePath(String name) {
     Preconditions.checkArgument(
         !name.contains("/") &&
-        !name.contains(":") &&
-        !name.contains(".."));
+            !name.contains(":") &&
+            !name.contains(".."));
+    return new Path(basePath, name + DRILL_SYS_FILE_SUFFIX);
+  }
 
-    final Path path = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX);
-    // do this to check file name.
-    return path;
+  @Override
+  public boolean contains(String key) {
+    return contains(key, null);
   }
 
   @Override
-  public V get(String key) {
-    try{
-      Path path = makePath(key);
-      if(!fs.exists(path)){
-        return null;
+  public boolean contains(String key, DataChangeVersion dataChangeVersion) {
+    try (AutoCloseableLock lock = readLock.open()) {
+      try {
+        Path path = makePath(key);
+        boolean exists = fs.exists(path);
+        if (exists && dataChangeVersion != null) {
+          dataChangeVersion.setVersion(version);
+        }
+        return exists;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
-    }catch(IOException e){
-      throw new RuntimeException(e);
     }
+  }
 
-    final Path path = makePath(key);
-    try (InputStream is = fs.open(path)) {
-      return config.getSerializer().deserialize(IOUtils.toByteArray(is));
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to deserialize \"" + path + "\"", e);
+  @Override
+  public V get(String key) {
+    return get(key, null);
+  }
+
+  @Override
+  public V get(String key, DataChangeVersion dataChangeVersion) {
+    try (AutoCloseableLock lock = readLock.open()) {
+      try {
+        if (dataChangeVersion != null) {
+          dataChangeVersion.setVersion(version);
+        }
+        Path path = makePath(key);
+        if (!fs.exists(path)) {
+          return null;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      final Path path = makePath(key);
+      try (InputStream is = fs.open(path)) {
+        return config.getSerializer().deserialize(IOUtils.toByteArray(is));
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to deserialize \"" + path + "\"", e);
+      }
     }
   }
 
   @Override
   public void put(String key, V value) {
-    try (OutputStream os = fs.create(makePath(key))) {
-      IOUtils.write(config.getSerializer().serialize(value), os);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    put(key, value, null);
+  }
+
+  @Override
+  public void put(String key, V value, DataChangeVersion dataChangeVersion) {
+    try (AutoCloseableLock lock = writeLock.open()) {
+      if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) {
+        throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
+      }
+      try (OutputStream os = fs.create(makePath(key))) {
+        IOUtils.write(config.getSerializer().serialize(value), os);
+        version++;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 
   @Override
   public boolean putIfAbsent(String key, V value) {
-    try {
-      Path p = makePath(key);
-      if (fs.exists(p)) {
-        return false;
-      } else {
-        put(key, value);
-        return true;
+    try (AutoCloseableLock lock = writeLock.open()) {
+      try {
+        Path p = makePath(key);
+        if (fs.exists(p)) {
+          return false;
+        } else {
+          try (OutputStream os = fs.create(makePath(key))) {
+            IOUtils.write(config.getSerializer().serialize(value), os);
+            version++;
+          }
+          return true;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
     }
   }
 
   @Override
   public void delete(String key) {
-    try {
-      fs.delete(makePath(key), false);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    try (AutoCloseableLock lock = writeLock.open()) {
+      try {
+        fs.delete(makePath(key), false);
+        version++;
+      } catch (IOException e) {
+        logger.error("Unable to delete data from storage.", e);
+        throw new RuntimeException(e);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
index 55f72c9..a3ee58e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -62,16 +62,26 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> {
   }
 
   @Override
+  public boolean contains(final String key) {
+    return contains(key, null);
+  }
+
+  @Override
+  public boolean contains(final String key, final DataChangeVersion version) {
+    return client.hasPath(key, true, version);
+  }
+
+  @Override
   public V get(final String key) {
     return get(key, false, null);
   }
 
   @Override
-  public V get(final String key, DataChangeVersion version) {
+  public V get(final String key, final DataChangeVersion version) {
     return get(key, true, version);
   }
 
-  public V get(final String key, boolean consistencyFlag, DataChangeVersion version) {
+  public V get(final String key, final boolean consistencyFlag, final DataChangeVersion version) {
     byte[] bytes = client.get(key, consistencyFlag, version);
 
     if (bytes == null) {
@@ -90,7 +100,7 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> {
   }
 
   @Override
-  public void put(final String key, final V value, DataChangeVersion version) {
+  public void put(final String key, final V value, final DataChangeVersion version) {
     final InstanceSerializer<V> serializer = config.getSerializer();
     try {
       final byte[] bytes = serializer.serialize(value);

http://git-wip-us.apache.org/repos/asf/drill/blob/dcbcb94f/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
index 58ec3ea..e36dc83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,8 +20,6 @@ package org.apache.drill.exec.testing.store;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -38,13 +36,13 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
   private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
   private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
   private final ConcurrentMap<String, V> store = Maps.newConcurrentMap();
-  private final AtomicInteger version = new AtomicInteger();
+  private int version = -1;
 
   @Override
   public void delete(final String key) {
     try (AutoCloseableLock lock = writeLock.open()) {
       store.remove(key);
-      version.incrementAndGet();
+      version++;
     }
   }
 
@@ -54,6 +52,21 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
   }
 
   @Override
+  public boolean contains(final String key) {
+    return contains(key, null);
+  }
+
+  @Override
+  public boolean contains(final String key, final DataChangeVersion dataChangeVersion) {
+    try (AutoCloseableLock lock = readLock.open()) {
+      if (dataChangeVersion != null) {
+        dataChangeVersion.setVersion(version);
+      }
+      return store.containsKey(key);
+    }
+  }
+
+  @Override
   public V get(final String key) {
     return get(key, null);
   }
@@ -62,7 +75,7 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
   public V get(final String key, final DataChangeVersion dataChangeVersion) {
     try (AutoCloseableLock lock = readLock.open()) {
       if (dataChangeVersion != null) {
-        dataChangeVersion.setVersion(version.get());
+        dataChangeVersion.setVersion(version);
       }
       return store.get(key);
     }
@@ -76,11 +89,11 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
   @Override
   public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) {
     try (AutoCloseableLock lock = writeLock.open()) {
-      if (dataChangeVersion != null && dataChangeVersion.getVersion() != version.get()) {
+      if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) {
         throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
       }
       store.put(key, value);
-      version.incrementAndGet();
+      version++;
     }
   }
 
@@ -89,7 +102,7 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
     try (AutoCloseableLock lock = writeLock.open()) {
       final V old = store.putIfAbsent(key, value);
       if (old == null) {
-        version.incrementAndGet();
+        version++;
         return true;
       }
       return false;
@@ -107,7 +120,7 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
   public void close() throws Exception {
     try (AutoCloseableLock lock = writeLock.open()) {
       store.clear();
-      version.set(0);
+      version = -1;
     }
   }
 }

Reply via email to