Never mind, it's another commit. Tim
On Tue, Jun 10, 2014 at 9:00 PM, Timothy Chen <tnac...@gmail.com> wrote: > Looks like we added ZK Pstore instead of HBase? > > Tim > > On Tue, Jun 10, 2014 at 8:52 PM, <jacq...@apache.org> wrote: >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java >> ---------------------------------------------------------------------- >> diff --git >> a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java >> >> b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java >> new file mode 100644 >> index 0000000..eb21c70 >> --- /dev/null >> +++ >> b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java >> @@ -0,0 +1,180 @@ >> +/** >> + * Licensed to the Apache Software Foundation (ASF) under one >> + * or more contributor license agreements. See the NOTICE file >> + * distributed with this work for additional information >> + * regarding copyright ownership. The ASF licenses this file >> + * to you under the Apache License, Version 2.0 (the >> + * "License"); you may not use this file except in compliance >> + * with the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. 
>> + */ >> +package org.apache.drill.exec.store.sys.zk; >> + >> +import java.io.IOException; >> +import java.util.Iterator; >> +import java.util.List; >> +import java.util.Map.Entry; >> + >> +import org.apache.curator.framework.CuratorFramework; >> +import org.apache.drill.exec.store.sys.PStore; >> +import org.apache.drill.exec.store.sys.PStoreConfig; >> +import org.apache.zookeeper.CreateMode; >> + >> +import com.google.common.base.Preconditions; >> + >> +public class ZkPStore<V> implements PStore<V>{ >> + >> + private CuratorFramework framework; >> + private PStoreConfig<V> config; >> + private String prefix; >> + private String parent; >> + >> + ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws >> IOException { >> + this.parent = "/" + config.getName(); >> + this.prefix = parent + "/"; >> + this.framework = framework; >> + this.config = config; >> + >> + // make sure the parent node exists. >> + try{ >> + if(framework.checkExists().forPath(parent) == null) { >> + framework.create().withMode(CreateMode.PERSISTENT).forPath(parent); >> + } >> + }catch(Exception e){ >> + throw new RuntimeException("Failure while accessing Zookeeper", e); >> + } >> + >> + } >> + >> + @Override >> + public Iterator<Entry<String, V>> iterator() { >> + try{ >> + List<String> children = framework.getChildren().forPath(parent); >> + return new Iter(children.iterator()); >> + }catch(Exception e){ >> + throw new RuntimeException("Failure while accessing Zookeeper", e); >> + } >> + >> + } >> + >> + private String p(String key){ >> + Preconditions.checkArgument(!key.contains("/"), "You cannot use keys >> that have slashes in them when using the Zookeeper SystemTable storage >> interface."); >> + return prefix + key; >> + } >> + >> + @Override >> + public V get(String key) { >> + try{ >> + byte[] bytes = framework.getData().forPath(p(key)); >> + if(bytes == null){ >> + return null; >> + } >> + return config.getSerializer().deserialize(bytes); >> + >> + }catch(Exception e){ >> 
+ throw new RuntimeException("Failure while accessing Zookeeper", e); >> + } >> + } >> + >> + @Override >> + public void put(String key, V value) { >> + try{ >> + if(framework.checkExists().forPath(p(key)) != null) { >> + framework.setData().forPath(p(key), >> config.getSerializer().serialize(value)); >> + }else{ >> + framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), >> config.getSerializer().serialize(value)); >> + } >> + >> + }catch(Exception e){ >> + throw new RuntimeException("Failure while accessing Zookeeper", e); >> + } >> + >> + } >> + >> + @Override >> + public boolean putIfAbsent(String key, V value) { >> + try{ >> + if(framework.checkExists().forPath(p(key)) != null) { >> + return false; >> + }else{ >> + framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), >> config.getSerializer().serialize(value)); >> + return true; >> + } >> + >> + }catch(Exception e){ >> + throw new RuntimeException("Failure while accessing Zookeeper", e); >> + } >> + } >> + >> + @Override >> + public void delete(String key) { >> + try{ >> + framework.delete().forPath(p(key)); >> + }catch(Exception e){ >> + throw new RuntimeException("Failure while accessing Zookeeper", e); >> + } >> + } >> + >> + private class Iter implements Iterator<Entry<String, V>>{ >> + >> + private Iterator<String> keys; >> + private String current; >> + >> + public Iter(Iterator<String> keys) { >> + super(); >> + this.keys = keys; >> + } >> + >> + @Override >> + public boolean hasNext() { >> + return keys.hasNext(); >> + } >> + >> + @Override >> + public Entry<String, V> next() { >> + current = keys.next(); >> + return new DeferredEntry(current); >> + } >> + >> + @Override >> + public void remove() { >> + delete(current); >> + keys.remove(); >> + } >> + >> + private class DeferredEntry implements Entry<String, V>{ >> + >> + private String name; >> + >> + public DeferredEntry(String name) { >> + super(); >> + this.name = name; >> + } >> + >> + @Override >> + public String 
getKey() { >> + return name; >> + } >> + >> + @Override >> + public V getValue() { >> + return get(name); >> + } >> + >> + @Override >> + public V setValue(V value) { >> + throw new UnsupportedOperationException(); >> + } >> + >> + } >> + >> + } >> + >> +} >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java >> ---------------------------------------------------------------------- >> diff --git >> a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java >> >> b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java >> new file mode 100644 >> index 0000000..f4513c2 >> --- /dev/null >> +++ >> b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java >> @@ -0,0 +1,61 @@ >> +/** >> + * Licensed to the Apache Software Foundation (ASF) under one >> + * or more contributor license agreements. See the NOTICE file >> + * distributed with this work for additional information >> + * regarding copyright ownership. The ASF licenses this file >> + * to you under the Apache License, Version 2.0 (the >> + * "License"); you may not use this file except in compliance >> + * with the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. 
>> + */ >> +package org.apache.drill.exec.store.sys.zk; >> + >> +import java.io.IOException; >> + >> +import org.apache.curator.framework.CuratorFramework; >> +import org.apache.drill.exec.coord.ClusterCoordinator; >> +import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; >> +import org.apache.drill.exec.exception.DrillbitStartupException; >> +import org.apache.drill.exec.store.sys.PStore; >> +import org.apache.drill.exec.store.sys.PStoreConfig; >> +import org.apache.drill.exec.store.sys.PStoreProvider; >> +import org.apache.drill.exec.store.sys.PStoreRegistry; >> + >> +public class ZkPStoreProvider implements PStoreProvider{ >> + static final org.slf4j.Logger logger = >> org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class); >> + >> + private final CuratorFramework curator; >> + >> + public ZkPStoreProvider(PStoreRegistry registry) throws >> DrillbitStartupException { >> + ClusterCoordinator coord = registry.getClusterCoordinator(); >> + if (!(coord instanceof ZKClusterCoordinator)) { >> + throw new DrillbitStartupException("A ZkPStoreProvider was created >> without a ZKClusterCoordinator."); >> + } >> + this.curator = >> ((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator(); >> + } >> + >> + public ZkPStoreProvider(CuratorFramework curator) { >> + this.curator = curator; >> + } >> + >> + @Override >> + public void close() { >> + } >> + >> + @Override >> + public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException { >> + return new ZkPStore<V>(curator, store); >> + } >> + >> + @Override >> + public void start() { >> + } >> + >> +} >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java >> ---------------------------------------------------------------------- >> diff --git >> a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java >> >> 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java >> deleted file mode 100644 >> index e2a6ecf..0000000 >> --- >> a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java >> +++ /dev/null >> @@ -1,182 +0,0 @@ >> -/** >> - * Licensed to the Apache Software Foundation (ASF) under one >> - * or more contributor license agreements. See the NOTICE file >> - * distributed with this work for additional information >> - * regarding copyright ownership. The ASF licenses this file >> - * to you under the Apache License, Version 2.0 (the >> - * "License"); you may not use this file except in compliance >> - * with the License. You may obtain a copy of the License at >> - * >> - * http://www.apache.org/licenses/LICENSE-2.0 >> - * >> - * Unless required by applicable law or agreed to in writing, software >> - * distributed under the License is distributed on an "AS IS" BASIS, >> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> - * See the License for the specific language governing permissions and >> - * limitations under the License. 
>> - */ >> -package org.apache.drill.exec.store.sys.zk; >> - >> -import java.io.IOException; >> -import java.util.Iterator; >> -import java.util.List; >> -import java.util.Map.Entry; >> - >> -import org.apache.curator.framework.CuratorFramework; >> -import org.apache.drill.exec.store.sys.PTable; >> -import org.apache.drill.exec.store.sys.PTableConfig; >> -import org.apache.zookeeper.CreateMode; >> - >> -import com.google.common.base.Preconditions; >> - >> -public class ZkPTable<V> implements PTable<V>{ >> - >> - private CuratorFramework framework; >> - private PTableConfig<V> config; >> - private String prefix; >> - private String parent; >> - >> - ZkPTable(CuratorFramework framework, PTableConfig<V> config) throws >> IOException { >> - super(); >> - this.parent = "/" + config.getName(); >> - this.prefix = parent + "/"; >> - this.framework = framework; >> - this.config = config; >> - >> - // make sure the parent node exists. >> - try{ >> - if(framework.checkExists().forPath(parent) == null) { >> - framework.create().withMode(CreateMode.PERSISTENT).forPath(parent); >> - } >> - }catch(Exception e){ >> - throw new RuntimeException("Failure while accessing Zookeeper", e); >> - } >> - >> - } >> - >> - @Override >> - public Iterator<Entry<String, V>> iterator() { >> - try{ >> - List<String> children = framework.getChildren().forPath(parent); >> - return new Iter(children.iterator()); >> - }catch(Exception e){ >> - throw new RuntimeException("Failure while accessing Zookeeper", e); >> - } >> - >> - } >> - >> - private String p(String key){ >> - Preconditions.checkArgument(!key.contains("/"), "You cannot use keys >> that have slashes in them when using the Zookeeper SystemTable storage >> interface."); >> - return prefix + key; >> - } >> - >> - @Override >> - public V get(String key) { >> - try{ >> - byte[] bytes = framework.getData().forPath(p(key)); >> - if(bytes == null){ >> - return null; >> - } >> - return config.getSerializer().deserialize(bytes); >> - >> - 
}catch(Exception e){ >> - throw new RuntimeException("Failure while accessing Zookeeper", e); >> - } >> - } >> - >> - @Override >> - public void put(String key, V value) { >> - try{ >> - if(framework.checkExists().forPath(p(key)) != null) { >> - framework.setData().forPath(p(key), >> config.getSerializer().serialize(value)); >> - }else{ >> - framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), >> config.getSerializer().serialize(value)); >> - } >> - >> - }catch(Exception e){ >> - throw new RuntimeException("Failure while accessing Zookeeper", e); >> - } >> - >> - } >> - >> - @Override >> - public boolean putIfAbsent(String key, V value) { >> - try{ >> - if(framework.checkExists().forPath(p(key)) != null) { >> - return false; >> - }else{ >> - framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), >> config.getSerializer().serialize(value)); >> - return true; >> - } >> - >> - }catch(Exception e){ >> - throw new RuntimeException("Failure while accessing Zookeeper", e); >> - } >> - } >> - >> - @Override >> - public void delete(String key) { >> - try{ >> - framework.delete().forPath(p(key)); >> - }catch(Exception e){ >> - throw new RuntimeException("Failure while accessing Zookeeper", e); >> - } >> - } >> - >> - >> - private class Iter implements Iterator<Entry<String, V>>{ >> - >> - private Iterator<String> keys; >> - private String current; >> - >> - public Iter(Iterator<String> keys) { >> - super(); >> - this.keys = keys; >> - } >> - >> - @Override >> - public boolean hasNext() { >> - return keys.hasNext(); >> - } >> - >> - @Override >> - public Entry<String, V> next() { >> - current = keys.next(); >> - return new DeferredEntry(current); >> - } >> - >> - @Override >> - public void remove() { >> - delete(current); >> - keys.remove(); >> - } >> - >> - >> - private class DeferredEntry implements Entry<String, V>{ >> - >> - private String name; >> - >> - >> - public DeferredEntry(String name) { >> - super(); >> - this.name = name; >> - } >> - >> 
- @Override >> - public String getKey() { >> - return name; >> - } >> - >> - @Override >> - public V getValue() { >> - return get(name); >> - } >> - >> - @Override >> - public V setValue(V value) { >> - throw new UnsupportedOperationException(); >> - } >> - >> - } >> - } >> -} >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java >> ---------------------------------------------------------------------- >> diff --git >> a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java >> >> b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java >> deleted file mode 100644 >> index 8d2e153..0000000 >> --- >> a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java >> +++ /dev/null >> @@ -1,50 +0,0 @@ >> -/** >> - * Licensed to the Apache Software Foundation (ASF) under one >> - * or more contributor license agreements. See the NOTICE file >> - * distributed with this work for additional information >> - * regarding copyright ownership. The ASF licenses this file >> - * to you under the Apache License, Version 2.0 (the >> - * "License"); you may not use this file except in compliance >> - * with the License. You may obtain a copy of the License at >> - * >> - * http://www.apache.org/licenses/LICENSE-2.0 >> - * >> - * Unless required by applicable law or agreed to in writing, software >> - * distributed under the License is distributed on an "AS IS" BASIS, >> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> - * See the License for the specific language governing permissions and >> - * limitations under the License. 
>> - */ >> -package org.apache.drill.exec.store.sys.zk; >> - >> -import java.io.IOException; >> - >> -import org.apache.curator.framework.CuratorFramework; >> -import org.apache.drill.exec.store.sys.PTable; >> -import org.apache.drill.exec.store.sys.PTableConfig; >> -import org.apache.drill.exec.store.sys.TableProvider; >> - >> -public class ZkTableProvider implements TableProvider{ >> - static final org.slf4j.Logger logger = >> org.slf4j.LoggerFactory.getLogger(ZkTableProvider.class); >> - >> - private final CuratorFramework curator; >> - >> - public ZkTableProvider(CuratorFramework curator){ >> - this.curator = curator; >> - } >> - >> - @Override >> - public void close() { >> - } >> - >> - @Override >> - public <V> PTable<V> getPTable(PTableConfig<V> table) throws IOException { >> - return new ZkPTable<V>(curator, table); >> - } >> - >> - @Override >> - public void start() { >> - } >> - >> - >> -} >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java >> ---------------------------------------------------------------------- >> diff --git >> a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java >> b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java >> index 1eae8c5..71e4e8e 100644 >> --- >> a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java >> +++ >> b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java >> @@ -41,7 +41,7 @@ import org.apache.drill.exec.rpc.data.DataResponseHandler; >> import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl; >> import org.apache.drill.exec.server.BootStrapContext; >> import org.apache.drill.exec.server.DrillbitContext; >> -import org.apache.drill.exec.store.sys.TableProvider; >> +import org.apache.drill.exec.store.sys.PStoreProvider; >> import org.apache.drill.exec.work.batch.ControlHandlerImpl; >> import 
org.apache.drill.exec.work.batch.ControlMessageHandler; >> import org.apache.drill.exec.work.foreman.Foreman; >> @@ -87,7 +87,7 @@ public class WorkManager implements Closeable{ >> this.dataHandler = new DataResponseHandlerImpl(bee); >> } >> >> - public void start(DrillbitEndpoint endpoint, DistributedCache cache, >> Controller controller, DataConnectionCreator data, ClusterCoordinator coord, >> TableProvider provider){ >> + public void start(DrillbitEndpoint endpoint, DistributedCache cache, >> Controller controller, DataConnectionCreator data, ClusterCoordinator coord, >> PStoreProvider provider){ >> this.dContext = new DrillbitContext(endpoint, bContext, coord, >> controller, data, cache, workBus, provider); >> // executor = >> Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS) >> executor = Executors.newCachedThreadPool(new >> NamedThreadFactory("WorkManager-")); >> @@ -113,7 +113,9 @@ public class WorkManager implements Closeable{ >> @Override >> public void close() throws IOException { >> try { >> - executor.awaitTermination(1, TimeUnit.SECONDS); >> + if (executor != null) { >> + executor.awaitTermination(1, TimeUnit.SECONDS); >> + } >> } catch (InterruptedException e) { >> logger.warn("Executor interrupted while awaiting termination"); >> } >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/resources/drill-module.conf >> ---------------------------------------------------------------------- >> diff --git a/exec/java-exec/src/main/resources/drill-module.conf >> b/exec/java-exec/src/main/resources/drill-module.conf >> index 6a7c9d5..9ce22c7 100644 >> --- a/exec/java-exec/src/main/resources/drill-module.conf >> +++ b/exec/java-exec/src/main/resources/drill-module.conf >> @@ -94,7 +94,8 @@ drill.exec: { >> affinity.factor: 1.2, >> executor.threads: 4 >> }, >> - sys.tables: { >> + sys.store.provider: { >> + class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider", >> local: 
{ >> path: "/tmp/drill", >> write: true >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java >> ---------------------------------------------------------------------- >> diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java >> b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java >> index 547af34..ad114ab 100644 >> --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java >> +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java >> @@ -43,7 +43,7 @@ import org.apache.drill.exec.server.options.OptionManager; >> import org.apache.drill.exec.server.options.SessionOptionManager; >> import org.apache.drill.exec.server.options.SystemOptionManager; >> import org.apache.drill.exec.store.StoragePluginRegistry; >> -import org.apache.drill.exec.store.sys.local.LocalTableProvider; >> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; >> import org.junit.Rule; >> import org.junit.rules.TestRule; >> >> @@ -72,7 +72,7 @@ public class PlanningBase extends ExecTest{ >> final DistributedCache cache = new LocalCache(); >> cache.run(); >> >> - final LocalTableProvider provider = new LocalTableProvider(config); >> + final LocalPStoreProvider provider = new LocalPStoreProvider(config); >> provider.start(); >> >> final SystemOptionManager opt = new SystemOptionManager(config, >> provider); >> @@ -91,7 +91,7 @@ public class PlanningBase extends ExecTest{ >> result = opt; >> dbContext.getCache(); >> result = cache; >> - dbContext.getSystemTableProvider(); >> + dbContext.getPersistentStoreProvider(); >> result = provider; >> } >> }; >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java >> ---------------------------------------------------------------------- >> diff --git >> 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java >> >> b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java >> index 8fc37f3..199ecfc 100644 >> --- >> a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java >> +++ >> b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java >> @@ -58,7 +58,7 @@ import org.apache.drill.exec.server.Drillbit; >> import org.apache.drill.exec.server.DrillbitContext; >> import org.apache.drill.exec.server.RemoteServiceSet; >> import org.apache.drill.exec.store.StoragePluginRegistry; >> -import org.apache.drill.exec.store.sys.local.LocalTableProvider; >> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; >> import org.apache.drill.exec.vector.ValueVector; >> import org.apache.drill.exec.vector.VarBinaryVector; >> import org.junit.AfterClass; >> @@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest { >> } >> }; >> RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet(); >> - DrillbitContext bitContext = new >> DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, >> controller, com, cache, workBus, new >> LocalTableProvider(DrillConfig.create())); >> + DrillbitContext bitContext = new >> DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, >> controller, com, cache, workBus, new >> LocalPStoreProvider(DrillConfig.create())); >> QueryContext qc = new QueryContext(new UserSession(null, null, null), >> QueryId.getDefaultInstance(), bitContext); >> PhysicalPlanReader reader = bitContext.getPlanReader(); >> LogicalPlan plan = >> reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), >> Charsets.UTF_8)); >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java >> ---------------------------------------------------------------------- >> 
diff --git >> a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java >> >> b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java >> new file mode 100644 >> index 0000000..6f7794b >> --- /dev/null >> +++ >> b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java >> @@ -0,0 +1,66 @@ >> +/** >> + * Licensed to the Apache Software Foundation (ASF) under one >> + * or more contributor license agreements. See the NOTICE file >> + * distributed with this work for additional information >> + * regarding copyright ownership. The ASF licenses this file >> + * to you under the Apache License, Version 2.0 (the >> + * "License"); you may not use this file except in compliance >> + * with the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. 
>> + */ >> +package org.apache.drill.exec.store.sys; >> + >> +import static org.junit.Assert.assertEquals; >> +import static org.junit.Assert.assertFalse; >> +import static org.junit.Assert.assertTrue; >> + >> +import java.util.Iterator; >> +import java.util.Map; >> +import java.util.Map.Entry; >> + >> +import com.fasterxml.jackson.databind.ObjectMapper; >> +import com.google.common.collect.Maps; >> + >> +public class PStoreTestUtil { >> + static final org.slf4j.Logger logger = >> org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class); >> + >> + public static void test(PStoreProvider provider) throws Exception{ >> + PStore<String> store = >> provider.getPStore(PStoreConfig.newJacksonBuilder(new ObjectMapper(), >> String.class).name("sys.test").build()); >> + String[] keys = {"first", "second"}; >> + String[] values = {"value1", "value2"}; >> + Map<String, String> expectedMap = Maps.newHashMap(); >> + >> + for(int i =0; i < keys.length; i++){ >> + expectedMap.put(keys[i], values[i]); >> + store.put(keys[i], values[i]); >> + } >> + >> + { >> + Iterator<Map.Entry<String, String>> iter = store.iterator(); >> + for(int i =0; i < keys.length; i++){ >> + Entry<String, String> e = iter.next(); >> + assertTrue(expectedMap.containsKey(e.getKey())); >> + assertEquals(expectedMap.get(e.getKey()), e.getValue()); >> + } >> + >> + assertFalse(iter.hasNext()); >> + } >> + >> + { >> + Iterator<Map.Entry<String, String>> iter = store.iterator(); >> + while(iter.hasNext()){ >> + iter.next(); >> + iter.remove(); >> + } >> + } >> + >> + assertFalse(store.iterator().hasNext()); >> + } >> +} >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java >> ---------------------------------------------------------------------- >> diff --git >> a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java >> >> 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java >> deleted file mode 100644 >> index 47a783b..0000000 >> --- >> a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java >> +++ /dev/null >> @@ -1,66 +0,0 @@ >> -/** >> - * Licensed to the Apache Software Foundation (ASF) under one >> - * or more contributor license agreements. See the NOTICE file >> - * distributed with this work for additional information >> - * regarding copyright ownership. The ASF licenses this file >> - * to you under the Apache License, Version 2.0 (the >> - * "License"); you may not use this file except in compliance >> - * with the License. You may obtain a copy of the License at >> - * >> - * http://www.apache.org/licenses/LICENSE-2.0 >> - * >> - * Unless required by applicable law or agreed to in writing, software >> - * distributed under the License is distributed on an "AS IS" BASIS, >> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> - * See the License for the specific language governing permissions and >> - * limitations under the License. 
>> - */ >> -package org.apache.drill.exec.store.sys; >> - >> -import static org.junit.Assert.assertEquals; >> -import static org.junit.Assert.assertFalse; >> -import static org.junit.Assert.assertTrue; >> - >> -import java.util.Iterator; >> -import java.util.Map; >> -import java.util.Map.Entry; >> - >> -import com.fasterxml.jackson.databind.ObjectMapper; >> -import com.google.common.collect.Maps; >> - >> -public class PTableTestUtil { >> - static final org.slf4j.Logger logger = >> org.slf4j.LoggerFactory.getLogger(PTableTestUtil.class); >> - >> - public static void test(TableProvider provider) throws Exception{ >> - PTable<String> table = >> provider.getPTable(PTableConfig.newJacksonBuilder(new ObjectMapper(), >> String.class).name("sys.test").build()); >> - String[] keys = {"first", "second"}; >> - String[] values = {"value1", "value2"}; >> - Map<String, String> expectedMap = Maps.newHashMap(); >> - >> - for(int i =0; i < keys.length; i++){ >> - expectedMap.put(keys[i], values[i]); >> - table.put(keys[i], values[i]); >> - } >> - >> - { >> - Iterator<Map.Entry<String, String>> iter = table.iterator(); >> - for(int i =0; i < keys.length; i++){ >> - Entry<String, String> e = iter.next(); >> - assertTrue(expectedMap.containsKey(e.getKey())); >> - assertEquals(expectedMap.get(e.getKey()), e.getValue()); >> - } >> - >> - assertFalse(iter.hasNext()); >> - } >> - >> - { >> - Iterator<Map.Entry<String, String>> iter = table.iterator(); >> - while(iter.hasNext()){ >> - iter.next(); >> - iter.remove(); >> - } >> - } >> - >> - assertFalse(table.iterator().hasNext()); >> - } >> -} >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java >> ---------------------------------------------------------------------- >> diff --git >> a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java >> >> 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java >> new file mode 100644 >> index 0000000..18d87c7 >> --- /dev/null >> +++ >> b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java >> @@ -0,0 +1,58 @@ >> +/** >> + * Licensed to the Apache Software Foundation (ASF) under one >> + * or more contributor license agreements. See the NOTICE file >> + * distributed with this work for additional information >> + * regarding copyright ownership. The ASF licenses this file >> + * to you under the Apache License, Version 2.0 (the >> + * "License"); you may not use this file except in compliance >> + * with the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. 
>> + */ >> +package org.apache.drill.exec.store.sys; >> + >> +import org.apache.curator.framework.CuratorFramework; >> +import org.apache.curator.framework.CuratorFrameworkFactory; >> +import org.apache.curator.retry.RetryNTimes; >> +import org.apache.drill.common.config.DrillConfig; >> +import org.apache.drill.exec.ExecConstants; >> +import org.apache.drill.exec.TestWithZookeeper; >> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; >> +import org.apache.drill.exec.store.sys.zk.ZkPStoreProvider; >> +import org.junit.Test; >> + >> +public class TestPStoreProviders extends TestWithZookeeper { >> + static final org.slf4j.Logger logger = >> org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class); >> + >> + static LocalPStoreProvider provider; >> + >> + @Test >> + public void verifyLocalStore() throws Exception { >> + try(LocalPStoreProvider provider = new >> LocalPStoreProvider(DrillConfig.create())){ >> + PStoreTestUtil.test(provider); >> + } >> + } >> + >> + @Test >> + public void verifyZkStore() throws Exception { >> + DrillConfig config = getConfig(); >> + String connect = config.getString(ExecConstants.ZK_CONNECTION); >> + CuratorFrameworkFactory.Builder builder = >> CuratorFrameworkFactory.builder() >> + .namespace(config.getString(ExecConstants.ZK_ROOT)) >> + .retryPolicy(new RetryNTimes(1, 100)) >> + .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT)) >> + .connectString(connect); >> + >> + try(CuratorFramework curator = builder.build()){ >> + curator.start(); >> + ZkPStoreProvider provider = new ZkPStoreProvider(curator); >> + PStoreTestUtil.test(provider); >> + } >> + } >> +} >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java >> ---------------------------------------------------------------------- >> diff --git >> a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java >> >> 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java >> deleted file mode 100644 >> index b7d92fe..0000000 >> --- >> a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java >> +++ /dev/null >> @@ -1,58 +0,0 @@ >> -/** >> - * Licensed to the Apache Software Foundation (ASF) under one >> - * or more contributor license agreements. See the NOTICE file >> - * distributed with this work for additional information >> - * regarding copyright ownership. The ASF licenses this file >> - * to you under the Apache License, Version 2.0 (the >> - * "License"); you may not use this file except in compliance >> - * with the License. You may obtain a copy of the License at >> - * >> - * http://www.apache.org/licenses/LICENSE-2.0 >> - * >> - * Unless required by applicable law or agreed to in writing, software >> - * distributed under the License is distributed on an "AS IS" BASIS, >> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> - * See the License for the specific language governing permissions and >> - * limitations under the License. 
>> - */ >> -package org.apache.drill.exec.store.sys; >> - >> -import org.apache.curator.framework.CuratorFramework; >> -import org.apache.curator.framework.CuratorFrameworkFactory; >> -import org.apache.curator.retry.RetryNTimes; >> -import org.apache.drill.common.config.DrillConfig; >> -import org.apache.drill.exec.ExecConstants; >> -import org.apache.drill.exec.TestWithZookeeper; >> -import org.apache.drill.exec.store.sys.local.LocalTableProvider; >> -import org.apache.drill.exec.store.sys.zk.ZkTableProvider; >> -import org.junit.Test; >> - >> -public class TestTableProviders extends TestWithZookeeper { >> - static final org.slf4j.Logger logger = >> org.slf4j.LoggerFactory.getLogger(TestTableProviders.class); >> - >> - static LocalTableProvider provider; >> - >> - @Test >> - public void verifyLocalTable() throws Exception { >> - try(LocalTableProvider provider = new >> LocalTableProvider(DrillConfig.create())){ >> - PTableTestUtil.test(provider); >> - } >> - } >> - >> - @Test >> - public void verifyZkTable() throws Exception { >> - DrillConfig config = getConfig(); >> - String connect = config.getString(ExecConstants.ZK_CONNECTION); >> - CuratorFrameworkFactory.Builder builder = >> CuratorFrameworkFactory.builder() >> - .namespace(config.getString(ExecConstants.ZK_ROOT)) >> - .retryPolicy(new RetryNTimes(1, 100)) >> - .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT)) >> - .connectString(connect); >> - >> - try(CuratorFramework curator = builder.build()){ >> - curator.start(); >> - ZkTableProvider provider = new ZkTableProvider(curator); >> - PTableTestUtil.test(provider); >> - } >> - } >> -} >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/pom.xml >> ---------------------------------------------------------------------- >> diff --git a/pom.xml b/pom.xml >> index b2289f3..500f0fd 100644 >> --- a/pom.xml >> +++ b/pom.xml >> @@ -262,7 +262,7 @@ >> <artifactId>maven-surefire-plugin</artifactId> >> <version>2.17</version> >> 
<configuration> >> - <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false >> -Ddrill.exec.sys.tables.local.write=false -XX:MaxPermSize=256M >> -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine> >> + <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false >> -Ddrill.exec.sys.store.provider.local.write=false -XX:MaxPermSize=256M >> -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine> >> <forkCount>4</forkCount> >> <reuseForks>true</reuseForks> >> <additionalClasspathElements> >>