DRILL-6053: Avoid excessive locking in LocalPersistentStore (closes #1163)
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/590a72bc Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/590a72bc Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/590a72bc Branch: refs/heads/master Commit: 590a72bc667f6bc373130bcae58c22c11f13edaf Parents: 9327ca6 Author: Vlad Rozov <[email protected]> Authored: Tue Mar 13 10:56:52 2018 -0700 Committer: Vitalii Diravka <[email protected]> Committed: Sat Mar 24 20:35:32 2018 +0200 ---------------------------------------------------------------------- .../org/apache/drill/common/AutoCloseables.java | 5 + .../common/concurrent/AutoCloseableLock.java | 6 +- .../fn/registry/FunctionRegistryHolder.java | 24 +-- .../fn/registry/RemoteFunctionRegistry.java | 6 +- .../exec/rpc/control/CustomHandlerRegistry.java | 5 +- .../exec/store/sys/BasePersistentStore.java | 22 --- .../drill/exec/store/sys/PersistentStore.java | 61 +----- .../exec/store/sys/PersistentStoreProvider.java | 5 +- .../org/apache/drill/exec/store/sys/Store.java | 58 ++++++ .../store/sys/VersionedPersistentStore.java | 57 ++++++ .../exec/store/sys/store/InMemoryStore.java | 85 ++------- .../store/sys/store/LocalPersistentStore.java | 187 +++++++------------ .../sys/store/VersionedDelegatingStore.java | 120 ++++++++++++ .../sys/store/ZookeeperPersistentStore.java | 3 +- .../ZookeeperPersistentStoreProvider.java | 20 ++ .../exec/testing/store/NoWriteLocalStore.java | 74 ++------ .../drill/exec/work/batch/IncomingBuffers.java | 4 +- .../exec/store/sys/TestPStoreProviders.java | 6 +- .../drill/exec/memory/AllocationManager.java | 11 +- 19 files changed, 401 insertions(+), 358 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/common/src/main/java/org/apache/drill/common/AutoCloseables.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java index fcdfe14..c12063c 100644 --- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java +++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java @@ -25,6 +25,11 @@ import java.util.Collection; */ public class AutoCloseables { + public interface Closeable extends AutoCloseable { + @Override + void close(); + } + public static AutoCloseable all(final Collection<? extends AutoCloseable> autoCloseables) { return new AutoCloseable() { @Override http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java b/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java index 91d50b4..3fe5c1e 100644 --- a/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java +++ b/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java @@ -19,10 +19,12 @@ package org.apache.drill.common.concurrent; import java.util.concurrent.locks.Lock; +import org.apache.drill.common.AutoCloseables.Closeable; + /** * Simple wrapper class that allows Locks to be released via an try-with-resources block. 
*/ -public class AutoCloseableLock implements AutoCloseable { +public class AutoCloseableLock implements Closeable { private final Lock lock; @@ -30,7 +32,7 @@ public class AutoCloseableLock implements AutoCloseable { this.lock = lock; } - public AutoCloseableLock open() { + public Closeable open() { lock.lock(); return this; } http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java index 3124539..1ab6e19 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java @@ -22,6 +22,8 @@ import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; + +import org.apache.drill.common.AutoCloseables.Closeable; import org.apache.drill.common.concurrent.AutoCloseableLock; import org.apache.drill.exec.expr.fn.DrillFuncHolder; @@ -104,7 +106,7 @@ public class FunctionRegistryHolder { * @return local function registry version number */ public long getVersion() { - try (AutoCloseableLock lock = readLock.open()) { + try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { return version; } } @@ -121,7 +123,7 @@ public class FunctionRegistryHolder { * @param newJars jars and list of their function holders, each contains function name, signature and holder */ public void addJars(Map<String, List<FunctionHolder>> newJars, long version) { - try (AutoCloseableLock lock = writeLock.open()) { + try (@SuppressWarnings("unused") Closeable lock = 
writeLock.open()) { for (Map.Entry<String, List<FunctionHolder>> newJar : newJars.entrySet()) { String jarName = newJar.getKey(); removeAllByJar(jarName); @@ -141,7 +143,7 @@ public class FunctionRegistryHolder { * @param jarName jar name to be removed */ public void removeJar(String jarName) { - try (AutoCloseableLock lock = writeLock.open()) { + try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) { removeAllByJar(jarName); } } @@ -153,7 +155,7 @@ public class FunctionRegistryHolder { * @return list of all jar names */ public List<String> getAllJarNames() { - try (AutoCloseableLock lock = readLock.open()) { + try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { return Lists.newArrayList(jars.keySet()); } } @@ -167,7 +169,7 @@ public class FunctionRegistryHolder { * @return list of functions names associated from the jar */ public List<String> getFunctionNamesByJar(String jarName) { - try (AutoCloseableLock lock = readLock.open()){ + try (@SuppressWarnings("unused") Closeable lock = readLock.open()){ Map<String, Queue<String>> functions = jars.get(jarName); return functions == null ? 
Lists.<String>newArrayList() : Lists.newArrayList(functions.keySet()); } @@ -184,7 +186,7 @@ public class FunctionRegistryHolder { * @return all functions which their holders */ public ListMultimap<String, DrillFuncHolder> getAllFunctionsWithHolders(AtomicLong version) { - try (AutoCloseableLock lock = readLock.open()) { + try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { if (version != null) { version.set(this.version); } @@ -215,7 +217,7 @@ public class FunctionRegistryHolder { * @return all functions which their signatures */ public ListMultimap<String, String> getAllFunctionsWithSignatures() { - try (AutoCloseableLock lock = readLock.open()) { + try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { ListMultimap<String, String> functionsWithSignatures = ArrayListMultimap.create(); for (Map.Entry<String, Map<String, DrillFuncHolder>> function : functions.entrySet()) { functionsWithSignatures.putAll(function.getKey(), Lists.newArrayList(function.getValue().keySet())); @@ -235,7 +237,7 @@ public class FunctionRegistryHolder { * @return list of function holders */ public List<DrillFuncHolder> getHoldersByFunctionName(String functionName, AtomicLong version) { - try (AutoCloseableLock lock = readLock.open()) { + try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { if (version != null) { version.set(this.version); } @@ -263,7 +265,7 @@ public class FunctionRegistryHolder { * @return true if jar exists, else false */ public boolean containsJar(String jarName) { - try (AutoCloseableLock lock = readLock.open()) { + try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { return jars.containsKey(jarName); } } @@ -275,7 +277,7 @@ public class FunctionRegistryHolder { * @return quantity of functions */ public int functionsSize() { - try (AutoCloseableLock lock = readLock.open()) { + try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { return functions.size(); } } @@ -291,7 +293,7 @@ public class 
FunctionRegistryHolder { * @return jar name */ public String getJarNameByFunctionSignature(String functionName, String functionSignature) { - try (AutoCloseableLock lock = readLock.open()) { + try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { for (Map.Entry<String, Map<String, Queue<String>>> jar : jars.entrySet()) { Queue<String> functionSignatures = jar.getValue().get(functionName); if (functionSignatures != null && functionSignatures.contains(functionSignature)) { http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java index 38d8fcc..df5e17f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java @@ -32,9 +32,9 @@ import org.apache.drill.exec.exception.StoreException; import org.apache.drill.exec.exception.VersionMismatchException; import org.apache.drill.exec.proto.SchemaUserBitShared; import org.apache.drill.exec.proto.UserBitShared.Registry; -import org.apache.drill.exec.store.sys.PersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreProvider; +import org.apache.drill.exec.store.sys.VersionedPersistentStore; import org.apache.drill.exec.store.sys.store.DataChangeVersion; import org.apache.drill.exec.util.ImpersonationUtil; import org.apache.hadoop.conf.Configuration; @@ -92,7 +92,7 @@ public class RemoteFunctionRegistry implements AutoCloseable { private Path stagingArea; private Path tmpArea; - private PersistentStore<Registry> registry; + 
private VersionedPersistentStore<Registry> registry; private TransientStore<String> unregistration; private TransientStore<String> jars; @@ -192,7 +192,7 @@ public class RemoteFunctionRegistry implements AutoCloseable { .name("udf") .persist() .build(); - registry = storeProvider.getOrCreateStore(registrationConfig); + registry = storeProvider.getOrCreateVersionedStore(registrationConfig); registry.putIfAbsent(registry_path, Registry.getDefaultInstance()); } catch (StoreException e) { throw new DrillRuntimeException("Failure while loading remote registry.", e); http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java index 7a2bd04..97e855c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java @@ -23,6 +23,7 @@ import io.netty.buffer.DrillBuf; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.drill.common.AutoCloseables.Closeable; import org.apache.drill.common.concurrent.AutoCloseableLock; import org.apache.drill.exec.proto.BitControl.CustomMessage; import org.apache.drill.exec.proto.BitControl.RpcType; @@ -60,7 +61,7 @@ public class CustomHandlerRegistry { Preconditions.checkNotNull(handler); Preconditions.checkNotNull(requestSerde); Preconditions.checkNotNull(responseSerde); - try (AutoCloseableLock lock = write.open()) { + try (@SuppressWarnings("unused") Closeable lock = write.open()) { ParsingHandler<?, ?> parsingHandler = handlers.get(messageTypeId); if (parsingHandler != null) { throw new 
IllegalStateException(String.format( @@ -76,7 +77,7 @@ public class CustomHandlerRegistry { public Response handle(CustomMessage message, DrillBuf dBody) throws RpcException { final ParsingHandler<?, ?> handler; - try (AutoCloseableLock lock = read.open()) { + try (@SuppressWarnings("unused") Closeable lock = read.open()) { handler = handlers.get(message.getType()); } http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java index 0640407..38309bb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.store.sys; -import org.apache.drill.exec.store.sys.store.DataChangeVersion; - import java.util.Iterator; import java.util.Map; @@ -29,24 +27,4 @@ public abstract class BasePersistentStore<V> implements PersistentStore<V> { return getRange(0, Integer.MAX_VALUE); } - /** By default contains with version will behave the same way as without version. - * Override this method to add version support. */ - public boolean contains(String key, DataChangeVersion version) { - return contains(key); - } - - /** By default get with version will behave the same way as without version. - * Override this method to add version support. */ - @Override - public V get(String key, DataChangeVersion version) { - return get(key); - } - - /** By default put with version will behave the same way as without version. - * Override this method to add version support. 
*/ - @Override - public void put(String key, V value, DataChangeVersion version) { - put(key, value); - } - } http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java index 206642a..02959aa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.store.sys; -import org.apache.drill.exec.store.sys.store.DataChangeVersion; - import java.util.Iterator; import java.util.Map; @@ -27,11 +25,7 @@ import java.util.Map; * * @param <V> value type */ -public interface PersistentStore<V> extends AutoCloseable { - /** - * Returns storage {@link PersistentStoreMode mode} of this store. - */ - PersistentStoreMode getMode(); +public interface PersistentStore<V> extends Store<V> { /** * Checks if lookup key is present in store. @@ -42,30 +36,12 @@ public interface PersistentStore<V> extends AutoCloseable { boolean contains(String key); /** - * Checks if lookup key is present in store. - * Sets data change version number. - * - * @param key lookup key - * @param version version holder - * @return true if store contains lookup key, false otherwise - */ - boolean contains(String key, DataChangeVersion version); - - /** * Returns the value for the given key if exists, null otherwise. * @param key lookup key */ V get(String key); /** - * Returns the value for the given key if exists, null otherwise. - * Sets data change version number. 
- * @param key lookup key - * @param version version holder - */ - V get(String key, DataChangeVersion version); - - /** * Stores the (key, value) tuple in the store. Lifetime of the tuple depends upon store {@link #getMode mode}. * * @param key lookup key @@ -74,41 +50,6 @@ public interface PersistentStore<V> extends AutoCloseable { void put(String key, V value); /** - * Stores the (key, value) tuple in the store. - * If tuple already exits, stores it only if versions match, - * otherwise throws {@link org.apache.drill.exec.exception.VersionMismatchException} - * Lifetime of the tuple depends upon store {@link #getMode mode}. - * - * @param key lookup key - * @param value value to store - * @param version version holder - */ - void put(String key, V value, DataChangeVersion version); - - /** - * Removes the value corresponding to the given key if exists, nothing happens otherwise. - * @param key lookup key - */ - void delete(String key); - - /** - * Stores the (key, value) tuple in the store only if it does not exists. - * - * @param key lookup key - * @param value value to store - * @return true if put takes place, false otherwise. - */ - boolean putIfAbsent(String key, V value); - - /** - * Returns an iterator of desired number of entries offsetting by the skip value. - * - * @param skip number of records to skip from beginning - * @param take max number of records to return - */ - Iterator<Map.Entry<String, V>> getRange(int skip, int take); - - /** * Returns an iterator of entries. 
*/ Iterator<Map.Entry<String, V>> getAll(); http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java index 75b89b4..c0f7030 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.sys; import org.apache.drill.exec.exception.StoreException; +import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore; /** * A factory used to create {@link PersistentStore store} instances. @@ -33,7 +34,9 @@ public interface PersistentStoreProvider extends AutoCloseable { * @param <V> store value type */ <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException; - + default <V> VersionedPersistentStore<V> getOrCreateVersionedStore(PersistentStoreConfig<V> config) throws StoreException { + return new VersionedDelegatingStore<>(getOrCreateStore(config)); + } /** * Sets up the provider. 
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/Store.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/Store.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/Store.java new file mode 100644 index 0000000..c2b1999 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/Store.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.drill.exec.store.sys; + +import java.util.Iterator; +import java.util.Map; + +/** + * A Store interface used to store and retrieve instances of given value type. + * + * @param <V> value type + */ +public interface Store<V> extends AutoCloseable { + /** + * Returns storage {@link PersistentStoreMode mode} of this store. + */ + PersistentStoreMode getMode(); + + /** + * Removes the value corresponding to the given key if exists, nothing happens otherwise. + * @param key lookup key + */ + void delete(String key); + + /** + * Stores the (key, value) tuple in the store only if it does not exists. 
+ * + * @param key lookup key + * @param value value to store + * @return true if put takes place, false otherwise. + */ + boolean putIfAbsent(String key, V value); + + /** + * Returns an iterator of desired number of entries offsetting by the skip value. + * + * @param skip number of records to skip from beginning + * @param take max number of records to return + */ + Iterator<Map.Entry<String, V>> getRange(int skip, int take); + +} http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionedPersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionedPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionedPersistentStore.java new file mode 100644 index 0000000..24fa78e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionedPersistentStore.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.drill.exec.store.sys; + +import org.apache.drill.exec.store.sys.store.DataChangeVersion; + +/** + * Extension to the Store interface that supports versions + * @param <V> + */ +public interface VersionedPersistentStore<V> extends Store<V> { + /** + * Checks if lookup key is present in store. + * Sets data change version number. + * + * @param key lookup key + * @param version version holder + * @return true if store contains lookup key, false otherwise + */ + boolean contains(String key, DataChangeVersion version); + + /** + * Returns the value for the given key if exists, null otherwise. + * Sets data change version number. + * @param key lookup key + * @param version version holder + */ + V get(String key, DataChangeVersion version); + + /** + * Stores the (key, value) tuple in the store. + * If tuple already exits, stores it only if versions match, + * otherwise throws {@link org.apache.drill.exec.exception.VersionMismatchException} + * Lifetime of the tuple depends upon store {@link #getMode mode}. 
+ * + * @param key lookup key + * @param value value to store + * @param version version holder + */ + void put(String key, V value, DataChangeVersion version); +} http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java index 10da92d..f63c4f7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java @@ -19,15 +19,11 @@ package org.apache.drill.exec.store.sys.store; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.drill.common.concurrent.AutoCloseableLock; -import org.apache.drill.exec.exception.VersionMismatchException; import org.apache.drill.exec.store.sys.BasePersistentStore; -import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreMode; import com.google.common.collect.Iterables; @@ -35,26 +31,19 @@ import com.google.common.collect.Iterables; public class InMemoryStore<V> extends BasePersistentStore<V> { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InMemoryPersistentStore.class); - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock()); - private final AutoCloseableLock writeLock = new 
AutoCloseableLock(readWriteLock.writeLock()); - private final ConcurrentSkipListMap<String, V> store; - private int version = -1; + private final ConcurrentNavigableMap<String, V> store; private final int capacity; private final AtomicInteger currentSize = new AtomicInteger(); public InMemoryStore(int capacity) { this.capacity = capacity; //Allows us to trim out the oldest elements to maintain finite max size - this.store = new ConcurrentSkipListMap<String, V>(); + this.store = new ConcurrentSkipListMap<>(); } @Override public void delete(final String key) { - try (AutoCloseableLock lock = writeLock.open()) { - store.remove(key); - version++; - } + store.remove(key); } @Override @@ -64,80 +53,36 @@ public class InMemoryStore<V> extends BasePersistentStore<V> { @Override public boolean contains(final String key) { - return contains(key, null); - } - - @Override - public boolean contains(final String key, final DataChangeVersion dataChangeVersion) { - try (AutoCloseableLock lock = readLock.open()) { - if (dataChangeVersion != null) { - dataChangeVersion.setVersion(version); - } - return store.containsKey(key); - } + return store.containsKey(key); } @Override public V get(final String key) { - return get(key, null); - } - - @Override - public V get(final String key, final DataChangeVersion dataChangeVersion) { - try (AutoCloseableLock lock = readLock.open()) { - if (dataChangeVersion != null) { - dataChangeVersion.setVersion(version); - } - return store.get(key); - } + return store.get(key); } @Override public void put(final String key, final V value) { - put(key, value, null); - } - - @Override - public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) { - try (AutoCloseableLock lock = writeLock.open()) { - if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) { - throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion()); - } - store.put(key, value); - if 
(currentSize.incrementAndGet() > capacity) { - //Pop Out Oldest - store.pollLastEntry(); - currentSize.decrementAndGet(); - } - - version++; + store.put(key, value); + if (currentSize.incrementAndGet() > capacity) { + //Pop Out Oldest + store.pollLastEntry(); + currentSize.decrementAndGet(); } } @Override public boolean putIfAbsent(final String key, final V value) { - try (AutoCloseableLock lock = writeLock.open()) { - final V old = store.putIfAbsent(key, value); - if (old == null) { - version++; - return true; - } - return false; - } + return (value != store.putIfAbsent(key, value)); } @Override public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) { - try (AutoCloseableLock lock = readLock.open()) { - return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator(); - } + return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator(); } @Override - public void close() throws Exception { - try (AutoCloseableLock lock = writeLock.open()) { - store.clear(); - version = -1; - } + public void close() { + store.clear(); } } http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java index 313a9be..0905c0d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java @@ -35,9 +35,7 @@ import javax.annotation.Nullable; import org.apache.commons.io.IOUtils; import org.apache.drill.common.collections.ImmutableEntry; -import org.apache.drill.common.concurrent.AutoCloseableLock; import 
org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.exception.VersionMismatchException; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.drill.exec.store.sys.BasePersistentStore; @@ -59,35 +57,34 @@ import org.slf4j.LoggerFactory; public class LocalPersistentStore<V> extends BasePersistentStore<V> { private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class); - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock()); - private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock()); - private final Path basePath; private final PersistentStoreConfig<V> config; private final DrillFileSystem fs; - private int version = -1; public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig<V> config) { - super(); this.basePath = new Path(base, config.getName()); this.config = config; this.fs = fs; - try { - if (!fs.mkdirs(basePath)) { - version++; - } + mkdirs(getBasePath()); } catch (IOException e) { throw new RuntimeException("Failure setting pstore configuration path."); } } + protected Path getBasePath() { + return basePath; + } + @Override public PersistentStoreMode getMode() { return PersistentStoreMode.PERSISTENT; } + private void mkdirs(Path path) throws IOException { + fs.mkdirs(path); + } + public static Path getLogDir() { String drillLogDir = System.getenv("DRILL_LOG_DIR"); if (drillLogDir == null) { @@ -114,39 +111,37 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> { @Override public Iterator<Map.Entry<String, V>> getRange(int skip, int take) { - try (AutoCloseableLock lock = readLock.open()) { - try { - // list only files with sys file suffix - PathFilter sysFileSuffixFilter = new PathFilter() { - @Override - public boolean accept(Path path) { - return 
path.getName().endsWith(DRILL_SYS_FILE_SUFFIX); - } - }; - - List<FileStatus> fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter); - if (fileStatuses.isEmpty()) { - return Collections.emptyIterator(); - } - - List<String> files = Lists.newArrayList(); - for (FileStatus stat : fileStatuses) { - String s = stat.getPath().getName(); - files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); + try { + // list only files with sys file suffix + PathFilter sysFileSuffixFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().endsWith(DRILL_SYS_FILE_SUFFIX); } + }; - Collections.sort(files); + List<FileStatus> fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter); + if (fileStatuses.isEmpty()) { + return Collections.emptyIterator(); + } - return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() { - @Nullable - @Override - public Entry<String, V> apply(String key) { - return new ImmutableEntry<>(key, get(key)); - } - }).iterator(); - } catch (IOException e) { - throw new RuntimeException(e); + List<String> files = Lists.newArrayList(); + for (FileStatus stat : fileStatuses) { + String s = stat.getPath().getName(); + files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); } + + Collections.sort(files); + + return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() { + @Nullable + @Override + public Entry<String, V> apply(String key) { + return new ImmutableEntry<>(key, get(key)); + } + }).iterator(); + } catch (IOException e) { + throw new RuntimeException(e); } } @@ -160,108 +155,68 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> { @Override public boolean contains(String key) { - return contains(key, null); - } - - @Override - public boolean contains(String key, DataChangeVersion 
dataChangeVersion) { - try (AutoCloseableLock lock = readLock.open()) { - try { - Path path = makePath(key); - boolean exists = fs.exists(path); - if (exists && dataChangeVersion != null) { - dataChangeVersion.setVersion(version); - } - return exists; - } catch (IOException e) { - throw new RuntimeException(e); - } + try { + return fs.exists(makePath(key)); + } catch (IOException e) { + throw new RuntimeException(e); } } @Override public V get(String key) { - return get(key, null); - } - - @Override - public V get(String key, DataChangeVersion dataChangeVersion) { - try (AutoCloseableLock lock = readLock.open()) { - try { - if (dataChangeVersion != null) { - dataChangeVersion.setVersion(version); - } - Path path = makePath(key); - if (!fs.exists(path)) { - return null; - } - } catch (IOException e) { - throw new RuntimeException(e); - } - final Path path = makePath(key); - try (InputStream is = fs.open(path)) { - return config.getSerializer().deserialize(IOUtils.toByteArray(is)); - } catch (IOException e) { - throw new RuntimeException("Unable to deserialize \"" + path + "\"", e); + try { + Path path = makePath(key); + if (!fs.exists(path)) { + return null; } + } catch (IOException e) { + throw new RuntimeException(e); + } + final Path path = makePath(key); + try (InputStream is = fs.open(path)) { + return config.getSerializer().deserialize(IOUtils.toByteArray(is)); + } catch (IOException e) { + throw new RuntimeException("Unable to deserialize \"" + path + "\"", e); } } @Override public void put(String key, V value) { - put(key, value, null); - } - - @Override - public void put(String key, V value, DataChangeVersion dataChangeVersion) { - try (AutoCloseableLock lock = writeLock.open()) { - if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) { - throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion()); - } - try (OutputStream os = fs.create(makePath(key))) { - 
IOUtils.write(config.getSerializer().serialize(value), os); - version++; - } catch (IOException e) { - throw new RuntimeException(e); - } + try (OutputStream os = fs.create(makePath(key))) { + IOUtils.write(config.getSerializer().serialize(value), os); + } catch (IOException e) { + throw new RuntimeException(e); } } @Override public boolean putIfAbsent(String key, V value) { - try (AutoCloseableLock lock = writeLock.open()) { - try { - Path p = makePath(key); - if (fs.exists(p)) { - return false; - } else { - try (OutputStream os = fs.create(makePath(key))) { - IOUtils.write(config.getSerializer().serialize(value), os); - version++; - } - return true; + try { + Path p = makePath(key); + if (fs.exists(p)) { + return false; + } else { + try (OutputStream os = fs.create(makePath(key))) { + IOUtils.write(config.getSerializer().serialize(value), os); } - } catch (IOException e) { - throw new RuntimeException(e); + return true; } + } catch (IOException e) { + throw new RuntimeException(e); } } @Override public void delete(String key) { - try (AutoCloseableLock lock = writeLock.open()) { - try { - fs.delete(makePath(key), false); - version++; - } catch (IOException e) { - logger.error("Unable to delete data from storage.", e); - throw new RuntimeException(e); - } + try { + fs.delete(makePath(key), false); + } catch (IOException e) { + logger.error("Unable to delete data from storage.", e); + throw new RuntimeException(e); } } @Override public void close() { } - } http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java new file mode 100644 index 0000000..23eedd9 --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.drill.exec.store.sys.store; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.drill.common.AutoCloseables.Closeable; +import org.apache.drill.common.concurrent.AutoCloseableLock; +import org.apache.drill.exec.exception.VersionMismatchException; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreMode; +import org.apache.drill.exec.store.sys.VersionedPersistentStore; + +/** + * Versioned Store that delegates operations to PersistentStore + * @param <V> + */ +public class VersionedDelegatingStore<V> implements VersionedPersistentStore<V> { + private final PersistentStore<V> store; + private final ReadWriteLock readWriteLock; + private final AutoCloseableLock readLock; + private final AutoCloseableLock writeLock; + private int version; + + public VersionedDelegatingStore(PersistentStore<V> store) { + this.store = store; + readWriteLock = 
new ReentrantReadWriteLock(); + readLock = new AutoCloseableLock(readWriteLock.readLock()); + writeLock = new AutoCloseableLock(readWriteLock.writeLock()); + version = -1; + } + + @Override + public PersistentStoreMode getMode() { + return store.getMode(); + } + + @Override + public void delete(final String key) { + try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) { + store.delete(key); + version++; + } + } + + @Override + public boolean contains(final String key, final DataChangeVersion dataChangeVersion) { + try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { + boolean contains = store.contains(key); + dataChangeVersion.setVersion(version); + return contains; + } + } + + @Override + public V get(final String key, final DataChangeVersion dataChangeVersion) { + try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { + V value = store.get(key); + dataChangeVersion.setVersion(version); + return value; + } + } + + @Override + public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) { + try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) { + if (dataChangeVersion.getVersion() != version) { + throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion()); + } + store.put(key, value); + version++; + } + } + + @Override + public boolean putIfAbsent(String key, V value) { + try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) { + if (store.putIfAbsent(key, value)) { + version++; + return true; + } + return false; + } + } + + @Override + public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) { + try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { + return store.getRange(skip, take); + } + } + + @Override + public void close() throws Exception + { + try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) { + store.close(); + version = -1; + } + } +} 
http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java index a3ee58e..1f20212 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java @@ -36,12 +36,13 @@ import org.apache.drill.exec.serialization.InstanceSerializer; import org.apache.drill.exec.store.sys.BasePersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreMode; +import org.apache.drill.exec.store.sys.VersionedPersistentStore; import org.apache.zookeeper.CreateMode; /** * Zookeeper based implementation of {@link org.apache.drill.exec.store.sys.PersistentStore}. 
*/ -public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> { +public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> implements VersionedPersistentStore<V> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZookeeperPersistentStore.class); private final PersistentStoreConfig<V> config; http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java index a5502cb..a2e30f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java @@ -28,7 +28,9 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.sys.PersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreRegistry; import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.VersionedPersistentStore; import org.apache.drill.exec.store.sys.store.LocalPersistentStore; +import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore; import org.apache.drill.exec.store.sys.store.ZookeeperPersistentStore; import org.apache.hadoop.fs.Path; @@ -81,6 +83,24 @@ public class ZookeeperPersistentStoreProvider extends BasePersistentStoreProvide } @Override + public <V> VersionedPersistentStore<V> getOrCreateVersionedStore(final PersistentStoreConfig<V> config) throws StoreException { + switch(config.getMode()){ + case BLOB_PERSISTENT: + return new 
VersionedDelegatingStore<>(new LocalPersistentStore<>(fs, blobRoot, config)); + case PERSISTENT: + final ZookeeperPersistentStore<V> store = new ZookeeperPersistentStore<>(curator, config); + try { + store.start(); + } catch (Exception e) { + throw new StoreException("unable to start zookeeper store", e); + } + return store; + default: + throw new IllegalStateException(); + } + } + + @Override public void close() throws Exception { fs.close(); } http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java index e36dc83..528705a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java @@ -20,30 +20,19 @@ package org.apache.drill.exec.testing.store; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import org.apache.drill.common.concurrent.AutoCloseableLock; -import org.apache.drill.exec.exception.VersionMismatchException; + import org.apache.drill.exec.store.sys.BasePersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreMode; -import org.apache.drill.exec.store.sys.store.DataChangeVersion; public class NoWriteLocalStore<V> extends BasePersistentStore<V> { - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock()); - private final 
AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock()); private final ConcurrentMap<String, V> store = Maps.newConcurrentMap(); - private int version = -1; @Override public void delete(final String key) { - try (AutoCloseableLock lock = writeLock.open()) { - store.remove(key); - version++; - } + store.remove(key); } @Override @@ -53,74 +42,35 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { @Override public boolean contains(final String key) { - return contains(key, null); - } - - @Override - public boolean contains(final String key, final DataChangeVersion dataChangeVersion) { - try (AutoCloseableLock lock = readLock.open()) { - if (dataChangeVersion != null) { - dataChangeVersion.setVersion(version); - } - return store.containsKey(key); - } + return store.containsKey(key); } @Override public V get(final String key) { - return get(key, null); - } - - @Override - public V get(final String key, final DataChangeVersion dataChangeVersion) { - try (AutoCloseableLock lock = readLock.open()) { - if (dataChangeVersion != null) { - dataChangeVersion.setVersion(version); - } - return store.get(key); - } + return store.get(key); } @Override public void put(final String key, final V value) { - put(key, value, null); - } - - @Override - public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) { - try (AutoCloseableLock lock = writeLock.open()) { - if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) { - throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion()); - } - store.put(key, value); - version++; - } + store.put(key, value); } @Override public boolean putIfAbsent(final String key, final V value) { - try (AutoCloseableLock lock = writeLock.open()) { - final V old = store.putIfAbsent(key, value); - if (old == null) { - version++; - return true; - } - return false; + final V old = store.putIfAbsent(key, value); + if (old == null) { + 
return true; } + return false; } @Override public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) { - try (AutoCloseableLock lock = readLock.open()) { - return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator(); - } + return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator(); } @Override - public void close() throws Exception { - try (AutoCloseableLock lock = writeLock.open()) { - store.clear(); - version = -1; - } + public void close() { + store.clear(); } } http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java index a516fad..5182093 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java @@ -86,7 +86,7 @@ public class IncomingBuffers implements AutoCloseable { // we want to make sure that we only generate local record batch reference in the case that we're not closed. // Otherwise we would leak memory. 
- try (AutoCloseableLock lock = sharedIncomingBatchLock.open()) { + try (@SuppressWarnings("unused") AutoCloseables.Closeable lock = sharedIncomingBatchLock.open()) { if (closed) { return false; } @@ -135,7 +135,7 @@ public class IncomingBuffers implements AutoCloseable { @Override public void close() throws Exception { - try (AutoCloseableLock lock = exclusiveCloseLock.open()) { + try (@SuppressWarnings("unused") AutoCloseables.Closeable lock = exclusiveCloseLock.open()) { closed = true; AutoCloseables.close(collectorMap.values()); } http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java index 8896fb0..73ddfe0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java @@ -31,6 +31,7 @@ import org.apache.drill.exec.coord.zk.PathUtils; import org.apache.drill.exec.coord.zk.ZookeeperClient; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.server.options.PersistedOptionValue; +import org.apache.drill.exec.store.sys.store.ZookeeperPersistentStore; import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider; import org.apache.drill.test.BaseDirTestWatcher; @@ -45,6 +46,8 @@ import org.junit.experimental.categories.Category; import java.io.File; +import static org.junit.Assert.assertTrue; + @Category({SlowTest.class}) public class TestPStoreProviders extends TestWithZookeeper { @Rule @@ -133,8 +136,9 @@ public class TestPStoreProviders extends 
TestWithZookeeper { try (ZookeeperPersistentStoreProvider provider = new ZookeeperPersistentStoreProvider(zkHelper.getConfig(), curator)) { PersistentStore<PersistedOptionValue> store = provider.getOrCreateStore(storeConfig); + assertTrue(store instanceof ZookeeperPersistentStore); - PersistedOptionValue oldOptionValue = store.get(oldName, null); + PersistedOptionValue oldOptionValue = ((ZookeeperPersistentStore<PersistedOptionValue>)store).get(oldName, null); PersistedOptionValue expectedValue = new PersistedOptionValue("true"); Assert.assertEquals(expectedValue, oldOptionValue); http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java ---------------------------------------------------------------------- diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java index 3b5967f..e9f35cc 100644 --- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java +++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.drill.common.AutoCloseables.Closeable; import org.apache.drill.common.HistoricalLog; import org.apache.drill.common.concurrent.AutoCloseableLock; import org.apache.drill.exec.memory.BaseAllocator.Verbosity; @@ -109,7 +110,7 @@ public class AllocationManager { "A buffer can only be associated between two allocators that share the same root."); } - try (AutoCloseableLock read = readLock.open()) { + try (@SuppressWarnings("unused") Closeable read = readLock.open()) { final BufferLedger ledger = map.get(allocator); if (ledger != null) { @@ -119,7 +120,7 @@ public class AllocationManager { return ledger; } } - try 
(AutoCloseableLock write = writeLock.open()) { + try (@SuppressWarnings("unused") Closeable write = writeLock.open()) { // we have to recheck existing ledger since a second reader => writer could be competing with us. final BufferLedger existingLedger = map.get(allocator); @@ -242,7 +243,7 @@ public class AllocationManager { // since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure // that this won't happen by synchronizing on the allocator manager instance. - try (AutoCloseableLock write = writeLock.open()) { + try (@SuppressWarnings("unused") Closeable write = writeLock.open()) { if (owningLedger != this) { return true; } @@ -320,7 +321,7 @@ public class AllocationManager { allocator.assertOpen(); final int outcome; - try (AutoCloseableLock write = writeLock.open()) { + try (@SuppressWarnings("unused") Closeable write = writeLock.open()) { outcome = bufRefCnt.addAndGet(-decrement); if (outcome == 0) { lDestructionTime = System.nanoTime(); @@ -424,7 +425,7 @@ public class AllocationManager { * @return Amount of accounted(owned) memory associated with this ledger. */ public int getAccountedSize() { - try (AutoCloseableLock read = readLock.open()) { + try (@SuppressWarnings("unused") Closeable read = readLock.open()) { if (owningLedger == this) { return size; } else {
