http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java new file mode 100644 index 0000000..eb21c70 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.sys.zk; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.drill.exec.store.sys.PStore; +import org.apache.drill.exec.store.sys.PStoreConfig; +import org.apache.zookeeper.CreateMode; + +import com.google.common.base.Preconditions; + +public class ZkPStore<V> implements PStore<V>{ + + private CuratorFramework framework; + private PStoreConfig<V> config; + private String prefix; + private String parent; + + ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException { + this.parent = "/" + config.getName(); + this.prefix = parent + "/"; + this.framework = framework; + this.config = config; + + // make sure the parent node exists. + try{ + if(framework.checkExists().forPath(parent) == null) { + framework.create().withMode(CreateMode.PERSISTENT).forPath(parent); + } + }catch(Exception e){ + throw new RuntimeException("Failure while accessing Zookeeper", e); + } + + } + + @Override + public Iterator<Entry<String, V>> iterator() { + try{ + List<String> children = framework.getChildren().forPath(parent); + return new Iter(children.iterator()); + }catch(Exception e){ + throw new RuntimeException("Failure while accessing Zookeeper", e); + } + + } + + private String p(String key){ + Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface."); + return prefix + key; + } + + @Override + public V get(String key) { + try{ + byte[] bytes = framework.getData().forPath(p(key)); + if(bytes == null){ + return null; + } + return config.getSerializer().deserialize(bytes); + + }catch(Exception e){ + throw new RuntimeException("Failure while accessing Zookeeper", e); + } + } + + @Override + public void put(String key, V value) { + try{ + if(framework.checkExists().forPath(p(key)) != null) { + 
framework.setData().forPath(p(key), config.getSerializer().serialize(value)); + }else{ + framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value)); + } + + }catch(Exception e){ + throw new RuntimeException("Failure while accessing Zookeeper", e); + } + + } + + @Override + public boolean putIfAbsent(String key, V value) { + try{ + if(framework.checkExists().forPath(p(key)) != null) { + return false; + }else{ + framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value)); + return true; + } + + }catch(Exception e){ + throw new RuntimeException("Failure while accessing Zookeeper", e); + } + } + + @Override + public void delete(String key) { + try{ + framework.delete().forPath(p(key)); + }catch(Exception e){ + throw new RuntimeException("Failure while accessing Zookeeper", e); + } + } + + private class Iter implements Iterator<Entry<String, V>>{ + + private Iterator<String> keys; + private String current; + + public Iter(Iterator<String> keys) { + super(); + this.keys = keys; + } + + @Override + public boolean hasNext() { + return keys.hasNext(); + } + + @Override + public Entry<String, V> next() { + current = keys.next(); + return new DeferredEntry(current); + } + + @Override + public void remove() { + delete(current); + keys.remove(); + } + + private class DeferredEntry implements Entry<String, V>{ + + private String name; + + public DeferredEntry(String name) { + super(); + this.name = name; + } + + @Override + public String getKey() { + return name; + } + + @Override + public V getValue() { + return get(name); + } + + @Override + public V setValue(V value) { + throw new UnsupportedOperationException(); + } + + } + + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java new file mode 100644 index 0000000..f4513c2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.sys.zk; + +import java.io.IOException; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; +import org.apache.drill.exec.exception.DrillbitStartupException; +import org.apache.drill.exec.store.sys.PStore; +import org.apache.drill.exec.store.sys.PStoreConfig; +import org.apache.drill.exec.store.sys.PStoreProvider; +import org.apache.drill.exec.store.sys.PStoreRegistry; + +public class ZkPStoreProvider implements PStoreProvider{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class); + + private final CuratorFramework curator; + + public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException { + ClusterCoordinator coord = registry.getClusterCoordinator(); + if (!(coord instanceof ZKClusterCoordinator)) { + throw new DrillbitStartupException("A ZkPStoreProvider was created without a ZKClusterCoordinator."); + } + this.curator = ((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator(); + } + + public ZkPStoreProvider(CuratorFramework curator) { + this.curator = curator; + } + + @Override + public void close() { + } + + @Override + public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException { + return new ZkPStore<V>(curator, store); + } + + @Override + public void start() { + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java deleted file mode 100644 index e2a6ecf..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java +++ /dev/null @@ -1,182 
+0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.sys.zk; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.drill.exec.store.sys.PTable; -import org.apache.drill.exec.store.sys.PTableConfig; -import org.apache.zookeeper.CreateMode; - -import com.google.common.base.Preconditions; - -public class ZkPTable<V> implements PTable<V>{ - - private CuratorFramework framework; - private PTableConfig<V> config; - private String prefix; - private String parent; - - ZkPTable(CuratorFramework framework, PTableConfig<V> config) throws IOException { - super(); - this.parent = "/" + config.getName(); - this.prefix = parent + "/"; - this.framework = framework; - this.config = config; - - // make sure the parent node exists. 
- try{ - if(framework.checkExists().forPath(parent) == null) { - framework.create().withMode(CreateMode.PERSISTENT).forPath(parent); - } - }catch(Exception e){ - throw new RuntimeException("Failure while accessing Zookeeper", e); - } - - } - - @Override - public Iterator<Entry<String, V>> iterator() { - try{ - List<String> children = framework.getChildren().forPath(parent); - return new Iter(children.iterator()); - }catch(Exception e){ - throw new RuntimeException("Failure while accessing Zookeeper", e); - } - - } - - private String p(String key){ - Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface."); - return prefix + key; - } - - @Override - public V get(String key) { - try{ - byte[] bytes = framework.getData().forPath(p(key)); - if(bytes == null){ - return null; - } - return config.getSerializer().deserialize(bytes); - - }catch(Exception e){ - throw new RuntimeException("Failure while accessing Zookeeper", e); - } - } - - @Override - public void put(String key, V value) { - try{ - if(framework.checkExists().forPath(p(key)) != null) { - framework.setData().forPath(p(key), config.getSerializer().serialize(value)); - }else{ - framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value)); - } - - }catch(Exception e){ - throw new RuntimeException("Failure while accessing Zookeeper", e); - } - - } - - @Override - public boolean putIfAbsent(String key, V value) { - try{ - if(framework.checkExists().forPath(p(key)) != null) { - return false; - }else{ - framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value)); - return true; - } - - }catch(Exception e){ - throw new RuntimeException("Failure while accessing Zookeeper", e); - } - } - - @Override - public void delete(String key) { - try{ - framework.delete().forPath(p(key)); - }catch(Exception e){ - throw new 
RuntimeException("Failure while accessing Zookeeper", e); - } - } - - - private class Iter implements Iterator<Entry<String, V>>{ - - private Iterator<String> keys; - private String current; - - public Iter(Iterator<String> keys) { - super(); - this.keys = keys; - } - - @Override - public boolean hasNext() { - return keys.hasNext(); - } - - @Override - public Entry<String, V> next() { - current = keys.next(); - return new DeferredEntry(current); - } - - @Override - public void remove() { - delete(current); - keys.remove(); - } - - - private class DeferredEntry implements Entry<String, V>{ - - private String name; - - - public DeferredEntry(String name) { - super(); - this.name = name; - } - - @Override - public String getKey() { - return name; - } - - @Override - public V getValue() { - return get(name); - } - - @Override - public V setValue(V value) { - throw new UnsupportedOperationException(); - } - - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java deleted file mode 100644 index 8d2e153..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.sys.zk; - -import java.io.IOException; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.drill.exec.store.sys.PTable; -import org.apache.drill.exec.store.sys.PTableConfig; -import org.apache.drill.exec.store.sys.TableProvider; - -public class ZkTableProvider implements TableProvider{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkTableProvider.class); - - private final CuratorFramework curator; - - public ZkTableProvider(CuratorFramework curator){ - this.curator = curator; - } - - @Override - public void close() { - } - - @Override - public <V> PTable<V> getPTable(PTableConfig<V> table) throws IOException { - return new ZkPTable<V>(curator, table); - } - - @Override - public void start() { - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index 1eae8c5..71e4e8e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -41,7 +41,7 @@ import org.apache.drill.exec.rpc.data.DataResponseHandler; import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl; import org.apache.drill.exec.server.BootStrapContext; import 
org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.store.sys.TableProvider; +import org.apache.drill.exec.store.sys.PStoreProvider; import org.apache.drill.exec.work.batch.ControlHandlerImpl; import org.apache.drill.exec.work.batch.ControlMessageHandler; import org.apache.drill.exec.work.foreman.Foreman; @@ -87,7 +87,7 @@ public class WorkManager implements Closeable{ this.dataHandler = new DataResponseHandlerImpl(bee); } - public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, TableProvider provider){ + public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider){ this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, cache, workBus, provider); // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS) executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-")); @@ -113,7 +113,9 @@ public class WorkManager implements Closeable{ @Override public void close() throws IOException { try { - executor.awaitTermination(1, TimeUnit.SECONDS); + if (executor != null) { + executor.awaitTermination(1, TimeUnit.SECONDS); + } } catch (InterruptedException e) { logger.warn("Executor interrupted while awaiting termination"); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 6a7c9d5..9ce22c7 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -94,7 +94,8 @@ drill.exec: { affinity.factor: 1.2, executor.threads: 4 }, - sys.tables: { + 
sys.store.provider: { + class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider", local: { path: "/tmp/drill", write: true http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java index 547af34..ad114ab 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java @@ -43,7 +43,7 @@ import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.SessionOptionManager; import org.apache.drill.exec.server.options.SystemOptionManager; import org.apache.drill.exec.store.StoragePluginRegistry; -import org.apache.drill.exec.store.sys.local.LocalTableProvider; +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; import org.junit.Rule; import org.junit.rules.TestRule; @@ -72,7 +72,7 @@ public class PlanningBase extends ExecTest{ final DistributedCache cache = new LocalCache(); cache.run(); - final LocalTableProvider provider = new LocalTableProvider(config); + final LocalPStoreProvider provider = new LocalPStoreProvider(config); provider.start(); final SystemOptionManager opt = new SystemOptionManager(config, provider); @@ -91,7 +91,7 @@ public class PlanningBase extends ExecTest{ result = opt; dbContext.getCache(); result = cache; - dbContext.getSystemTableProvider(); + dbContext.getPersistentStoreProvider(); result = provider; } }; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java index 8fc37f3..199ecfc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java @@ -58,7 +58,7 @@ import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.store.StoragePluginRegistry; -import org.apache.drill.exec.store.sys.local.LocalTableProvider; +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarBinaryVector; import org.junit.AfterClass; @@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest { } }; RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet(); - DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalTableProvider(DrillConfig.create())); + DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalPStoreProvider(DrillConfig.create())); QueryContext qc = new QueryContext(new UserSession(null, null, null), QueryId.getDefaultInstance(), bitContext); PhysicalPlanReader reader = bitContext.getPlanReader(); LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java new file mode 100644 
index 0000000..6f7794b --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java @@ -0,0 +1,66 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store.sys;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;

/**
 * Shared round-trip check for {@link PStoreProvider} implementations: write a couple of
 * entries, read them back through the iterator, remove them through the iterator, and confirm
 * the store ends up empty.
 */
public class PStoreTestUtil {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class);

  /**
   * Runs the round-trip check against a store named {@code sys.test} obtained from
   * {@code provider}.
   *
   * @param provider provider under test
   * @throws Exception if the store cannot be obtained or any store operation fails
   */
  public static void test(PStoreProvider provider) throws Exception {
    final PStore<String> store = provider.getPStore(PStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
    final String[] names = {"first", "second"};
    final String[] payloads = {"value1", "value2"};
    final Map<String, String> expected = Maps.newHashMap();

    // Write every pair, remembering what was written.
    int idx = 0;
    while (idx < names.length) {
      expected.put(names[idx], payloads[idx]);
      store.put(names[idx], payloads[idx]);
      idx++;
    }

    // Exactly names.length entries must come back, each matching what was written.
    final Iterator<Map.Entry<String, String>> reader = store.iterator();
    for (int seen = 0; seen != names.length; seen++) {
      final Entry<String, String> entry = reader.next();
      assertTrue(expected.containsKey(entry.getKey()));
      assertEquals(expected.get(entry.getKey()), entry.getValue());
    }
    assertFalse(reader.hasNext());

    // Remove everything through a fresh iterator.
    for (Iterator<Map.Entry<String, String>> remover = store.iterator(); remover.hasNext();) {
      remover.next();
      remover.remove();
    }

    assertFalse(store.iterator().hasNext());
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java deleted file mode 100644 index 47a783b..0000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store.sys; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Maps; - -public class PTableTestUtil { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PTableTestUtil.class); - - public static void test(TableProvider provider) throws Exception{ - PTable<String> table = provider.getPTable(PTableConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build()); - String[] keys = {"first", "second"}; - String[] values = {"value1", "value2"}; - Map<String, String> expectedMap = Maps.newHashMap(); - - for(int i =0; i < keys.length; i++){ - expectedMap.put(keys[i], values[i]); - table.put(keys[i], values[i]); - } - - { - Iterator<Map.Entry<String, String>> iter = table.iterator(); - for(int i =0; i < keys.length; i++){ - Entry<String, String> e = iter.next(); - assertTrue(expectedMap.containsKey(e.getKey())); - assertEquals(expectedMap.get(e.getKey()), e.getValue()); - } - - assertFalse(iter.hasNext()); - } - - { - Iterator<Map.Entry<String, String>> iter = table.iterator(); - while(iter.hasNext()){ - iter.next(); - iter.remove(); - } - } - - assertFalse(table.iterator().hasNext()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java new file mode 100644 index 0000000..18d87c7 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java 
@@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.TestWithZookeeper; +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; +import org.apache.drill.exec.store.sys.zk.ZkPStoreProvider; +import org.junit.Test; + +public class TestPStoreProviders extends TestWithZookeeper { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class); + + static LocalPStoreProvider provider; + + @Test + public void verifyLocalStore() throws Exception { + try(LocalPStoreProvider provider = new LocalPStoreProvider(DrillConfig.create())){ + PStoreTestUtil.test(provider); + } + } + + @Test + public void verifyZkStore() throws Exception { + DrillConfig config = getConfig(); + String connect = config.getString(ExecConstants.ZK_CONNECTION); + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + 
.namespace(config.getString(ExecConstants.ZK_ROOT)) + .retryPolicy(new RetryNTimes(1, 100)) + .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT)) + .connectString(connect); + + try(CuratorFramework curator = builder.build()){ + curator.start(); + ZkPStoreProvider provider = new ZkPStoreProvider(curator); + PStoreTestUtil.test(provider); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java deleted file mode 100644 index b7d92fe..0000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store.sys; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryNTimes; -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.TestWithZookeeper; -import org.apache.drill.exec.store.sys.local.LocalTableProvider; -import org.apache.drill.exec.store.sys.zk.ZkTableProvider; -import org.junit.Test; - -public class TestTableProviders extends TestWithZookeeper { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTableProviders.class); - - static LocalTableProvider provider; - - @Test - public void verifyLocalTable() throws Exception { - try(LocalTableProvider provider = new LocalTableProvider(DrillConfig.create())){ - PTableTestUtil.test(provider); - } - } - - @Test - public void verifyZkTable() throws Exception { - DrillConfig config = getConfig(); - String connect = config.getString(ExecConstants.ZK_CONNECTION); - CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() - .namespace(config.getString(ExecConstants.ZK_ROOT)) - .retryPolicy(new RetryNTimes(1, 100)) - .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT)) - .connectString(connect); - - try(CuratorFramework curator = builder.build()){ - curator.start(); - ZkTableProvider provider = new ZkTableProvider(curator); - PTableTestUtil.test(provider); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b2289f3..500f0fd 100644 --- a/pom.xml +++ b/pom.xml @@ -262,7 +262,7 @@ <artifactId>maven-surefire-plugin</artifactId> <version>2.17</version> <configuration> - <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.tables.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M 
-XX:+CMSClassUnloadingEnabled</argLine> + <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.store.provider.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine> <forkCount>4</forkCount> <reuseForks>true</reuseForks> <additionalClasspathElements>
