DRILL-1684, DRILL-1517, DRILL-1350: Profile and cancellation updates - Remove any storage of persisted profiles. - Store a separate query info object for active queries. - Update cancellation and running profile loading to query foreman server. - Make file store support HDFS APIs - Update PStoreProvider to use configuration to decide if you want PERSISTENT, EPHEMERAL, or BLOB storage rather than separate interfaces. - Update ZkPStore's persistent mode to leverage a cache and respond to changes rather than actively probing values. - Update ZkPStore's cache to be effectively write-through. - Automatically delete deprecated or default value options from PStore.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2eb72a7c Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2eb72a7c Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2eb72a7c Branch: refs/heads/master Commit: 2eb72a7c238fe960b61edbae0d3dea1c836c746f Parents: 90c12c8 Author: Jacques Nadeau <jacq...@apache.org> Authored: Sun Nov 9 15:16:52 2014 -0800 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Wed Nov 19 21:35:39 2014 -0800 ---------------------------------------------------------------------- .../exec/store/hbase/config/HBasePStore.java | 17 +- .../store/hbase/config/HBasePStoreProvider.java | 45 +- .../drill/hbase/TestHBaseTableProvider.java | 4 +- .../exec/store/mongo/config/MongoPStore.java | 10 - .../store/mongo/config/MongoPStoreProvider.java | 16 +- .../exec/rpc/control/ControlRpcConfig.java | 1 + .../drill/exec/rpc/control/ControlTunnel.java | 20 + .../drill/exec/rpc/data/DataRpcConfig.java | 1 + .../server/options/FallbackOptionManager.java | 97 ++ .../server/options/FragmentOptionManager.java | 50 +- .../server/options/InMemoryOptionManager.java | 50 + .../exec/server/options/QueryOptionManager.java | 72 +- .../server/options/SessionOptionManager.java | 71 +- .../server/options/SystemOptionManager.java | 79 +- .../drill/exec/store/StoragePluginRegistry.java | 2 +- .../exec/store/dfs/WorkspaceSchemaFactory.java | 3 +- .../org/apache/drill/exec/store/sys/EStore.java | 6 +- .../drill/exec/store/sys/EStoreProvider.java | 2 +- .../org/apache/drill/exec/store/sys/PStore.java | 6 +- .../drill/exec/store/sys/PStoreConfig.java | 47 +- .../drill/exec/store/sys/PStoreProvider.java | 4 +- .../drill/exec/store/sys/local/FilePStore.java | 231 ++++ .../store/sys/local/LocalEStoreProvider.java | 13 +- .../drill/exec/store/sys/local/LocalPStore.java | 208 ---- .../store/sys/local/LocalPStoreProvider.java | 47 +- .../drill/exec/store/sys/local/MapEStore.java | 6 + .../store/sys/local/NoWriteLocalPStore.java | 10 - .../exec/store/sys/zk/ZkAbstractStore.java | 89 +- .../exec/store/sys/zk/ZkEStoreProvider.java | 6 +- .../drill/exec/store/sys/zk/ZkPStore.java | 56 +- .../exec/store/sys/zk/ZkPStoreProvider.java | 61 +- .../exec/work/batch/ControlHandlerImpl.java | 18 +- .../apache/drill/exec/work/foreman/Foreman.java | 2 +- .../drill/exec/work/foreman/QueryStatus.java | 73 +- .../apache/drill/exec/work/user/UserWorker.java | 10 +- .../drill/exec/store/sys/PStoreTestUtil.java | 6 +- .../exec/store/sys/TestPStoreProviders.java | 2 +- .../org/apache/drill/exec/proto/BitControl.java | 29 +- .../drill/exec/proto/SchemaUserBitShared.java | 141 +++ .../apache/drill/exec/proto/UserBitShared.java | 1125 +++++++++++++++++- .../drill/exec/proto/beans/QueryInfo.java | 253 ++++ protocol/src/main/protobuf/BitControl.proto | 1 + protocol/src/main/protobuf/UserBitShared.proto | 7 + 43 files changed, 2257 insertions(+), 740 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java index b5a697c..59a3125 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.store.hbase.config; import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.FAMILY; -import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.FAMILY_BLOB; import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.QUALIFIER; import java.io.IOException; @@ -63,16 +62,15 @@ public class HBasePStore<V> implements PStore<V> { return get(key, FAMILY); } - @Override - public V getBlob(String key) { - return get(key, FAMILY_BLOB); - } - protected synchronized V get(String key, byte[] family) { try { Get get = new Get(row(key)); get.addColumn(family, QUALIFIER); - return value(table.get(get)); + Result r = table.get(get); + if(r.isEmpty()){ + return null; + } + return value(r); } catch (IOException e) { throw new DrillRuntimeException("Caught error while getting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); } @@ -83,11 +81,6 @@ public class HBasePStore<V> implements PStore<V> { put(key, FAMILY, value); } - @Override - public void putBlob(String key, V value) { - put(key, FAMILY_BLOB, value); - } - protected synchronized void put(String key, byte[] family, V value) { try { Put put = new Put(row(key)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java index 52808d4..b3947b4 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java @@ -19,22 +19,16 @@ package org.apache.drill.exec.store.hbase.config; import java.io.IOException; import java.util.Map; -import java.util.concurrent.ConcurrentMap; -import com.google.common.collect.Maps; -import org.apache.curator.framework.CuratorFramework; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; import org.apache.drill.exec.store.hbase.DrillHBaseConstants; -import org.apache.drill.exec.store.sys.EStore; import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.drill.exec.store.sys.PStoreProvider; import org.apache.drill.exec.store.sys.PStoreRegistry; import org.apache.drill.exec.store.sys.local.LocalEStoreProvider; -import org.apache.drill.exec.store.sys.local.MapEStore; -import org.apache.drill.exec.store.sys.zk.ZkEStore; import org.apache.drill.exec.store.sys.zk.ZkEStoreProvider; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -54,8 +48,6 @@ public class HBasePStoreProvider implements PStoreProvider { static final byte[] FAMILY = Bytes.toBytes("s"); - static final byte[] FAMILY_BLOB = Bytes.toBytes("t"); - static final byte[] QUALIFIER = Bytes.toBytes("d"); private final String storeTableName; @@ -104,11 +96,28 @@ public class HBasePStoreProvider implements PStoreProvider { this.zkAvailable = false; } + + @Override - public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException { - return new HBasePStore<V>(store, this.table); + public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException { + switch(config.getMode()){ + case EPHEMERAL: + if (this.zkAvailable) { + return zkEStoreProvider.getStore(config); + } else { + return localEStoreProvider.getStore(config); + } + + case BLOB_PERSISTENT: + case PERSISTENT: + return new HBasePStore<V>(config, this.table); + + default: + throw new IllegalStateException(); + } } + @Override public void start() throws IOException { this.connection = HConnectionManager.createConnection(hbaseConf); @@ -117,14 +126,13 @@ public class HBasePStoreProvider implements PStoreProvider { if (!admin.tableExists(storeTableName)) { HTableDescriptor desc = new HTableDescriptor(storeTableName); desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1)); - desc.addFamily(new HColumnDescriptor(FAMILY_BLOB).setMaxVersions(1)); admin.createTable(desc); } else { HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(storeTableName)); - if (!desc.hasFamily(FAMILY) || !desc.hasFamily(FAMILY_BLOB)) { + if (!desc.hasFamily(FAMILY)) { throw new DrillRuntimeException("The HBase table " + storeTableName + " specified as persistent store exists but does not contain column family: " - + (desc.hasFamily(FAMILY) ? Bytes.toString(FAMILY_BLOB) : Bytes.toString(FAMILY))); + + (Bytes.toString(FAMILY))); } } } @@ -134,17 +142,6 @@ public class HBasePStoreProvider implements PStoreProvider { } @Override - public <V> EStore<V> getEStore(PStoreConfig<V> store) throws IOException { - // when ZK is available, use ZK as the Ephemeral store. - // when ZK is not available, use a Map as the Ephemeral store. - if (this.zkAvailable) { - return zkEStoreProvider.getEStore(store); - } else { - return localEStoreProvider.getEStore(store); - } - } - - @Override public synchronized void close() { if (this.table != null) { try { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java index b1ff44c..5f2d6c7 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java @@ -41,7 +41,7 @@ public class TestHBaseTableProvider extends BaseHBaseTest { @Test public void testTableProvider() throws IOException { - PStore<String> hbaseStore = provider.getPStore(PStoreConfig.newJacksonBuilder(config.getMapper(), String.class).name("hbase").build()); + PStore<String> hbaseStore = provider.getStore(PStoreConfig.newJacksonBuilder(config.getMapper(), String.class).name("hbase").build()); hbaseStore.put("", "v0"); hbaseStore.put("k1", "v1"); hbaseStore.put("k2", "v2"); @@ -60,7 +60,7 @@ public class TestHBaseTableProvider extends BaseHBaseTest { } assertEquals(7, rowCount); - PStore<String> hbaseTestStore = provider.getPStore(PStoreConfig.newJacksonBuilder(config.getMapper(), String.class).name("hbase.test").build()); + PStore<String> hbaseTestStore = provider.getStore(PStoreConfig.newJacksonBuilder(config.getMapper(), String.class).name("hbase.test").build()); hbaseTestStore.put("", "v0"); hbaseTestStore.put("k1", "v1"); hbaseTestStore.put("k2", "v2"); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java index f256c98..fc5c05b 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java @@ -69,11 +69,6 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants { } @Override - public V getBlob(String key) { - throw new UnsupportedOperationException("Mongo DB PStore not currently supported"); - } - - @Override public void put(String key, V value) { try { DBObject putObj = new BasicDBObject(2); @@ -87,11 +82,6 @@ public class MongoPStore<V> implements PStore<V>, DrillMongoConstants { } @Override - public void putBlob(String key, V value) { - throw new UnsupportedOperationException("Mongo DB PStore not currently supported"); - } - - @Override public boolean putIfAbsent(String key, V value) { try { DBObject check = new BasicDBObject(1).append(ID, key); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java index eb4ba53..7443c2e 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java @@ -68,13 +68,17 @@ public class MongoPStoreProvider implements PStoreProvider, DrillMongoConstants } @Override - public <V> EStore<V> getEStore(PStoreConfig<V> storeConfig) throws IOException { - return localEStoreProvider.getEStore(storeConfig); - } + public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException { + switch(config.getMode()){ + case BLOB_PERSISTENT: + case PERSISTENT: + return new MongoPStore<>(config, collection); + case EPHEMERAL: + return localEStoreProvider.getStore(config); + default: + throw new IllegalStateException(); - @Override - public <V> PStore<V> getPStore(PStoreConfig<V> config) throws IOException { - return new MongoPStore<>(config, collection); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java index 1308c37..37730e3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java @@ -38,6 +38,7 @@ public class ControlRpcConfig { .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class) .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class) + .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class) http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java index 461cd8a..a4f9fdf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java @@ -60,6 +60,12 @@ public class ControlTunnel { manager.runCommand(b); } + public DrillRpcFuture<Ack> requestCancelQuery(QueryId queryId){ + CancelQuery c = new CancelQuery(queryId); + manager.runCommand(c); + return c.getFuture(); + } + public void informReceiverFinished(RpcOutcomeListener<Ack> outcomeListener, FinishedReceiver finishedReceiver){ ReceiverFinished b = new ReceiverFinished(outcomeListener, finishedReceiver); manager.runCommand(b); @@ -151,4 +157,18 @@ public class ControlTunnel { connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, QueryProfile.class); } } + + public static class CancelQuery extends FutureBitCommand<Ack, ControlConnection> { + final QueryId queryId; + + public CancelQuery(QueryId queryId) { + super(); + this.queryId = queryId; + } + + @Override + public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { + connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, Ack.class); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java index 75fa17c..b54841d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java @@ -38,4 +38,5 @@ public class DataRpcConfig { public static int RPC_VERSION = 3; public static final Response OK = new Response(RpcType.ACK, Acks.OK); + public static final Response FAIL = new Response(RpcType.ACK, Acks.FAIL); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java new file mode 100644 index 0000000..1a5e137 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.server.options; + +import java.util.Iterator; +import java.util.Map; + +import org.apache.drill.exec.server.options.OptionValue.OptionType; +import org.eigenbase.sql.SqlLiteral; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + +public abstract class FallbackOptionManager implements OptionManager{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FallbackOptionManager.class); + + private Map<String, OptionValue> options = Maps.newConcurrentMap(); + private OptionManager fallback; + + public FallbackOptionManager(OptionManager fallback) { + super(); + this.fallback = fallback; + } + + @Override + public Iterator<OptionValue> iterator() { + return Iterables.concat(fallback, options.values()).iterator(); + } + + @Override + public OptionValue getOption(String name) { + OptionValue opt = getLocalOption(name); + if(opt == null && fallback != null){ + return fallback.getOption(name); + }else{ + return opt; + } + } + + abstract OptionValue getLocalOption(String name); + abstract boolean setLocalOption(OptionValue value); + + @Override + public void setOption(OptionValue value) { + fallback.getAdmin().validate(value); + setValidatedOption(value); + } + + @Override + public void setOption(String name, SqlLiteral literal, OptionType type) { + OptionValue val = getAdmin().validate(name, literal); + val.type = type; + setValidatedOption(val); + } + + private void setValidatedOption(OptionValue value) { + if (!setLocalOption(value)) { + fallback.setOption(value); + } + } + + + @Override + public OptionAdmin getAdmin() { + return fallback.getAdmin(); + } + + @Override + public OptionManager getSystemManager() { + return fallback.getSystemManager(); + } + + @Override + public OptionList getOptionList() { + OptionList list = new OptionList(); + for (OptionValue o : options.values()) { + list.add(o); + } + return list; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java index 46d316b..e4dbbf8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java @@ -17,67 +17,31 @@ */ package org.apache.drill.exec.server.options; -import java.util.Iterator; import java.util.Map; -import org.eigenbase.sql.SqlLiteral; - import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -public class FragmentOptionManager implements OptionManager { +public class FragmentOptionManager extends InMemoryOptionManager { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionManager.class); - ImmutableMap<String, OptionValue> options; - OptionManager systemOptions; - public FragmentOptionManager(OptionManager systemOptions, OptionList options) { + super(systemOptions, getMapFromOptionList(options)); + } + + private static Map<String, OptionValue> getMapFromOptionList(OptionList options){ Map<String, OptionValue> tmp = Maps.newHashMap(); for(OptionValue v : options){ tmp.put(v.name, v); } - this.options = ImmutableMap.copyOf(tmp); - this.systemOptions = systemOptions; - } - - @Override - public Iterator<OptionValue> iterator() { - return Iterables.concat(systemOptions, options.values()).iterator(); + return ImmutableMap.copyOf(tmp); } @Override - public OptionValue getOption(String name) { - OptionValue value = options.get(name); - if (value == null && systemOptions != null) { - value = systemOptions.getOption(name); - } - return value; - } - - @Override - public void setOption(OptionValue value) throws SetOptionException { + boolean supportsOption(OptionValue value) { throw new UnsupportedOperationException(); } - @Override - public void setOption(String name, SqlLiteral literal, OptionValue.OptionType type) throws SetOptionException { - throw new UnsupportedOperationException(); - } - - @Override - public OptionAdmin getAdmin() { - throw new UnsupportedOperationException(); - } - - @Override - public OptionManager getSystemManager() { - throw new UnsupportedOperationException(); - } - @Override - public OptionList getOptionList() { - throw new UnsupportedOperationException(); - } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java new file mode 100644 index 0000000..59411ce --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.server.options; + +import java.util.Map; + +public abstract class InMemoryOptionManager extends FallbackOptionManager { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InMemoryOptionManager.class); + + final Map<String, OptionValue> options; + + InMemoryOptionManager(OptionManager fallback, Map<String, OptionValue> options) { + super(fallback); + this.options = options; + } + + @Override + OptionValue getLocalOption(String name) { + return options.get(name); + } + + @Override + boolean setLocalOption(OptionValue value) { + if(supportsOption(value)){ + options.put(value.name, value); + return true; + }else{ + return false; + } + + } + + abstract boolean supportsOption(OptionValue value); + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java index 82fc5ba..d04b654 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/QueryOptionManager.java @@ -17,81 +17,21 @@ */ package org.apache.drill.exec.server.options; -import java.util.Iterator; -import java.util.Map; +import java.util.HashMap; -import org.eigenbase.sql.SqlLiteral; +import org.apache.drill.exec.server.options.OptionValue.OptionType; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; - -public class QueryOptionManager implements OptionManager { +public class QueryOptionManager extends InMemoryOptionManager { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class); - private Map<String, OptionValue> options = Maps.newConcurrentMap(); - private OptionManager sessionOptions; - public QueryOptionManager(OptionManager sessionOptions) { - super(); - this.sessionOptions = sessionOptions; - } - - @Override - public Iterator<OptionValue> iterator() { - return Iterables.concat(sessionOptions, options.values()).iterator(); - } - - @Override - public OptionValue getOption(String name) { - OptionValue opt = options.get(name); - if (opt == null && sessionOptions != null) { - return sessionOptions.getOption(name); - } else { - return opt; - } - } - - @Override - public void setOption(OptionValue value) { - sessionOptions.getAdmin().validate(value); - setValidatedOption(value); + super(sessionOptions, new HashMap<String, OptionValue>()); } @Override - public void setOption(String name, SqlLiteral literal, OptionValue.OptionType type) { - OptionValue val = sessionOptions.getAdmin().validate(name, literal); - val.type = type; - setValidatedOption(val); + boolean supportsOption(OptionValue value) { + return value.type == OptionType.QUERY; } - private void setValidatedOption(OptionValue value) { - if (value.type == OptionValue.OptionType.QUERY) { - options.put(value.name, value); - } else { - sessionOptions.setOption(value); - } - } - - @Override - public OptionManager.OptionAdmin getAdmin() { - return sessionOptions.getAdmin(); - } - - @Override - public OptionManager getSystemManager() { - return sessionOptions.getSystemManager(); - } - - @Override - public OptionList getOptionList() { - OptionList list = new OptionList(); - list.addAll(sessionOptions.getOptionList()); - list.addAll(options.values()); - return list; - } - - public OptionList getSessionOptionList() { - return sessionOptions.getOptionList(); - } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java index 4268d02..45c0ce8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java @@ -17,79 +17,18 @@ */ package org.apache.drill.exec.server.options; -import java.util.Iterator; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -import org.apache.drill.exec.server.options.OptionValue.OptionType; -import org.eigenbase.sql.SqlLiteral; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; - -public class SessionOptionManager implements OptionManager{ +public class SessionOptionManager extends InMemoryOptionManager{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class); - private Map<String, OptionValue> options = Maps.newConcurrentMap(); - private OptionManager systemOptions; - public SessionOptionManager(OptionManager systemOptions) { - super(); - this.systemOptions = systemOptions; - } - - @Override - public Iterator<OptionValue> iterator() { - return Iterables.concat(systemOptions, options.values()).iterator(); - } - - @Override - public OptionValue getOption(String name) { - OptionValue opt = options.get(name); - if(opt == null && systemOptions != null){ - return systemOptions.getOption(name); - }else{ - return opt; - } - } - - @Override - public void setOption(OptionValue value) { - systemOptions.getAdmin().validate(value); - setValidatedOption(value); - } - - @Override - public void setOption(String name, SqlLiteral literal, OptionType type) { - OptionValue val = systemOptions.getAdmin().validate(name, literal); - val.type = type; - setValidatedOption(val); - } - - private void setValidatedOption(OptionValue value) { - if (value.type == OptionType.SESSION) { - options.put(value.name, value); - } else { - systemOptions.setOption(value); - } - } - - @Override - public OptionAdmin getAdmin() { - return systemOptions.getAdmin(); - } - - @Override - public OptionManager getSystemManager() { - return systemOptions; + super(systemOptions, new ConcurrentHashMap<String, OptionValue>()); } @Override - public OptionList getOptionList() { - OptionList list = new OptionList(); - for (OptionValue o : options.values()) { - list.add(o); - } - return list; + boolean supportsOption(OptionValue value) { + return value.type == OptionValue.OptionType.SESSION; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 9f912e0..1622d7b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.server.options; import java.io.IOException; import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; import org.apache.commons.collections.IteratorUtils; @@ -99,57 +100,61 @@ public class SystemOptionManager implements OptionManager { } public SystemOptionManager init() throws IOException{ - this.options = provider.getPStore(config); + this.options = provider.getStore(config); this.admin = new SystemOptionAdmin(); return this; } - private class Iter implements Iterator<OptionValue>{ - private Iterator<Map.Entry<String, OptionValue>> inner; - - public Iter(Iterator<Map.Entry<String, OptionValue>> inner) { - this.inner = inner; - } - - @Override - public boolean hasNext() { - return inner.hasNext(); - } - - @Override - public OptionValue next() { - return inner.next().getValue(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - } @Override public Iterator<OptionValue> iterator() { - return new Iter(options.iterator()); + Map<String, OptionValue> buildList = Maps.newHashMap(); + for(OptionValidator v : knownOptions.values()){ + buildList.put(v.getOptionName(), v.getDefault()); + } + for(Map.Entry<String, OptionValue> v : options){ + OptionValue value = v.getValue(); + buildList.put(value.name, value); + } + return buildList.values().iterator(); } @Override public OptionValue getOption(String name) { - return options.get(name); + // check local space + OptionValue v = options.get(name); + if(v != null){ + return v; + } + + // otherwise, return default. + OptionValidator validator = knownOptions.get(name); + if(validator == null){ + return null; + }else{ + return validator.getDefault(); + } } @Override public void setOption(OptionValue value) { assert value.type == OptionType.SYSTEM; admin.validate(value); - options.put(value.name, value); + setOptionInternal(value); + } + + private void setOptionInternal(OptionValue value){ + if(!value.equals(knownOptions.get(value.name))){ + options.put(value.name, value); + } } + @Override public void setOption(String name, SqlLiteral literal, OptionType type) { assert type == OptionValue.OptionType.SYSTEM; OptionValue v = admin.validate(name, literal); v.type = type; - options.put(name, v); + setOptionInternal(v); } @Override @@ -172,10 +177,24 @@ public class SystemOptionManager implements OptionManager { public SystemOptionAdmin() { for(OptionValidator v : VALIDATORS) { knownOptions.put(v.getOptionName(), v); - options.putIfAbsent(v.getOptionName(), v.getDefault()); } - } + for(Entry<String, OptionValue> v : options){ + OptionValue value = v.getValue(); + OptionValidator defaultValidator = knownOptions.get(v.getKey()); + if(defaultValidator == null){ + // deprecated option, delete. + options.delete(value.name); + logger.warn("Deleting deprecated option `{}`.", value.name); + }else if(value.equals(defaultValidator)){ + // option set with default value, remove storage of record. + options.delete(value.name); + logger.warn("Deleting option `{}` set to default value.", value.name); + } + + } + + } @Override public void registerOptionType(OptionValidator validator) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java index d6c8ee0..5d0eed6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java @@ -87,7 +87,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage this.context = context; this.pluginSystemTable = context // .getPersistentStoreProvider() // - .getPStore(PStoreConfig // + .getStore(PStoreConfig // .newJacksonBuilder(context.getConfig().getMapper(), StoragePluginConfig.class) // .name("sys.storage_plugins") // .build()); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java index acd8fcb..7b9d52c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java @@ -88,8 +88,9 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa if (storageEngineName == null) { this.knownViews = null; } else { - this.knownViews = provider.getPStore(PStoreConfig // + this.knownViews = provider.getStore(PStoreConfig // .newJacksonBuilder(drillConfig.getMapper(), String.class) // + .persist() // .name(Joiner.on('.').join("storage.views", storageEngineName, schemaName)) // .build()); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java index 7214092..2d04957 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java @@ -18,15 +18,11 @@ package org.apache.drill.exec.store.sys; -import java.util.Map; /** * Interfaces to define EStore, which is keep track of status/information for running queries. The information * would be gone, if the query is completed, or the foreman drillbit is not responding. * @param <V> */ -public interface EStore <V> extends Iterable<Map.Entry<String, V>> { - public V get(String key); - public void put(String key, V value); - public void delete(String key); +public interface EStore <V> extends PStore<V> { } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java index 32bf0b1..b09c5b4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java @@ -25,5 +25,5 @@ import java.io.IOException; */ public interface EStoreProvider { - public <V> EStore<V> getEStore(PStoreConfig<V> table) throws IOException; + public <V> PStore<V> getStore(PStoreConfig<V> table) throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java index 040a99d..26c00ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java @@ -19,11 +19,13 @@ package org.apache.drill.exec.store.sys; import java.util.Map; +/** + * Interface for reading and writing values to a persistent storage provider. Iterators are guaranteed to be returned in key order. + * @param <V> + */ public interface PStore<V> extends Iterable<Map.Entry<String, V>> { public V get(String key); - public V getBlob(String key); public void put(String key, V value); - public void putBlob(String key, V value); public boolean putIfAbsent(String key, V value); public void delete(String key); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java index 7d5243f..83c2243 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java @@ -31,11 +31,25 @@ public class PStoreConfig<V> { private final String name; private final PClassSerializer<V> valueSerializer; + private final Mode mode; + private final int maxIteratorSize; - private PStoreConfig(String name, PClassSerializer<V> valueSerializer) { + public static enum Mode {PERSISTENT, EPHEMERAL, BLOB_PERSISTENT}; + + private PStoreConfig(String name, PClassSerializer<V> valueSerializer, Mode mode, int maxIteratorSize) { super(); this.name = name; this.valueSerializer = valueSerializer; + this.mode = mode; + this.maxIteratorSize = Math.abs(maxIteratorSize); + } + + public Mode getMode() { + return mode; + } + + public int getMaxIteratorSize() { + return maxIteratorSize; } public String getName() { @@ -57,20 +71,47 @@ public class PStoreConfig<V> { public static class PStoreConfigBuilder<V> { String name; PClassSerializer<V> serializer; + Mode mode = Mode.PERSISTENT; + int maxIteratorSize = Integer.MAX_VALUE; PStoreConfigBuilder(PClassSerializer<V> serializer) { super(); this.serializer = serializer; } - public <X extends Builder> PStoreConfigBuilder<V> name(String name) { + public PStoreConfigBuilder<V> name(String name) { this.name = name; return this; } + public PStoreConfigBuilder<V> persist(){ + this.mode = Mode.PERSISTENT; + return this; + } + + public PStoreConfigBuilder<V> ephemeral(){ + this.mode = Mode.EPHEMERAL; + return this; + } + + public PStoreConfigBuilder<V> blob(){ + this.mode = Mode.BLOB_PERSISTENT; + return this; + } + + /** + * Set the maximum size of the iterator. Positive numbers start from the start of the list. Negative numbers start from the end of the list. + * @param size + * @return + */ + public PStoreConfigBuilder<V> max(int size){ + this.maxIteratorSize = size; + return this; + } + public PStoreConfig<V> build(){ Preconditions.checkNotNull(name); - return new PStoreConfig<V>(name, serializer); + return new PStoreConfig<V>(name, serializer, mode, maxIteratorSize); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java index f6154e2..6371dfa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java @@ -21,7 +21,7 @@ import java.io.Closeable; import java.io.IOException; public interface PStoreProvider extends AutoCloseable, Closeable{ - public <V> PStore<V> getPStore(PStoreConfig<V> table) throws IOException; + + public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException; public void start() throws IOException; - public <V> EStore<V> getEStore(PStoreConfig<V> table) throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java new file mode 100644 index 0000000..9f6ec29 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys.local; + +import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.commons.io.IOUtils; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; +import org.apache.drill.exec.store.dfs.shim.FileSystemCreator; +import org.apache.drill.exec.store.sys.PStore; +import org.apache.drill.exec.store.sys.PStoreConfig; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +public class FilePStore<V> implements PStore<V> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilePStore.class); + + + private final Path basePath; + private final PStoreConfig<V> config; + private final DrillFileSystem fs; + + public FilePStore(DrillFileSystem fs, Path base, PStoreConfig<V> config) { + super(); + this.basePath = new Path(base, config.getName()); + this.config = config; + this.fs = fs; + + try { + mk(basePath); + } catch (IOException e) { + throw new RuntimeException("Failure setting pstore configuration path."); + } + } + + private void mk(Path path) throws IOException{ + fs.getUnderlying().mkdirs(path); + } + + public static Path getLogDir(){ + String drillLogDir = System.getenv("DRILL_LOG_DIR"); + if (drillLogDir == null) { + drillLogDir = "/var/log/drill"; + } + return new Path("file://" + new File(drillLogDir).getAbsoluteFile().toString()); + } + + public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException{ + Path blobRoot = root == null ? getLogDir() : root; + Configuration fsConf = new Configuration(); + if(blobRoot.toUri().getScheme() != null){ + fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString()); + } + + + DrillFileSystem fs = FileSystemCreator.getFileSystem(config, fsConf); + fs.getUnderlying().mkdirs(blobRoot); + return fs; + } + + @Override + public Iterator<Entry<String, V>> iterator() { + try{ + List<FileStatus> f = fs.list(false, basePath); + if (f == null || f.isEmpty()) { + return Collections.emptyIterator(); + } + List<String> files = Lists.newArrayList(); + + for (FileStatus stat : f) { + String s = stat.getPath().getName(); + if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) { + files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); + } + } + + Collections.sort(files); + files = files.subList(0, Math.min(files.size(), config.getMaxIteratorSize())); + return new Iter(files.iterator()); + + }catch(IOException e){ + throw new RuntimeException(e); + } + } + + private Path p(String name) throws IOException { + Preconditions.checkArgument( + !name.contains("/") && + !name.contains(":") && + !name.contains("..")); + + Path f = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); + // do this to check file name. + return f; + } + + public V get(String key) { + try{ + Path path = p(key); + if(!fs.getUnderlying().exists(path)){ + return null; + } + }catch(IOException e){ + throw new RuntimeException(e); + } + + try (InputStream is = fs.open(p(key)).getInputStream()) { + return config.getSerializer().deserialize(IOUtils.toByteArray(is)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void put(String key, V value) { + try (OutputStream os = fs.create(p(key)).getOuputStream()) { + IOUtils.write(config.getSerializer().serialize(value), os); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean putIfAbsent(String key, V value) { + try { + Path p = p(key); + if (fs.getUnderlying().exists(p)) { + return false; + } else { + put(key, value); + return true; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void delete(String key) { + try { + fs.getUnderlying().delete(p(key), false); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private class Iter implements Iterator<Entry<String, V>>{ + + private Iterator<String> keys; + private String current; + + public Iter(Iterator<String> keys) { + super(); + this.keys = keys; + } + + @Override + public boolean hasNext() { + return keys.hasNext(); + } + + @Override + public Entry<String, V> next() { + current = keys.next(); + return new DeferredEntry(current); + } + + @Override + public void remove() { + delete(current); + keys.remove(); + } + + private class DeferredEntry implements Entry<String, V> { + + private String name; + + + public DeferredEntry(String name) { + super(); + this.name = name; + } + + @Override + public String getKey() { + return name; + } + + @Override + public V getValue() { + return get(name); + } + + @Override + public V setValue(V value) { + throw new UnsupportedOperationException(); + } + + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java index b505fd4..094d093 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java @@ -18,19 +18,24 @@ package org.apache.drill.exec.store.sys.local; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.concurrent.ConcurrentMap; + import org.apache.drill.exec.store.sys.EStore; import org.apache.drill.exec.store.sys.EStoreProvider; import org.apache.drill.exec.store.sys.PStoreConfig; +import org.apache.drill.exec.store.sys.PStoreConfig.Mode; -import java.io.IOException; -import java.util.concurrent.ConcurrentMap; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; public class LocalEStoreProvider implements EStoreProvider{ private ConcurrentMap<PStoreConfig<?>, EStore<?>> estores = Maps.newConcurrentMap(); @Override - public <V> EStore<V> getEStore(PStoreConfig<V> storeConfig) throws IOException { + public <V> EStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException { + Preconditions.checkArgument(storeConfig.getMode() == Mode.EPHEMERAL, "Estore configurations must be set ephemeral."); + if (! (estores.containsKey(storeConfig)) ) { EStore<V> p = new MapEStore<V>(); EStore<?> p2 = estores.putIfAbsent(storeConfig, p); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java deleted file mode 100644 index c10f862..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java +++ /dev/null @@ -1,208 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.sys.local; - -import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; - -import org.apache.commons.io.IOUtils; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -public class LocalPStore<V> implements PStore<V> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalPStore.class); - - private static final String BLOB_QUALIFIER = "blob"; - - private final File basePath; - private final File blobPath; - private final PStoreConfig<V> config; - public LocalPStore(File base, PStoreConfig<V> config) { - super(); - this.basePath = new File(base, config.getName()); - this.blobPath = new File(basePath, BLOB_QUALIFIER); - if (!blobPath.exists()) { - blobPath.mkdirs(); - } - this.config = config; - } - - @Override - public Iterator<Entry<String, V>> iterator() { - String[] f = basePath.list(); - if (f == null) { - return Collections.emptyIterator(); - } - List<String> files = Lists.newArrayList(); - for (String s : f) { - if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) { - files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); - } - } - - return new Iter(files.iterator()); - } - - private File p(String name, boolean blob) throws IOException { - Preconditions.checkArgument( - !name.contains("/") && - !name.contains(":") && - !name.contains("..")); - - File f = new File(blob ? blobPath : basePath, name + DRILL_SYS_FILE_SUFFIX); - // do this to check file name. - f.getCanonicalPath(); - return f; - } - - @Override - public V get(String key) { - return get(key, false); - } - - @Override - public V getBlob(String key) { - return get(key, true); - } - - protected V get(String key, boolean blob) { - try (InputStream is = new FileInputStream(p(key, blob))) { - return config.getSerializer().deserialize(IOUtils.toByteArray(is)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void put(String key, V value) { - put(key, false, value); - } - - @Override - public void putBlob(String key, V value) { - put(key, true, value); - } - - protected void put(String key, boolean blob, V value) { - try (OutputStream os = new FileOutputStream(p(key, blob))) { - IOUtils.write(config.getSerializer().serialize(value), os); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean putIfAbsent(String key, V value) { - try { - File f = p(key, false); - if (f.exists()) { - return false; - } else { - put(key, value); - return true; - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void delete(String key) { - try { - delete(key, false); - delete(key, true); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - protected void delete(String key, boolean blob) throws IOException { - try { - p(key, blob).delete(); - } catch (FileNotFoundException e) { /* ignored */ } - } - - private class Iter implements Iterator<Entry<String, V>>{ - - private Iterator<String> keys; - private String current; - - public Iter(Iterator<String> keys) { - super(); - this.keys = keys; - } - - @Override - public boolean hasNext() { - return keys.hasNext(); - } - - @Override - public Entry<String, V> next() { - current = keys.next(); - return new DeferredEntry(current); - } - - @Override - public void remove() { - delete(current); - keys.remove(); - } - - private class DeferredEntry implements Entry<String, V> { - - private String name; - - public DeferredEntry(String name) { - super(); - this.name = name; - } - - @Override - public String getKey() { - return name; - } - - @Override - public V getValue() { - return get(name); - } - - @Override - public V setValue(V value) { - throw new UnsupportedOperationException(); - } - - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java index c413866..ac53a61 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java @@ -17,19 +17,19 @@ */ package org.apache.drill.exec.store.sys.local; -import java.io.File; import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; import org.apache.drill.exec.store.sys.EStore; import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.drill.exec.store.sys.PStoreRegistry; import org.apache.drill.exec.store.sys.PStoreProvider; - -import com.google.common.collect.Maps; +import org.apache.drill.exec.store.sys.PStoreRegistry; +import org.apache.hadoop.fs.Path; /** * A really simple provider that stores data in the local file system, one value per file. @@ -37,21 +37,21 @@ import com.google.common.collect.Maps; public class LocalPStoreProvider implements PStoreProvider { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalPStoreProvider.class); - private File path; + private final Path path; private final boolean enableWrite; - private ConcurrentMap<PStoreConfig<?>, PStore<?>> pstores; + private final ConcurrentMap<PStoreConfig<?>, PStore<?>> pstores; private final LocalEStoreProvider estoreProvider; + private final DrillFileSystem fs; - public LocalPStoreProvider(DrillConfig config) { - path = new File(config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH)); - enableWrite = config.getBoolean(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE); - if (!enableWrite) { - pstores = Maps.newConcurrentMap(); - } - estoreProvider = new LocalEStoreProvider(); + public LocalPStoreProvider(DrillConfig config) throws IOException { + this.path = new Path(config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH)); + this.enableWrite = config.getBoolean(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE); + this.pstores = enableWrite ? null : new ConcurrentHashMap<PStoreConfig<?>, PStore<?>>(); + this.estoreProvider = new LocalEStoreProvider(); + this.fs = FilePStore.getFileSystem(config, path); } - public LocalPStoreProvider(PStoreRegistry registry) { + public LocalPStoreProvider(PStoreRegistry registry) throws IOException { this(registry.getConfig()); } @@ -60,14 +60,22 @@ public class LocalPStoreProvider implements PStoreProvider { } @Override - public <V> EStore<V> getEStore(PStoreConfig<V> storeConfig) throws IOException { - return estoreProvider.getEStore(storeConfig); + public <V> PStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException { + switch(storeConfig.getMode()){ + case EPHEMERAL: + return estoreProvider.getStore(storeConfig); + case BLOB_PERSISTENT: + case PERSISTENT: + return getPStore(storeConfig); + default: + throw new IllegalStateException(); + } + } - @Override - public <V> PStore<V> getPStore(PStoreConfig<V> storeConfig) throws IOException { + private <V> PStore<V> getPStore(PStoreConfig<V> storeConfig) throws IOException { if (enableWrite) { - return new LocalPStore<V>(path, storeConfig); + return new FilePStore<V>(fs, path, storeConfig); } else { PStore<V> p = new NoWriteLocalPStore<V>(); PStore<?> p2 = pstores.putIfAbsent(storeConfig, p); @@ -78,6 +86,7 @@ public class LocalPStoreProvider implements PStoreProvider { } } + @Override public void start() { } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java index 84e5027..2723916 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java @@ -51,4 +51,10 @@ public class MapEStore <V> implements EStore<V> { public Iterator<Map.Entry<String, V>> iterator() { return store.entrySet().iterator(); } + + @Override + public boolean putIfAbsent(String key, V value) { + V out = store.putIfAbsent(key, value); + return out == null; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java index d1ef931..71a41f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java @@ -47,21 +47,11 @@ public class NoWriteLocalPStore<V> implements PStore<V>{ } @Override - public V getBlob(String key) { - return blobMap.get(key); - } - - @Override public void put(String key, V value) { map.put(key, value); } @Override - public void putBlob(String key, V value) { - blobMap.put(key, value); - } - - @Override public boolean putIfAbsent(String key, V value) { return null == map.putIfAbsent(key, value); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java index b88ff74..d61f3b4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java @@ -17,16 +17,25 @@ */ package org.apache.drill.exec.store.sys.zk; -import com.google.common.base.Preconditions; -import org.apache.curator.framework.CuratorFramework; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.zookeeper.CreateMode; - import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; +import org.apache.drill.exec.store.sys.PStoreConfig; +import org.apache.zookeeper.CreateMode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + /** * This is the abstract class that is shared by ZkPStore (Persistent store) and ZkEStore (Ephemeral Store) * @param <V> @@ -35,6 +44,7 @@ public abstract class ZkAbstractStore<V> { protected CuratorFramework framework; protected PStoreConfig<V> config; + private final PathChildrenCache childrenCache; private String prefix; private String parent; @@ -50,16 +60,19 @@ public abstract class ZkAbstractStore<V> { if (framework.checkExists().forPath(parent) == null) { framework.create().withMode(CreateMode.PERSISTENT).forPath(parent); } + + this.childrenCache = new PathChildrenCache(framework, parent, true); + this.childrenCache.start(StartMode.BUILD_INITIAL_CACHE); + } catch (Exception e) { - throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e); + throw new RuntimeException("Failure while accessing Zookeeper for PStore: " + e.getMessage(), e); } } public Iterator<Entry<String, V>> iterator() { try { - List<String> children = framework.getChildren().forPath(parent); - return new Iter(children.iterator()); + return new Iter(childrenCache.getCurrentData()); } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e); } @@ -73,10 +86,11 @@ public abstract class ZkAbstractStore<V> { public V get(String key) { try { - byte[] bytes = framework.getData().forPath(p(key)); - if (bytes == null) { + ChildData d = childrenCache.getCurrentData(p(key)); + if(d == null || d.getData() == null){ return null; } + byte[] bytes = d.getData(); return config.getSerializer().deserialize(bytes); } catch (Exception e) { @@ -86,11 +100,12 @@ public abstract class ZkAbstractStore<V> { public void put(String key, V value) { try { - if (framework.checkExists().forPath(p(key)) != null) { + if (childrenCache.getCurrentData(p(key)) != null) { framework.setData().forPath(p(key), config.getSerializer().serialize(value)); } else { createNodeInZK(key, value); } + childrenCache.rebuildNode(p(key)); } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e); @@ -100,21 +115,43 @@ public abstract class ZkAbstractStore<V> { public void delete(String key) { try { framework.delete().forPath(p(key)); + childrenCache.rebuildNode(p(key)); } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e); } } + public boolean putIfAbsent(String key, V value) { + try { + if (childrenCache.getCurrentData(p(key)) != null) { + return false; + } else { + createNodeInZK(key, value); + childrenCache.rebuildNode(p(key)); + return true; + } + + } catch (Exception e) { + throw new RuntimeException("Failure while accessing Zookeeper", e); + } + } + public abstract void createNodeInZK (String key, V value); private class Iter implements Iterator<Entry<String, V>>{ - private Iterator<String> keys; - private String current; + private Iterator<ChildData> keys; + private ChildData current; - public Iter(Iterator<String> keys) { + public Iter(List<ChildData> children) { super(); - this.keys = keys; + List<ChildData> sortedChildren = Lists.newArrayList(children); + Collections.sort(sortedChildren, new Comparator<ChildData>(){ + @Override + public int compare(ChildData o1, ChildData o2) { + return o1.getPath().compareTo(o2.getPath()); + }}); + this.keys = sortedChildren.iterator(); } @Override @@ -130,27 +167,35 @@ public abstract class ZkAbstractStore<V> { @Override public void remove() { - delete(current); - keys.remove(); + delete(keyFromPath(current)); + } + + private String keyFromPath(ChildData data){ + String path = data.getPath(); + return path.substring(prefix.length(), path.length()); } private class DeferredEntry implements Entry<String, V>{ - private String name; + private ChildData data; - public DeferredEntry(String name) { + public DeferredEntry(ChildData data) { super(); - this.name = name; + this.data = data; } @Override public String getKey() { - return name; + return keyFromPath(data); } @Override public V getValue() { - return get(name); + try { + return config.getSerializer().deserialize(data.getData()); + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java index 34b59a7..1c2c3fd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java @@ -22,6 +22,9 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.drill.exec.store.sys.EStore; import org.apache.drill.exec.store.sys.EStoreProvider; import org.apache.drill.exec.store.sys.PStoreConfig; +import org.apache.drill.exec.store.sys.PStoreConfig.Mode; + +import com.google.common.base.Preconditions; import java.io.IOException; @@ -33,7 +36,8 @@ public class ZkEStoreProvider implements EStoreProvider{ } @Override - public <V> EStore<V> getEStore(PStoreConfig<V> store) throws IOException { + public <V> EStore<V> getStore(PStoreConfig<V> store) throws IOException { + Preconditions.checkArgument(store.getMode() == Mode.EPHEMERAL); return new ZkEStore<V>(curator,store); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java index 601ba8f..a597381 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java @@ -40,17 +40,9 @@ import com.google.common.base.Preconditions; */ public class ZkPStore<V> extends ZkAbstractStore<V> implements PStore<V>{ - private DrillFileSystem fs; - private Path blobPath; - private boolean blobPathCreated; - - ZkPStore(CuratorFramework framework, DrillFileSystem fs, Path blobRoot, PStoreConfig<V> config) + ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException { super(framework, config); - - this.fs = fs; - this.blobPath = new Path(blobRoot, config.getName()); - this.blobPathCreated = false; } @Override @@ -62,53 +54,7 @@ public class ZkPStore<V> extends ZkAbstractStore<V> implements PStore<V>{ } } - @Override - public boolean putIfAbsent(String key, V value) { - try { - if (framework.checkExists().forPath(p(key)) != null) { - return false; - } else { - framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value)); - return true; - } - - } catch (Exception e) { - throw new RuntimeException("Failure while accessing Zookeeper", e); - } - } - - @Override - public V getBlob(String key) { - try (DrillInputStream is = fs.open(path(key))) { - return config.getSerializer().deserialize(IOUtils.toByteArray(is.getInputStream())); - } catch (FileNotFoundException e) { - return null; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public void putBlob(String key, V value) { - try (DrillOutputStream os = fs.create(path(key))) { - IOUtils.write(config.getSerializer().serialize(value), os.getOuputStream()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - private Path path(String name) throws IOException { - Preconditions.checkArgument( - !name.contains("/") && - !name.contains(":") && - !name.contains("..")); - if (!blobPathCreated) { - fs.mkdirs(blobPath); - blobPathCreated = true; - } - - return new Path(blobPath, name + DRILL_SYS_FILE_SUFFIX); - } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java index aa64f53..03d2441 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java @@ -21,18 +21,17 @@ import java.io.File; import java.io.IOException; import org.apache.curator.framework.CuratorFramework; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; -import org.apache.drill.exec.store.dfs.shim.FileSystemCreator; import org.apache.drill.exec.store.sys.EStore; import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.drill.exec.store.sys.PStoreRegistry; import org.apache.drill.exec.store.sys.PStoreProvider; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; +import org.apache.drill.exec.store.sys.PStoreRegistry; +import org.apache.drill.exec.store.sys.local.FilePStore; import org.apache.hadoop.fs.Path; import com.google.common.annotations.VisibleForTesting; @@ -45,9 +44,7 @@ public class ZkPStoreProvider implements PStoreProvider { private final CuratorFramework curator; private final DrillFileSystem fs; - private final Path blobRoot; - private final ZkEStoreProvider zkEStoreProvider; public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException { @@ -59,49 +56,45 @@ public class ZkPStoreProvider implements PStoreProvider { if (registry.getConfig().hasPath(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) { blobRoot = new Path(registry.getConfig().getString(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)); - } else { - String drillLogDir = System.getenv("DRILL_LOG_DIR"); - if (drillLogDir == null) { - drillLogDir = "/var/log/drill"; - } - blobRoot = new Path(new File(drillLogDir).getAbsoluteFile().toURI()); + }else{ + blobRoot = FilePStore.getLogDir(); } - Configuration fsConf = new Configuration(); - fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString()); - try { - fs = FileSystemCreator.getFileSystem(registry.getConfig(), fsConf); - fs.mkdirs(blobRoot); - } catch (IOException e) { - throw new DrillbitStartupException("Unable to initialize blob storage.", e); + + try{ + this.fs = FilePStore.getFileSystem(registry.getConfig(), blobRoot); + }catch(IOException e){ + throw new DrillbitStartupException("Failure while attempting to set up blob store.", e); } - zkEStoreProvider = new ZkEStoreProvider(curator); + + this.zkEStoreProvider = new ZkEStoreProvider(curator); } @VisibleForTesting - public ZkPStoreProvider(CuratorFramework curator) { + public ZkPStoreProvider(DrillConfig config, CuratorFramework curator) throws IOException { this.curator = curator; - this.fs = null; - String drillLogDir = System.getenv("DRILL_LOG_DIR"); - if (drillLogDir == null) { - drillLogDir = "/var/log/drill"; - } - blobRoot = new Path(new File(drillLogDir).getAbsoluteFile().toURI()); - zkEStoreProvider = new ZkEStoreProvider(curator); + this.blobRoot = FilePStore.getLogDir(); + this.fs = FilePStore.getFileSystem(config, blobRoot); + this.zkEStoreProvider = new ZkEStoreProvider(curator); } @Override public void close() { } - @Override - public <V> EStore<V> getEStore(PStoreConfig<V> store) throws IOException { - return zkEStoreProvider.getEStore(store); - } @Override - public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException { - return new ZkPStore<V>(curator, fs, blobRoot, store); + public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException { + switch(config.getMode()){ + case BLOB_PERSISTENT: + return new FilePStore<V>(fs, blobRoot, config); + case EPHEMERAL: + return zkEStoreProvider.getStore(config); + case PERSISTENT: + return new ZkPStore<V>(curator, config); + default: + throw new IllegalStateException(); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java index fc0972f..3228da9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java @@ -83,6 +83,16 @@ public class ControlHandlerImpl implements ControlMessageHandler { // TODO: Support a type of message that has no response. return DataRpcConfig.OK; + case RpcType.REQ_QUERY_CANCEL_VALUE: + QueryId id = get(pBody, QueryId.PARSER); + Foreman f = bee.getForemanForQueryId(id); + if(f != null){ + f.cancel(); + return DataRpcConfig.OK; + }else{ + return DataRpcConfig.FAIL; + } + case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE: InitializeFragments fragments = get(pBody, InitializeFragments.PARSER); for(int i =0; i < fragments.getFragmentCount(); i++){ @@ -95,13 +105,9 @@ public class ControlHandlerImpl implements ControlMessageHandler { Foreman foreman = bee.getForemanForQueryId(queryId); QueryProfile profile; if (foreman == null) { - try { - profile = bee.getContext().getPersistentStoreProvider().getEStore(QueryStatus.RUNNING_QUERY_PROFILE).get(QueryIdHelper.getQueryId(queryId)); - } catch (IOException e) { - throw new RpcException("Failed to get persistent store", e); - } + throw new RpcException("Query not running on node."); } else { - profile = bee.getForemanForQueryId(queryId).getQueryStatus().getAsProfile(true); + profile = bee.getForemanForQueryId(queryId).getQueryStatus().getAsProfile(); } return new Response(RpcType.RESP_QUERY_STATUS, profile); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2eb72a7c/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index e47c0be..7a0e501 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -436,7 +436,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ @Override public int compareTo(Object o) { - return o.hashCode() - o.hashCode(); + return hashCode() - o.hashCode(); } }