Repository: drill Updated Branches: refs/heads/master 9a3a5c4ff -> 8126927fd
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java deleted file mode 100644 index 0d2fb38..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java +++ /dev/null @@ -1,291 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store.sys.zk; - -import java.io.IOException; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; - -/** - * This is the abstract class that is shared by ZkPStore (Persistent store) and ZkEStore (Ephemeral Store) - * @param <V> - */ -public abstract class ZkAbstractStore<V> implements AutoCloseable { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkAbstractStore.class); - - protected final CuratorFramework framework; - protected final PStoreConfig<V> config; - private final PathChildrenCache childrenCache; - private final String prefix; - private final String parent; - - public ZkAbstractStore(CuratorFramework framework, PStoreConfig<V> config) - throws IOException { - this.parent = "/" + config.getName(); - this.prefix = parent + "/"; - this.framework = framework; - this.config = config; - this.childrenCache = new PathChildrenCache(framework, parent, true); - - // make sure the parent node exists. 
- createOrUpdate(parent, null, CreateMode.PERSISTENT); - try { - childrenCache.start(StartMode.BUILD_INITIAL_CACHE); - } catch (Exception e) { - throw new RuntimeException("Failure while initializing Zookeeper for PStore", e); - } - } - - public Iterator<Entry<String, V>> iterator() { - try { - return new Iter(childrenCache.getCurrentData()); - } catch (Exception e) { - throw new RuntimeException("Failure while accessing Zookeeper.", e); - } - } - - protected String withPrefix(String key) { - Preconditions.checkArgument(!key.contains("/"), - "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface."); - return prefix + key; - } - - public V get(String key) { - try { - ChildData d = childrenCache.getCurrentData(withPrefix(key)); - if(d == null || d.getData() == null){ - return null; - } - byte[] bytes = d.getData(); - return config.getSerializer().deserialize(bytes); - - } catch (Exception e) { - throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e); - } - } - - public void put(String key, V value) { - try { - if (childrenCache.getCurrentData(withPrefix(key)) != null) { - framework.setData().forPath(withPrefix(key), config.getSerializer().serialize(value)); - } else { - createWithPrefix(key, value); - } - childrenCache.rebuildNode(withPrefix(key)); - - } catch (Exception e) { - throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e); - } - } - - public void delete(String key) { - try { - if (framework.checkExists().forPath(withPrefix(key)) != null) { - framework.delete().forPath(withPrefix(key)); - childrenCache.rebuildNode(withPrefix(key)); - } - } catch (Exception e) { - throw new RuntimeException("Failure while accessing Zookeeper. 
" + e.getMessage(), e); - } - } - - public boolean putIfAbsent(String key, V value) { - try { - if (childrenCache.getCurrentData(withPrefix(key)) != null) { - return false; - } else { - createWithPrefix(key, value); - childrenCache.rebuildNode(withPrefix(key)); - return true; - } - - } catch (Exception e) { - throw new RuntimeException("Failure while accessing Zookeeper", e); - } - } - - /** - * Default {@link CreateMode create mode} that will be used in create operations referred in the see also section. - * - * @see #createOrUpdate(String, Object) - * @see #createWithPrefix(String, Object) - */ - protected abstract CreateMode getCreateMode(); - - - /** - * Creates a node in zookeeper with the {@link #getCreateMode() default create mode} and sets its value if supplied. - * - * @param path target path - * @param value value to set, null if none available - * - * @see #getCreateMode() - * @see #createOrUpdate(String, Object) - * @see #withPrefix(String) - */ - protected void createWithPrefix(String path, V value) { - createOrUpdate(withPrefix(path), value); - } - - /** - * Creates a node in zookeeper with the {@link #getCreateMode() default create mode} and sets its value if supplied - * or updates its value if the node already exists. - * - * Note that if node exists, its mode will not be changed. - * - * @param path target path - * @param value value to set, null if none available - * - * @see #getCreateMode() - * @see #createOrUpdate(String, Object, CreateMode) - */ - protected void createOrUpdate(String path, V value) { - createOrUpdate(path, value, getCreateMode()); - } - - /** - * Creates a node in zookeeper with the given mode and sets its value if supplied or updates its value if the node - * already exists. - * - * Note that if the node exists, its mode will not be changed. - * - * Internally, the method suppresses {@link org.apache.zookeeper.KeeperException.NodeExistsException}. It is - * safe to do so since the implementation is idempotent. 
- * - * @param path target path - * @param value value to set, null if none available - * @param mode creation mode - * @throws RuntimeException throws a {@link RuntimeException} wrapping the root cause. - */ - protected void createOrUpdate(String path, V value, CreateMode mode) { - try { - final boolean isUpdate = value != null; - final byte[] valueInBytes = isUpdate ? config.getSerializer().serialize(value) : null; - final boolean nodeExists = framework.checkExists().forPath(path) != null; - if (!nodeExists) { - final ACLBackgroundPathAndBytesable<String> creator = framework.create().withMode(mode); - if (isUpdate) { - creator.forPath(path, valueInBytes); - } else { - creator.forPath(path); - } - } else if (isUpdate) { - framework.setData().forPath(path, valueInBytes); - } - } catch (KeeperException.NodeExistsException ex) { - logger.warn("Node already exists in Zookeeper. Skipping... -- [path: {}, mode: {}]", path, mode); - } catch (Exception e) { - final String msg = String.format("Failed to create/update Zookeeper node. 
[path: %s, mode: %s]", path, mode); - throw new RuntimeException(msg, e); - } - } - - private class Iter implements Iterator<Entry<String, V>>{ - - private Iterator<ChildData> keys; - private ChildData current; - - public Iter(List<ChildData> children) { - super(); - List<ChildData> sortedChildren = Lists.newArrayList(children); - Collections.sort(sortedChildren, new Comparator<ChildData>(){ - @Override - public int compare(ChildData o1, ChildData o2) { - return o1.getPath().compareTo(o2.getPath()); - }}); - this.keys = sortedChildren.iterator(); - } - - @Override - public boolean hasNext() { - return keys.hasNext(); - } - - @Override - public Entry<String, V> next() { - current = keys.next(); - return new DeferredEntry(current); - } - - @Override - public void remove() { - delete(keyFromPath(current)); - } - - private String keyFromPath(ChildData data){ - String path = data.getPath(); - return path.substring(prefix.length(), path.length()); - } - - private class DeferredEntry implements Entry<String, V>{ - - private ChildData data; - - public DeferredEntry(ChildData data) { - super(); - this.data = data; - } - - @Override - public String getKey() { - return keyFromPath(data); - } - - @Override - public V getValue() { - try { - return config.getSerializer().deserialize(data.getData()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public V setValue(V value) { - throw new UnsupportedOperationException(); - } - - } - - } - - @Override - public void close() { - try{ - childrenCache.close(); - }catch(IOException e){ - logger.warn("Failure while closing out abstract store.", e); - } - } - - -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java deleted file mode 100644 index 4706287..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.sys.zk; - -import java.io.IOException; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.drill.exec.store.sys.EStore; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.zookeeper.CreateMode; - -/** - * Implementation of EStore using Zookeeper's EPHEMERAL node. 
- * @param <V> - */ -public class ZkEStore<V> extends ZkAbstractStore<V> implements EStore<V> { - - public ZkEStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException { - super(framework,config); - } - - @Override - protected CreateMode getCreateMode() { - return CreateMode.EPHEMERAL; - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java deleted file mode 100644 index 60277aa..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.drill.exec.store.sys.zk; - -import java.io.IOException; - -import com.google.common.base.Preconditions; -import org.apache.curator.framework.CuratorFramework; -import org.apache.drill.exec.store.sys.EStore; -import org.apache.drill.exec.store.sys.EStoreProvider; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.drill.exec.store.sys.PStoreConfig.Mode; - -public class ZkEStoreProvider implements EStoreProvider{ - private final CuratorFramework curator; - - public ZkEStoreProvider(CuratorFramework curator) { - this.curator = curator; - } - - @Override - public <V> EStore<V> getStore(PStoreConfig<V> store) throws IOException { - Preconditions.checkArgument(store.getMode() == Mode.EPHEMERAL); - return new ZkEStore<V>(curator,store); - } - - @Override - public void start() throws IOException { - } - - @Override - public void close() throws Exception { - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java deleted file mode 100644 index da22996..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.sys.zk; - -import java.io.IOException; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.zookeeper.CreateMode; - -/** - * Implementation of PStore using Zookeeper's PERSISTENT node. - * @param <V> - */ -public class ZkPStore<V> extends ZkAbstractStore<V> implements PStore<V> { - - public ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException { - super(framework, config); - } - - @Override - protected CreateMode getCreateMode() { - return CreateMode.PERSISTENT; - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java index eb5df43..8cc2fde 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * <p/> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,83 +17,19 @@ */ package org.apache.drill.exec.store.sys.zk; -import java.io.IOException; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.curator.framework.CuratorFramework; -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; -import org.apache.drill.exec.exception.DrillbitStartupException; -import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.store.sys.EStoreProvider; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.drill.exec.store.sys.PStoreProvider; -import org.apache.drill.exec.store.sys.PStoreRegistry; -import org.apache.drill.exec.store.sys.local.FilePStore; -import org.apache.hadoop.fs.Path; - -public class ZkPStoreProvider implements PStoreProvider { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class); - - private static final String DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT = "drill.exec.sys.store.provider.zk.blobroot"; - - private final CuratorFramework curator; +import org.apache.drill.exec.exception.StoreException; +import org.apache.drill.exec.store.sys.PersistentStoreRegistry; +import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider; - private final DrillFileSystem fs; - private final Path blobRoot; - private final EStoreProvider zkEStoreProvider; - - public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException { - ClusterCoordinator coord = registry.getClusterCoordinator(); - if 
(!(coord instanceof ZKClusterCoordinator)) { - throw new DrillbitStartupException("A ZkPStoreProvider was created without a ZKClusterCoordinator."); - } - this.curator = ((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator(); - - if (registry.getConfig().hasPath(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) { - blobRoot = new Path(registry.getConfig().getString(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)); - }else{ - blobRoot = FilePStore.getLogDir(); - } - - try{ - this.fs = FilePStore.getFileSystem(registry.getConfig(), blobRoot); - }catch(IOException e){ - throw new DrillbitStartupException("Failure while attempting to set up blob store.", e); - } - - this.zkEStoreProvider = new ZkEStoreProvider(curator); - } - - @VisibleForTesting - public ZkPStoreProvider(DrillConfig config, CuratorFramework curator) throws IOException { - this.curator = curator; - this.blobRoot = FilePStore.getLogDir(); - this.fs = FilePStore.getFileSystem(config, blobRoot); - this.zkEStoreProvider = new ZkEStoreProvider(curator); - } - - @Override - public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException { - switch(config.getMode()){ - case BLOB_PERSISTENT: - return new FilePStore<V>(fs, blobRoot, config); - case EPHEMERAL: - return zkEStoreProvider.getStore(config); - case PERSISTENT: - return new ZkPStore<V>(curator, config); - default: - throw new IllegalStateException(); - } - } - - @Override - public void start() { - } - - @Override - public void close() { +/** + * Kept for possible references to old class name in configuration. + * + * @deprecated will be removed in 1.7 + * use {@link ZookeeperPersistentStoreProvider} instead. 
+ */ +public class ZkPStoreProvider extends ZookeeperPersistentStoreProvider { + public ZkPStoreProvider(PersistentStoreRegistry<ZKClusterCoordinator> registry) throws StoreException { + super(registry); } } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java new file mode 100644 index 0000000..ff14f6d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.testing.store; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import org.apache.drill.exec.store.sys.BasePersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreMode; + +public class NoWriteLocalStore<V> extends BasePersistentStore<V> { + private final ConcurrentMap<String, V> store = Maps.newConcurrentMap(); + + public void delete(final String key) { + store.remove(key); + } + + @Override + public PersistentStoreMode getMode() { + return PersistentStoreMode.PERSISTENT; + } + + @Override + public V get(final String key) { + return store.get(key); + } + + @Override + public void put(final String key, final V value) { + store.put(key, value); + } + + @Override + public boolean putIfAbsent(final String key, final V value) { + final V old = store.putIfAbsent(key, value); + return old == null; // a null previous value means the key was absent and this call stored the value + } + + @Override + public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) { + return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator(); + } + + @Override + public void close() throws Exception { + store.clear(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index 5fd6f1c..f2305c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -39,7 +39,7 @@ import org.apache.drill.exec.rpc.control.WorkEventBus; import org.apache.drill.exec.rpc.data.DataConnectionCreator; import org.apache.drill.exec.server.BootStrapContext; 
import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.store.sys.PStoreProvider; +import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.work.batch.ControlMessageHandler; import org.apache.drill.exec.work.foreman.Foreman; import org.apache.drill.exec.work.foreman.QueryManager; @@ -101,7 +101,7 @@ public class WorkManager implements AutoCloseable { final Controller controller, final DataConnectionCreator data, final ClusterCoordinator coord, - final PStoreProvider provider) { + final PersistentStoreProvider provider) { dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider); statusThread.start(); @@ -153,6 +153,8 @@ public class WorkManager implements AutoCloseable { } } } + + getContext().close(); } public DrillbitContext getContext() { http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index bfc9dff..8527850 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -163,8 +163,8 @@ public class Foreman implements Runnable { closeFuture.addListener(closeListener); queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId); - queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(), - stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this + queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(), + drillbitContext.getClusterCoordinator(), stateListener, this); // TODO reference escapes 
before ctor is complete via stateListener, this final OptionManager optionManager = queryContext.getOptions(); queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE); @@ -768,6 +768,12 @@ public class Foreman implements Runnable { bee.retireForeman(Foreman.this); try { + queryManager.close(); + } catch (final Exception e) { + logger.warn("unable to close query manager", e); + } + + try { releaseLease(); } finally { isClosed = true; http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index 060cffc..39fa5cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -19,15 +19,18 @@ package org.apache.drill.exec.work.foreman; import io.netty.buffer.ByteBuf; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.exceptions.UserRemoteException; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.store.TransientStore; +import org.apache.drill.exec.coord.store.TransientStoreConfig; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -45,9 +48,9 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper; import 
org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.drill.exec.store.sys.PStoreProvider; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.work.EndpointListener; import org.apache.drill.exec.work.foreman.Foreman.StateListener; @@ -59,20 +62,18 @@ import com.google.common.collect.Maps; /** * Each Foreman holds its own QueryManager. This manages the events associated with execution of a particular query across all fragments. */ -public class QueryManager { +public class QueryManager implements AutoCloseable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class); - public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig. + public static final PersistentStoreConfig<QueryProfile> QUERY_PROFILE = PersistentStoreConfig. newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, SchemaUserBitShared.QueryProfile.MERGE) .name("profiles") .blob() - .max(100) .build(); - public static final PStoreConfig<QueryInfo> RUNNING_QUERY_INFO = PStoreConfig. 
- newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE) + public static final TransientStoreConfig<QueryInfo> RUNNING_QUERY_INFO = TransientStoreConfig + .newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE) .name("running") - .ephemeral() .build(); private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap(); @@ -90,8 +91,8 @@ public class QueryManager { new IntObjectHashMap<>(); private final List<FragmentData> fragmentDataSet = Lists.newArrayList(); - private final PStore<QueryProfile> profilePStore; - private final PStore<QueryInfo> profileEStore; + private final PersistentStore<QueryProfile> profileStore; + private final TransientStore<QueryInfo> transientProfiles; // the following mutable variables are used to capture ongoing query status private String planText; @@ -104,8 +105,8 @@ public class QueryManager { // How many fragments have finished their execution. private final AtomicInteger finishedFragments = new AtomicInteger(0); - public QueryManager(final QueryId queryId, final RunQuery runQuery, final PStoreProvider pStoreProvider, - final StateListener stateListener, final Foreman foreman) { + public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider, + final ClusterCoordinator coordinator, final StateListener stateListener, final Foreman foreman) { this.queryId = queryId; this.runQuery = runQuery; this.stateListener = stateListener; @@ -113,11 +114,11 @@ public class QueryManager { stringQueryId = QueryIdHelper.getQueryId(queryId); try { - profilePStore = pStoreProvider.getStore(QUERY_PROFILE); - profileEStore = pStoreProvider.getStore(RUNNING_QUERY_INFO); - } catch (final IOException e) { + profileStore = storeProvider.getOrCreateStore(QUERY_PROFILE); + } catch (final Exception e) { throw new DrillRuntimeException(e); } + transientProfiles = coordinator.getOrCreateTransientStore(RUNNING_QUERY_INFO); } private 
static boolean isTerminal(final FragmentState state) { @@ -237,11 +238,14 @@ public class QueryManager { } } + @Override + public void close() throws Exception { } + /* - * This assumes that the FragmentStatusListener implementation takes action when it hears - * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything - * but log messages. - */ + * This assumes that the FragmentStatusListener implementation takes action when it hears + * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything + * but log messages. + */ private static class SignalListener extends EndpointListener<Ack, FragmentHandle> { /** * An enum of possible signals that {@link SignalListener} listens to. @@ -281,14 +285,14 @@ public class QueryManager { case STARTING: case RUNNING: case CANCELLATION_REQUESTED: - profileEStore.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile. + transientProfiles.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile. break; case COMPLETED: case CANCELED: case FAILED: try { - profileEStore.delete(stringQueryId); + transientProfiles.remove(stringQueryId); } catch(final Exception e) { logger.warn("Failure while trying to delete the estore profile for this query.", e); } @@ -305,7 +309,7 @@ public class QueryManager { void writeFinalProfile(UserException ex) { try { // TODO(DRILL-2362) when do these ever get deleted? 
- profilePStore.put(stringQueryId, getQueryProfile(ex)); + profileStore.put(stringQueryId, getQueryProfile(ex)); } catch (Exception e) { logger.error("Failure while storing Query Profile", e); } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index d6ba99a..b1b9b46 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -120,7 +120,7 @@ drill.exec: { affinity.factor: 1.2 }, sys.store.provider: { - class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider", + class: "org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider", local: { path: "/tmp/drill", write: true http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index 26b7464..7ab73dc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -248,7 +248,7 @@ public class BaseTestQuery extends ExecTest { } @AfterClass - public static void closeClient() throws IOException { + public static void closeClient() throws Exception { if (client != null) { client.close(); } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java 
index 24c8c63..9f39d15 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java @@ -48,7 +48,7 @@ import org.apache.drill.exec.server.options.SystemOptionManager; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.StoragePluginRegistryImpl; -import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; +import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; import org.apache.drill.exec.testing.ExecutionControls; import org.junit.Rule; import org.junit.rules.TestRule; @@ -76,7 +76,7 @@ public class PlanningBase extends ExecTest{ protected void testSqlPlan(String sqlCommands) throws Exception { final String[] sqlStrings = sqlCommands.split(";"); - final LocalPStoreProvider provider = new LocalPStoreProvider(config); + final LocalPersistentStoreProvider provider = new LocalPersistentStoreProvider(config); provider.start(); final ScanResult scanResult = ClassPathScanner.fromPrescan(config); final LogicalPlanPersistence logicalPlanPersistence = new LogicalPlanPersistence(config, scanResult); @@ -97,7 +97,7 @@ public class PlanningBase extends ExecTest{ result = config; dbContext.getOptionManager(); result = systemOptions; - dbContext.getPersistentStoreProvider(); + dbContext.getStoreProvider(); result = provider; dbContext.getClasspathScan(); result = scanResult; http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/compile/CodeCompilerTestFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/CodeCompilerTestFactory.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/CodeCompilerTestFactory.java index 9032946..f4903d8 100644 --- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/CodeCompilerTestFactory.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/CodeCompilerTestFactory.java @@ -19,19 +19,17 @@ package org.apache.drill.exec.compile; import static com.google.common.base.Preconditions.checkNotNull; -import java.io.IOException; - import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.config.LogicalPlanPersistence; import org.apache.drill.common.scanner.ClassPathScanner; import org.apache.drill.exec.server.options.SystemOptionManager; -import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; +import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; public class CodeCompilerTestFactory { - public static CodeCompiler getTestCompiler(DrillConfig c) throws IOException { + public static CodeCompiler getTestCompiler(DrillConfig c) throws Exception { DrillConfig config = checkNotNull(c); LogicalPlanPersistence persistence = new LogicalPlanPersistence(config, ClassPathScanner.fromPrescan(config)); - LocalPStoreProvider provider = new LocalPStoreProvider(config); + LocalPersistentStoreProvider provider = new LocalPersistentStoreProvider(config); SystemOptionManager systemOptionManager = new SystemOptionManager(persistence, provider); return new CodeCompiler(config, systemOptionManager.init()); } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/compile/bytecode/ReplaceMethodInvoke.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/bytecode/ReplaceMethodInvoke.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/bytecode/ReplaceMethodInvoke.java index 8486801..9b507fe 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/bytecode/ReplaceMethodInvoke.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/bytecode/ReplaceMethodInvoke.java @@ -25,10 +25,9 @@ import java.net.URL; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.compile.DrillCheckClassAdapter; import org.apache.drill.exec.compile.QueryClassLoader; -import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory; import org.apache.drill.exec.server.options.SystemOptionManager; -import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; +import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; import org.objectweb.asm.ClassReader; import org.objectweb.asm.ClassVisitor; import org.objectweb.asm.ClassWriter; @@ -57,7 +56,7 @@ public class ReplaceMethodInvoke { check(output); final DrillConfig c = DrillConfig.forClient(); - final SystemOptionManager m = new SystemOptionManager(PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(c), new LocalPStoreProvider(c)); + final SystemOptionManager m = new SystemOptionManager(PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(c), new LocalPersistentStoreProvider(c)); m.init(); try (QueryClassLoader ql = new QueryClassLoader(DrillConfig.create(), m)) { ql.injectByteCode("org.apache.drill.Pickle$OutgoingBatch", output); http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEphemeralStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEphemeralStore.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEphemeralStore.java new file mode 100644 index 0000000..021a0b7 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEphemeralStore.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.zk; + +import java.io.IOException; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.test.TestingServer; +import org.apache.drill.exec.coord.store.TransientStoreConfig; +import org.apache.drill.exec.serialization.InstanceSerializer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestEphemeralStore { + private final static String root = "/test"; + private final static String path = "test-key"; + private final static String value = "testing"; + + private TestingServer server; + private CuratorFramework curator; + private TransientStoreConfig<String> config; + private ZkEphemeralStore<String> store; + + static class StoreWithMockClient<V> extends ZkEphemeralStore<V> { + private final ZookeeperClient client = 
Mockito.mock(ZookeeperClient.class); + + public StoreWithMockClient(final TransientStoreConfig<V> config, final CuratorFramework curator) { + super(config, curator); + } + + @Override + protected ZookeeperClient getClient() { + return client; + } + } + + + @Before + public void setUp() throws Exception { + server = new TestingServer(); + final RetryPolicy policy = new RetryNTimes(2, 1000); + curator = CuratorFrameworkFactory.newClient(server.getConnectString(), policy); + + config = Mockito.mock(TransientStoreConfig.class); + Mockito + .when(config.getName()) + .thenReturn(root); + + Mockito + .when(config.getSerializer()) + .thenReturn(new InstanceSerializer<String>() { + @Override + public byte[] serialize(final String instance) throws IOException { + if (instance == null) { + return null; + } + return instance.getBytes(); + } + + @Override + public String deserialize(final byte[] raw) throws IOException { + if (raw == null) { + return null; + } + return new String(raw); + } + }); + + store = new ZkEphemeralStore<>(config, curator); + + server.start(); + curator.start(); + store.start(); + } + + /** + * This test ensures store subscribes to receive events from underlying client. Dispatcher tests ensure listeners + * are fired on incoming events. These two sets of tests ensure observer pattern in {@code TransientStore} works fine. 
+ */ + @Test + public void testStoreRegistersDispatcherAndStartsItsClient() throws Exception { + final StoreWithMockClient<String> store = new StoreWithMockClient<>(config, curator); + + final PathChildrenCache cache = Mockito.mock(PathChildrenCache.class); + final ZookeeperClient client = store.getClient(); + Mockito + .when(client.getCache()) + .thenReturn(cache); + + final ListenerContainer<PathChildrenCacheListener> container = Mockito.mock(ListenerContainer.class); + Mockito + .when(cache.getListenable()) + .thenReturn(container); + + store.start(); + + Mockito + .verify(container) + .addListener(store.dispatcher); + + Mockito + .verify(client) + .start(); + } + + @After + public void tearDown() throws Exception { + store.close(); + curator.close(); + server.close(); + } + + @Test + public void testPutAndGetWorksAntagonistacally() { + store.put(path, value); + final String actual = store.get(path); + Assert.assertEquals("value mismatch", value, actual); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEventDispatcher.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEventDispatcher.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEventDispatcher.java new file mode 100644 index 0000000..f83ec00 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEventDispatcher.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.zk; + +import java.io.IOException; + +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.drill.exec.coord.store.TransientStoreConfig; +import org.apache.drill.exec.coord.store.TransientStoreEvent; +import org.apache.drill.exec.serialization.InstanceSerializer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestEventDispatcher { + + private final static String key = "some-key"; + private final static String value = "some-data"; + private final static byte[] data = "some-data".getBytes(); + + private ZkEphemeralStore<String> store; + private EventDispatcher<String> dispatcher; + private ChildData child; + + @Before + public void setUp() { + store = Mockito.mock(ZkEphemeralStore.class); + final TransientStoreConfig<String> config = Mockito.mock(TransientStoreConfig.class); + Mockito + .when(store.getConfig()) + .thenReturn(config); + + Mockito + .when(config.getSerializer()) + .thenReturn(new InstanceSerializer<String>() { + @Override + public byte[] serialize(String instance) throws IOException { + return instance.getBytes(); + } + + @Override + public String deserialize(byte[] raw) throws IOException { + return new String(raw); + } + }); + + dispatcher = new EventDispatcher<>(store); + child = Mockito.mock(ChildData.class); + Mockito + .when(child.getPath()) + .thenReturn(key); + + Mockito + .when(child.getData()) + 
.thenReturn(data); + } + + @Test + public void testDispatcherPropagatesEvents() throws Exception { + final PathChildrenCacheEvent.Type[] types = new PathChildrenCacheEvent.Type[] { + PathChildrenCacheEvent.Type.CHILD_ADDED, + PathChildrenCacheEvent.Type.CHILD_REMOVED, + PathChildrenCacheEvent.Type.CHILD_UPDATED + }; + + for (final PathChildrenCacheEvent.Type type:types) { + dispatcher.childEvent(null, new PathChildrenCacheEvent(type, child)); + + final TransientStoreEvent event = TransientStoreEvent.of(EventDispatcher.MAPPINGS.get(type), key, value); + Mockito + .verify(store) + .fireListeners(event); + } + + Assert.assertEquals("Number of event types that dispatcher can handle is different", types.length, EventDispatcher.MAPPINGS.size()); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestPathUtils.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestPathUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestPathUtils.java new file mode 100644 index 0000000..d966744 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestPathUtils.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.zk; + +import org.junit.Assert; +import org.junit.Test; + +public class TestPathUtils { + + @Test(expected = NullPointerException.class) + public void testNullSegmentThrowsNPE() { + PathUtils.join("", null, ""); + } + + @Test + public void testJoinPreservesAbsoluteOrRelativePaths() { + final String actual = PathUtils.join("/a", "/b", "/c"); + final String expected = "/a/b/c"; + Assert.assertEquals("invalid path", expected, actual); + + final String actual2 = PathUtils.join("/a", "b", "c"); + final String expected2 = "/a/b/c"; + Assert.assertEquals("invalid path", expected2, actual2); + + final String actual3 = PathUtils.join("a", "b", "c"); + final String expected3 = "a/b/c"; + Assert.assertEquals("invalid path", expected3, actual3); + + final String actual4 = PathUtils.join("a", "", "c"); + final String expected4 = "a/c"; + Assert.assertEquals("invalid path", expected4, actual4); + + final String actual5 = PathUtils.join("", "", "c"); + final String expected5 = "c"; + Assert.assertEquals("invalid path", expected5, actual5); + + final String actual6 = PathUtils.join("", "", ""); + final String expected6 = ""; + Assert.assertEquals("invalid path", expected6, actual6); + + final String actual7 = PathUtils.join("", "", "/"); + final String expected7 = "/"; + Assert.assertEquals("invalid path", expected7, actual7); + + final String actual8 = PathUtils.join("", "", "c/"); + final String expected8 = "c/"; + Assert.assertEquals("invalid path", expected8, actual8); + } + + + @Test + public void 
testNormalizeRemovesRedundantForwardSlashes() { + final String actual = PathUtils.normalize("/a/b/c"); + final String expected = "/a/b/c"; + Assert.assertEquals("invalid path", expected, actual); + + final String actual2 = PathUtils.normalize("//a//b//c"); + final String expected2 = "/a/b/c"; + Assert.assertEquals("invalid path", expected2, actual2); + + final String actual3 = PathUtils.normalize("///"); + final String expected3 = "/"; + Assert.assertEquals("invalid path", expected3, actual3); + + final String actual4 = PathUtils.normalize("/a"); + final String expected4 = "/a"; + Assert.assertEquals("invalid path", expected4, actual4); + + final String actual5 = PathUtils.normalize("//////"); + final String expected5 = "/"; + Assert.assertEquals("invalid path", expected5, actual5); + + final String actual6 = PathUtils.normalize(""); + final String expected6 = ""; + Assert.assertEquals("invalid path", expected6, actual6); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java new file mode 100644 index 0000000..3007566 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.zk; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Lists; +import org.apache.curator.CuratorZookeeperClient; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable; +import org.apache.curator.framework.api.CreateBuilder; +import org.apache.curator.framework.api.DeleteBuilder; +import org.apache.curator.framework.api.SetDataBuilder; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.test.TestingServer; +import org.apache.curator.utils.EnsurePath; +import org.apache.drill.common.collections.ImmutableEntry; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.zookeeper.CreateMode; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestZookeeperClient { + private final static String root = "/test"; + private final static String path = "test-key"; + private final static String abspath = PathUtils.join(root, path); + private final static byte[] data = "testing".getBytes(); + private final static CreateMode mode = CreateMode.PERSISTENT; + + private TestingServer server; + private CuratorFramework 
curator; + private ZookeeperClient client; + + static class ClientWithMockCache extends ZookeeperClient { + private final PathChildrenCache cacheMock = Mockito.mock(PathChildrenCache.class); + + public ClientWithMockCache(final CuratorFramework curator, final String root, final CreateMode mode) { + super(curator, root, mode); + } + + @Override + public PathChildrenCache getCache() { + return cacheMock; + } + } + + @Before + public void setUp() throws Exception { + server = new TestingServer(); + final RetryPolicy policy = new RetryNTimes(1, 1000); + curator = CuratorFrameworkFactory.newClient(server.getConnectString(), policy); + client = new ClientWithMockCache(curator, root, mode); + + server.start(); + curator.start(); + client.start(); + } + + @After + public void tearDown() throws Exception { + client.close(); + curator.close(); + server.close(); + } + + @Test + public void testStartingClientEnablesCacheAndEnsuresRootNodeExists() throws Exception { + Assert.assertTrue("start must create the root node", client.hasPath("", true)); + + Mockito + .verify(client.getCache()) + .start(); + } + + @Test + public void testHasPathWithEventualConsistencyHitsCache() { + final String path = "test-key"; + final String absPath = PathUtils.join(root, path); + + Mockito + .when(client.getCache().getCurrentData(absPath)) + .thenReturn(null); + + Assert.assertFalse(client.hasPath(path)); // test convenience method + + Mockito + .when(client.getCache().getCurrentData(absPath)) + .thenReturn(new ChildData(absPath, null, null)); + + Assert.assertTrue(client.hasPath(path, false)); // test actual method + } + + @Test(expected = DrillRuntimeException.class) + public void testHasPathThrowsDrillRuntimeException() { + final String path = "test-key"; + final String absPath = PathUtils.join(root, path); + + Mockito + .when(client.getCache().getCurrentData(absPath)) + .thenThrow(Exception.class); + + client.hasPath(path); + } + + @Test + public void testPutAndGetWorks() { + client.put(path, 
data); + final byte[] actual = client.get(path, true); + Assert.assertArrayEquals("data mismatch", data, actual); + } + + @Test + public void testGetWithEventualConsistencyHitsCache() { + Mockito + .when(client.getCache().getCurrentData(abspath)) + .thenReturn(null); + + Assert.assertEquals("get should return null", null, client.get(path)); + + Mockito + .when(client.getCache().getCurrentData(abspath)) + .thenReturn(new ChildData(abspath, null, data)); + + Assert.assertEquals("get should return data", data, client.get(path, false)); + } + + @Test + public void testCreate() throws Exception { + client.create(path); + Assert.assertTrue("path must exist", client.hasPath(path, true)); + + // ensure invoking create also rebuilds cache + Mockito + .verify(client.getCache(), Mockito.times(1)) + .rebuildNode(abspath); + } + + @Test + public void testDelete() throws Exception { + client.create(path); + Assert.assertTrue("path must exist", client.hasPath(path, true)); + client.delete(path); + Assert.assertFalse("path must not exist", client.hasPath(path, true)); + + // ensure cache is rebuilt + Mockito + .verify(client.getCache(), Mockito.times(2)) + .rebuildNode(abspath); + } + + + @Test + public void testEntriesReturnsRelativePaths() throws Exception { + final ChildData child = Mockito.mock(ChildData.class); + Mockito + .when(child.getPath()) + .thenReturn(abspath); + + Mockito + .when(child.getData()) + .thenReturn(data); + + final List<ChildData> children = Lists.newArrayList(child); + Mockito + .when(client.getCache().getCurrentData()) + .thenReturn(children); + + final Iterator<Map.Entry<String, byte[]>> entries = client.entries(); + + // returned entry must contain the given relative path + final Map.Entry<String, byte[]> expected = new ImmutableEntry<>(path, data); + + Assert.assertEquals("entries do not match", expected, entries.next()); + } + + +} 
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java index 29aaf3a..bf56eb6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java @@ -55,7 +55,7 @@ import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.StoragePluginRegistryImpl; -import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; +import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarBinaryVector; import org.junit.Ignore; @@ -112,7 +112,7 @@ public class TestOptiqPlans extends ExecTest { controller, com, workBus, - new LocalPStoreProvider(config)); + new LocalPersistentStoreProvider(config)); final QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), bitContext, QueryId.getDefaultInstance()); final PhysicalPlanReader reader = bitContext.getPlanReader(); http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java index bd3145f..bed71f9 100644 --- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java @@ -26,7 +26,6 @@ import java.util.List; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.config.LogicalPlanPersistence; import org.apache.drill.common.scanner.ClassPathScanner; -import org.apache.drill.common.scanner.persistence.ScanResult; import org.apache.drill.common.util.FileUtils; import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.client.DrillClient; @@ -51,7 +50,7 @@ import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.server.options.SystemOptionManager; -import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; +import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; import org.apache.drill.exec.vector.ValueVector; import org.junit.Rule; import org.junit.Test; @@ -73,7 +72,7 @@ public class TestHashJoin extends PopUnitTestBase { private final DrillConfig c = DrillConfig.create(); private void testHJMockScanCommon(final DrillbitContext bitContext, UserServer.UserClientConnection connection, String physicalPlan, int expectedRows) throws Throwable { - final LocalPStoreProvider provider = new LocalPStoreProvider(c); + final LocalPersistentStoreProvider provider = new LocalPersistentStoreProvider(c); provider.start(); final SystemOptionManager opt = new SystemOptionManager(PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(c), provider); opt.init(); http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java index 1f5aa22..bcd2f5e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java @@ -31,8 +31,8 @@ import com.google.common.collect.Maps; public class PStoreTestUtil { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class); - public static void test(PStoreProvider provider) throws Exception{ - PStore<String> store = provider.getStore(PStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build()); + public static void test(PersistentStoreProvider provider) throws Exception{ + PersistentStore<String> store = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build()); String[] keys = {"first", "second"}; String[] values = {"value1", "value2"}; Map<String, String> expectedMap = Maps.newHashMap(); @@ -43,7 +43,7 @@ public class PStoreTestUtil { } // allow one second for puts to propagate back to cache { - Iterator<Map.Entry<String, String>> iter = store.iterator(); + Iterator<Map.Entry<String, String>> iter = store.getAll(); for(int i =0; i < keys.length; i++){ Entry<String, String> e = iter.next(); assertTrue(expectedMap.containsKey(e.getKey())); @@ -54,15 +54,15 @@ public class PStoreTestUtil { } { - Iterator<Map.Entry<String, String>> iter = store.iterator(); + Iterator<Map.Entry<String, String>> iter = store.getAll(); while(iter.hasNext()){ - iter.next(); - iter.remove(); + final String key = iter.next().getKey(); + store.delete(key); } } // allow one second for deletes to propagate back to cache - assertFalse(store.iterator().hasNext()); + assertFalse(store.getAll().hasNext()); } } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java index cbfbcd3..93e2497 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java @@ -23,18 +23,18 @@ import org.apache.curator.retry.RetryNTimes; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.TestWithZookeeper; -import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; -import org.apache.drill.exec.store.sys.zk.ZkPStoreProvider; +import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; +import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider; import org.junit.Test; public class TestPStoreProviders extends TestWithZookeeper { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class); - static LocalPStoreProvider provider; + static LocalPersistentStoreProvider provider; @Test public void verifyLocalStore() throws Exception { - try(LocalPStoreProvider provider = new LocalPStoreProvider(DrillConfig.create())){ + try(LocalPersistentStoreProvider provider = new LocalPersistentStoreProvider(DrillConfig.create())){ PStoreTestUtil.test(provider); } } @@ -51,7 +51,7 @@ public class TestPStoreProviders extends TestWithZookeeper { try(CuratorFramework curator = builder.build()){ curator.start(); - ZkPStoreProvider provider = new ZkPStoreProvider(config, curator); + ZookeeperPersistentStoreProvider provider = new ZookeeperPersistentStoreProvider(config, curator); PStoreTestUtil.test(provider); } }
