Several changes: - added a base system test capable of booting a drillbit cluster - added a client system test that submits a plan - corrected a bug in ZKClusterCoordinator where endpoints would not get fetched before a cacheChanged event - corrected a bug in HazelCache where it wouldn't support fetching an existing HazelcastInstance - added a mini zookeeper cluster based on HBase's but without the hadoop dependency - made sure all tests pass
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/31fb6eb0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/31fb6eb0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/31fb6eb0 Branch: refs/heads/execwork Commit: 31fb6eb0124ac8801e27b32bf54ff7a34ce7d85a Parents: 4f3a1c6 Author: David Ribeiro Alves <[email protected]> Authored: Thu Apr 18 22:03:39 2013 -0500 Committer: David Ribeiro Alves <[email protected]> Committed: Thu Apr 18 22:03:39 2013 -0500 ---------------------------------------------------------------------- .../apache/drill/common/config/DrillConfig.java | 12 +- .../apache/drill/common/config/NestedConfig.java | 16 +- .../org/apache/drill/exec/cache/HazelCache.java | 11 +- .../org/apache/drill/exec/client/DrillClient.java | 100 ++++ .../drill/exec/coord/ClusterCoordinator.java | 14 +- .../drill/exec/coord/ZKClusterCoordinator.java | 137 +++--- .../org/apache/drill/exec/server/Drillbit.java | 20 +- .../apache/drill/exec/server/DrillbitContext.java | 3 +- .../java-exec/src/main/resources/drill-module.conf | 1 + .../org/apache/drill/exec/DrillSystemTestBase.java | 77 +++- .../drill/exec/client/DrillClientSystemTest.java | 42 ++- .../drill/exec/util/MiniZooKeeperCluster.java | 368 +++++++++++++++ .../java-exec/src/test/resources/drill-module.conf | 28 ++ .../java-exec/src/test/resources/simple_plan.json | 133 ++++++ 14 files changed, 861 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java 
b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java index b738002..dc9327e 100644 --- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java +++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.CopyOnWriteArrayList; +import com.google.common.annotations.VisibleForTesting; +import com.sun.media.jfxmedia.events.VideoTrackSizeListener; import org.apache.drill.common.exceptions.DrillConfigurationException; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.logical.StorageEngineConfigBase; @@ -44,8 +46,9 @@ public final class DrillConfig extends NestedConfig{ @SuppressWarnings("unchecked") private volatile List<Queue<Object>> sinkQueues = new CopyOnWriteArrayList<Queue<Object>>(new Queue[1]); - - private DrillConfig(Config config) { + + @VisibleForTesting + public DrillConfig(Config config) { super(config); mapper = new ObjectMapper(); SimpleModule deserModule = new SimpleModule("LogicalExpressionDeserializationModule").addDeserializer(LogicalExpression.class, new LogicalExpression.De(this)); @@ -145,4 +148,9 @@ public final class DrillConfig extends NestedConfig{ public ObjectMapper getMapper(){ return mapper; } + + @Override + public String toString(){ + return this.root().render(); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java index b8d6133..6954707 100644 --- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java +++ 
b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java @@ -17,18 +17,12 @@ ******************************************************************************/ package org.apache.drill.common.config; +import com.typesafe.config.*; + import java.util.List; import java.util.Map.Entry; import java.util.Set; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigList; -import com.typesafe.config.ConfigMergeable; -import com.typesafe.config.ConfigObject; -import com.typesafe.config.ConfigOrigin; -import com.typesafe.config.ConfigResolveOptions; -import com.typesafe.config.ConfigValue; - abstract class NestedConfig implements Config { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedConfig.class); @@ -187,15 +181,15 @@ abstract class NestedConfig implements Config { } public Config atPath(String path) { - return null; + return c.atPath(path); } public Config atKey(String key) { - return null; + return c.atKey(key); } public Config withValue(String path, ConfigValue value) { - return null; + return c.withValue(path, value); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java index cc73799..b7477a3 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java @@ -65,9 +65,8 @@ public class HazelCache implements DistributedCache { public void run(DrillbitEndpoint endpoint) { Config c = new Config(); - // todo, utilize cluster member ship to set up other nodes. 
c.setInstanceName(instanceName); - instance = Hazelcast.newHazelcastInstance(c); + instance = getInstanceOrCreateNew(c); workQueueLengths = instance.getTopic("queue-length"); optimizedPlans = instance.getMap("plan-optimizations"); this.endpoint = endpoint; @@ -75,6 +74,14 @@ public class HazelCache implements DistributedCache { workQueueLengths.addMessageListener(new Listener()); } + private HazelcastInstance getInstanceOrCreateNew(Config c) { + for (HazelcastInstance instance : Hazelcast.getAllHazelcastInstances()){ + if (instance.getName().equals(this.instanceName)) + return instance; + } + return Hazelcast.newHazelcastInstance(c); + } + @Override public void saveOptimizedPlan(TemplatizedLogicalPlan logical, TemplatizedPhysicalPlan physical) { optimizedPlans.put(logical, physical); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java new file mode 100644 index 0000000..e0f3347 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -0,0 +1,100 @@ +package org.apache.drill.exec.client; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.nio.NioEventLoopGroup; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.ZKClusterCoordinator; +import org.apache.drill.exec.rpc.DrillRpcFuture; +import org.apache.drill.exec.rpc.NamedThreadFactory; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.user.UserClient; + +import java.io.IOException; +import 
java.util.Collection; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.Iterables.get; +import static io.netty.buffer.Unpooled.EMPTY_BUFFER; +import static io.netty.buffer.Unpooled.copiedBuffer; +import static java.nio.charset.Charset.forName; +import static org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import static org.apache.drill.exec.proto.UserProtos.QueryHandle; +import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL; +import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder; + +/** + * Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf + */ +public class DrillClient { + + DrillConfig config; + private UserClient client; + private ClusterCoordinator clusterCoordinator; + + public DrillClient() { + this(DrillConfig.create()); + } + + public DrillClient(String fileName) { + this(DrillConfig.create(fileName)); + } + + public DrillClient(DrillConfig config) { + this.config = config; + } + + /** + * Connects the client to a Drillbit server + * + * @throws IOException + */ + public void connect() throws Exception { + this.clusterCoordinator = new ZKClusterCoordinator(this.config); + this.clusterCoordinator.start(); + Thread.sleep(10000); + Collection<DrillbitEndpoint> endpoints = clusterCoordinator.getAvailableEndpoints(); + checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found"); + // just use the first endpoint for now + DrillbitEndpoint endpoint = get(endpoints, 0); + ByteBufAllocator bb = new PooledByteBufAllocator(true); + this.client = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-"))); + try { + this.client.connectAsClient(endpoint.getAddress(), endpoint.getUserPort()); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + /** + * Closes this client's connection to the server + * + * @throws IOException + */ + public void close() 
throws IOException { + this.client.close(); + } + + /** + * Submits a Logical plan for direct execution (bypasses parsing) + * + * @param plan the plan to execute + * @return a handle for the query result + * @throws RpcException + */ + public DrillRpcFuture<QueryHandle> submitPlan(String plan) throws RpcException { + return this.client.submitQuery(newBuilder().setMode(STREAM_FULL).setPlan(plan).build(), EMPTY_BUFFER); + } + + /** + * Submits a Query for parsing and execution + * + * @param query the query to execute + * @return a handle for the query result + * @throws RpcException + */ + public DrillRpcFuture<QueryHandle> submitQuery(String query) throws RpcException { + return this.client.submitQuery(newBuilder().setMode(STREAM_FULL).build(), copiedBuffer(query, forName("UTF-8"))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java index 9c7eab2..d7ea8fa 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java @@ -17,11 +17,11 @@ ******************************************************************************/ package org.apache.drill.exec.coord; -import java.io.Closeable; -import java.util.List; - import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import java.io.Closeable; +import java.util.Collection; + /** * Pluggable interface built to manage cluster coordination. 
Allows Drillbit or DrillClient to register its capabilities * as well as understand other node's existence and capabilities. @@ -36,10 +36,12 @@ public abstract class ClusterCoordinator implements Closeable { public abstract void unregister(RegistrationHandle handle); /** - * Get a list of avialable Drillbit endpoints. Thread-safe. Could be slightly out of date depending on refresh policy. - * @return A list of available endpoints. + * Get a collection of avialable Drillbit endpoints, Thread-safe. + * Could be slightly out of date depending on refresh policy. + * + * @return A collection of available endpoints. */ - public abstract List<DrillbitEndpoint> getAvailableEndpoints(); + public abstract Collection<DrillbitEndpoint> getAvailableEndpoints(); public interface RegistrationHandle { } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java index b3cd27f..3ad08e1 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -17,17 +17,7 @@ ******************************************************************************/ package org.apache.drill.exec.coord; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; - -import com.google.common.base.Throwables; +import com.google.common.base.Function; import com.netflix.curator.RetryPolicy; import com.netflix.curator.framework.CuratorFramework; import com.netflix.curator.framework.CuratorFrameworkFactory; @@ -38,8 +28,20 @@ import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder; import com.netflix.curator.x.discovery.ServiceInstance; import com.netflix.curator.x.discovery.details.ServiceCache; import com.netflix.curator.x.discovery.details.ServiceCacheListener; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; -/** Manages cluster coordination utilizing zookeeper. **/ +import static com.google.common.base.Throwables.propagate; +import static com.google.common.collect.Collections2.transform; + +/** + * Manages cluster coordination utilizing zookeeper. 
* + */ public class ZKClusterCoordinator extends ClusterCoordinator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZKClusterCoordinator.class); @@ -47,24 +49,26 @@ public class ZKClusterCoordinator extends ClusterCoordinator { private CuratorFramework curator; private ServiceDiscovery<DrillbitEndpoint> discovery; private ServiceCache<DrillbitEndpoint> serviceCache; - private volatile List<DrillbitEndpoint> endpoints = Collections.emptyList(); + private volatile Collection<DrillbitEndpoint> endpoints = Collections.emptyList(); private final String serviceName; + public ZKClusterCoordinator(DrillConfig config) throws IOException { - + this.basePath = config.getString(ExecConstants.ZK_ROOT); - this.serviceName = config.getString(ExecConstants.SERVICE_NAME); - + this.serviceName = config.getString(ExecConstants.SERVICE_NAME); RetryPolicy rp = new RetryNTimes(config.getInt(ExecConstants.ZK_RETRY_TIMES), - config.getInt(ExecConstants.ZK_RETRY_DELAY)); - + config.getInt(ExecConstants.ZK_RETRY_DELAY)); curator = CuratorFrameworkFactory.builder() - .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT)) - .retryPolicy(rp) - .connectString(config.getString(ExecConstants.ZK_CONNECTION)) - .build(); - + .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT)) + .retryPolicy(rp) + .connectString(config.getString(ExecConstants.ZK_CONNECTION)) + .build(); discovery = getDiscovery(); - serviceCache = discovery.serviceCacheBuilder().name(serviceName).refreshPaddingMs(config.getInt(ExecConstants.ZK_REFRESH)).build(); + serviceCache = discovery. 
+ serviceCacheBuilder() + .name(serviceName) + .refreshPaddingMs(config.getInt(ExecConstants.ZK_REFRESH)) + .build(); } public void start() throws Exception { @@ -73,10 +77,11 @@ public class ZKClusterCoordinator extends ClusterCoordinator { discovery.start(); serviceCache.start(); serviceCache.addListener(new ZKListener()); + updateEndpoints(); } - - private class ZKListener implements ServiceCacheListener{ - + + private class ZKListener implements ServiceCacheListener { + @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { } @@ -84,62 +89,78 @@ public class ZKClusterCoordinator extends ClusterCoordinator { @Override public void cacheChanged() { logger.debug("Cache changed, updating."); - try { - Collection<ServiceInstance<DrillbitEndpoint>> instances = discovery.queryForInstances(serviceName); - List<DrillbitEndpoint> newEndpoints = new ArrayList<DrillbitEndpoint>(instances.size()); - for(ServiceInstance<DrillbitEndpoint> si : instances){ - newEndpoints.add(si.getPayload()); - } - endpoints = newEndpoints; - } catch (Exception e) { - logger.error("Failure while update Drillbit service location cache.", e); - } + updateEndpoints(); } } - public void close() throws IOException{ + public void close() throws IOException { serviceCache.close(); discovery.close(); curator.close(); } - + @Override public RegistrationHandle register(DrillbitEndpoint data) { try { - ServiceInstance<DrillbitEndpoint> si = getSI(data); - discovery.registerService(si); - return new ZKRegistrationHandle(si.getId()); + ServiceInstance<DrillbitEndpoint> serviceInstance = getServiceInstance(data); + discovery.registerService(serviceInstance); + return new ZKRegistrationHandle(serviceInstance.getId()); } catch (Exception e) { - Throwables.propagate(e); - return null; + throw propagate(e); } } @Override public void unregister(RegistrationHandle handle) { - if( !( handle instanceof ZKRegistrationHandle)) throw new UnsupportedOperationException("Unknown handle 
type"); - + if (!(handle instanceof ZKRegistrationHandle)) throw new UnsupportedOperationException("Unknown handle type"); + ZKRegistrationHandle h = (ZKRegistrationHandle) handle; try { - ServiceInstance<DrillbitEndpoint> si = ServiceInstance.<DrillbitEndpoint>builder().address("").port(0).id(h.id).name(ExecConstants.SERVICE_NAME).build(); - discovery.unregisterService(si); + ServiceInstance<DrillbitEndpoint> serviceInstance = ServiceInstance.<DrillbitEndpoint>builder() + .address("") + .port(0) + .id(h.id) + .name(serviceName) + .build(); + discovery.unregisterService(serviceInstance); } catch (Exception e) { - Throwables.propagate(e); + propagate(e); } } @Override - public List<DrillbitEndpoint> getAvailableEndpoints() { + public Collection<DrillbitEndpoint> getAvailableEndpoints() { return this.endpoints; } - - private ServiceInstance<DrillbitEndpoint> getSI(DrillbitEndpoint ep) throws Exception{ - return ServiceInstance.<DrillbitEndpoint>builder().name(ExecConstants.SERVICE_NAME).payload(ep).build(); + + private void updateEndpoints() { + try { + endpoints = transform(discovery.queryForInstances(serviceName), + new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() { + @Override + public DrillbitEndpoint apply(ServiceInstance<DrillbitEndpoint> input) { + return input.getPayload(); + } + }); + } catch (Exception e) { + logger.error("Failure while update Drillbit service location cache.", e); + } } - - + + private ServiceInstance<DrillbitEndpoint> getServiceInstance(DrillbitEndpoint endpoint) throws Exception { + return ServiceInstance.<DrillbitEndpoint>builder() + .name(serviceName) + .payload(endpoint) + .build(); + } + public ServiceDiscovery<DrillbitEndpoint> getDiscovery() { - return ServiceDiscoveryBuilder.builder(DrillbitEndpoint.class).basePath(basePath).client(curator).serializer(DrillServiceInstanceHelper.SERIALIZER).build(); + return ServiceDiscoveryBuilder + .builder(DrillbitEndpoint.class) + .basePath(basePath) + .client(curator) + 
.serializer(DrillServiceInstanceHelper.SERIALIZER) + .build(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 99ebe85..2961fae 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -32,20 +32,24 @@ import org.apache.drill.exec.service.ServiceEngine; import java.net.InetAddress; +/** + * Starts, tracks and stops all the required services for a Drillbit daemon to work. + */ public class Drillbit { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class); public static Drillbit start(StartupOptions options) throws DrillbitStartupException { - Drillbit bit = null; + return start(DrillConfig.create(options.getConfigLocation())); + } + + public static Drillbit start(DrillConfig config) throws DrillbitStartupException { + Drillbit bit; try { logger.debug("Setting up Drillbit."); - DrillConfig config = DrillConfig.create(options.getConfigLocation()); bit = new Drillbit(config); } catch (Exception ex) { throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex); } - - try { logger.debug("Starting Drillbit."); bit.run(); @@ -80,9 +84,11 @@ public class Drillbit { public void run() throws Exception { coord.start(); engine.start(); - - DrillbitEndpoint md = DrillbitEndpoint.newBuilder().setAddress(InetAddress.getLocalHost().getHostAddress()) - .setBitPort(engine.getBitPort()).setUserPort(engine.getUserPort()).build(); + DrillbitEndpoint md = DrillbitEndpoint.newBuilder() + 
.setAddress(InetAddress.getLocalHost().getHostAddress()) + .setBitPort(engine.getBitPort()) + .setUserPort(engine.getUserPort()) + .build(); handle = coord.register(md); cache.run(md); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java index 94c8207..e3a24d2 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.server; import io.netty.channel.nio.NioEventLoopGroup; +import java.util.Collection; import java.util.List; import org.apache.drill.common.config.DrillConfig; @@ -45,7 +46,7 @@ public class DrillbitContext { return config; } - public List<DrillbitEndpoint> getBits(){ + public Collection<DrillbitEndpoint> getBits(){ return underlyingBit.coord.getAvailableEndpoints(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf index ad18d6e..c516dda 100644 --- a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf +++ b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf @@ -4,6 +4,7 @@ drill.exec: { cluster-id: "drillbits1" rpc: { + user.address : localhost user.port : 31010, bit.port : 31011 }, 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java index 7f5264c..645c4d5 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java @@ -18,27 +18,66 @@ package org.apache.drill.exec; import com.google.common.collect.ImmutableList; +import com.typesafe.config.ConfigValueFactory; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.server.Drillbit; -import org.apache.drill.exec.server.StartupOptions; +import org.apache.drill.exec.util.MiniZooKeeperCluster; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import java.io.File; +import java.io.IOException; import java.util.List; import static com.google.common.base.Throwables.propagate; /** * Base class for Drill system tests. - * Starts one or more Drillbits and provides a configured client for testing. + * Starts one or more Drillbits, an embedded ZooKeeper cluster and provides a configured client for testing. 
*/ public class DrillSystemTestBase { - private static List<Drillbit> servers; + static final Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class); - public void startCluster(StartupOptions options, int numServers) { + private static File testDir = new File("target/test-data"); + private static DrillConfig config; + private static String zkUrl; + private static int bitPort; + private static int userPort; + + private List<Drillbit> servers; + private MiniZooKeeperCluster zkCluster; + + @BeforeClass + public static void setUp() throws Exception { + config = DrillConfig.create(); + bitPort = config.getInt(ExecConstants.INITIAL_BIT_PORT); + userPort = config.getInt(ExecConstants.INITIAL_USER_PORT); + zkUrl = config.getString(ExecConstants.ZK_CONNECTION); + setupTestDir(); + } + + private static void setupTestDir() { + if (!testDir.exists()) { + testDir.mkdirs(); + } + } + + private DrillConfig newConfigWithDifferentPorts() { + return new DrillConfig(config + .withValue(ExecConstants.INITIAL_BIT_PORT, ConfigValueFactory.fromAnyRef(bitPort++)) + .withValue(ExecConstants.INITIAL_USER_PORT, ConfigValueFactory.fromAnyRef(userPort++))); + } + + public void startCluster(int numServers) { try { ImmutableList.Builder<Drillbit> servers = ImmutableList.builder(); for (int i = 0; i < numServers; i++) { - servers.add(Drillbit.start(options)); + DrillConfig config = newConfigWithDifferentPorts(); +// System.out.println("NEW CONFIG"); +// System.out.println(config); + servers.add(Drillbit.start(config)); } this.servers = servers.build(); } catch (DrillbitStartupException e) { @@ -46,16 +85,36 @@ public class DrillSystemTestBase { } } - public void startZookeeper() { - + public void startZookeeper(int numServers) { + try { + this.zkCluster = new MiniZooKeeperCluster(); + zkCluster.setDefaultClientPort(Integer.parseInt(this.zkUrl.split(":")[1])); + zkCluster.startup(testDir, numServers); + } catch (IOException e) { + propagate(e); + } catch (InterruptedException e) { + 
propagate(e); + } } public void stopCluster() { - + if (servers != null) { + for (Drillbit server : servers) { + try { + server.close(); + } catch (Exception e) { + logger.warn("Error shutting down Drillbit", e); + } + } + } } public void stopZookeeper() { - + try { + this.zkCluster.shutdown(); + } catch (IOException e) { + propagate(e); + } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java index a8ac41e..09a06d7 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java @@ -1,7 +1,12 @@ package org.apache.drill.exec.client; +import com.google.common.base.Charsets; +import com.google.common.io.Resources; import org.apache.drill.exec.DrillSystemTestBase; -import org.apache.drill.exec.server.StartupOptions; +import org.apache.drill.exec.proto.UserProtos; +import org.apache.drill.exec.rpc.DrillRpcFuture; +import org.junit.After; +import org.junit.BeforeClass; import org.junit.Test; /** @@ -9,12 +14,39 @@ import org.junit.Test; */ public class DrillClientSystemTest extends DrillSystemTestBase { - StartupOptions options = new StartupOptions(); + private static String plan; - @Test - public void testSubmitQuery() { - startCluster(options, 1); + @BeforeClass + public static void setUp() throws Exception { + DrillSystemTestBase.setUp(); + plan = Resources.toString(Resources.getResource("simple_plan.json"), Charsets.UTF_8); + } + @After + public void tearDown() { + stopCluster(); + stopZookeeper(); + 
} + @Test + public void testSubmitPlanSingleNode() throws Exception { + startZookeeper(1); + startCluster(1); + DrillClient client = new DrillClient(); + client.connect(); + DrillRpcFuture<UserProtos.QueryHandle> result = client.submitPlan(plan); + System.out.println(result.get()); + client.close(); + } + + @Test + public void testSubmitPlanTwoNodes() throws Exception { + startZookeeper(1); + startCluster(2); + DrillClient client = new DrillClient(); + client.connect(); + DrillRpcFuture<UserProtos.QueryHandle> result = client.submitPlan(plan); + System.out.println(result.get()); + client.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java new file mode 100644 index 0000000..05011ac --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java @@ -0,0 +1,368 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.util;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;

import java.io.*;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Starts one or more in-process ZooKeeper servers for tests.
 *
 * <p>Mostly copied from HBase's MiniZooKeeperCluster. Each server is created
 * with its own data directory and its own connection factory; the first one
 * started is treated as the "active" server and the rest as backups.
 *
 * <p>NOTE(review): directory cleanup still goes through Hadoop's
 * {@code FileUtil}, so the Hadoop jar is still needed on the test classpath.
 *
 * <p>Instances keep plain mutable state without synchronization; they are
 * presumably meant to be driven from a single test thread.
 */
public class MiniZooKeeperCluster {
  private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);

  /** Tick time used when none was set through {@link #setTickTime(int)}. */
  private static final int TICK_TIME = 2000;
  /** How long (ms) to wait for a server to come up or go down. */
  private static final int CONNECTION_TIMEOUT = 30000;
  /** Second argument handed to {@code NIOServerCnxnFactory.configure}. */
  private static final int MAX_CLIENT_CONNECTIONS = 1000;
  /** Pause (ms) between two probes of a server port. */
  private static final long POLL_INTERVAL_MS = 250;
  /** The "stat" probe is pure ASCII; do not depend on the platform charset. */
  private static final java.nio.charset.Charset ASCII =
      java.nio.charset.Charset.forName("US-ASCII");

  private boolean started;

  /**
   * The default port. If zero, we use a random port.
   */
  private int defaultClientPort = 0;

  /** Client port of the currently active server. */
  private int clientPort;

  // The three lists are index-aligned: entry i of each describes server i.
  private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
  private final List<ZooKeeperServer> zooKeeperServers;
  private final List<Integer> clientPortList;

  /** Index of the active server in the lists, or -1 when there is none. */
  private int activeZKServerIndex;
  private int tickTime = 0;

  /** Creates a cluster handle; no server is started until {@code startup}. */
  public MiniZooKeeperCluster() {
    this.started = false;
    activeZKServerIndex = -1;
    zooKeeperServers = new ArrayList<ZooKeeperServer>();
    clientPortList = new ArrayList<Integer>();
    standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
  }

  /**
   * Sets the port the first server should try to bind to.
   *
   * @param clientPort a strictly positive port number
   * @throws IllegalArgumentException if {@code clientPort} is not positive
   */
  public void setDefaultClientPort(int clientPort) {
    if (clientPort <= 0) {
      throw new IllegalArgumentException("Invalid default ZK client port: "
          + clientPort);
    }
    this.defaultClientPort = clientPort;
  }

  /**
   * Selects a ZK client port. Returns the default port if specified.
   * Otherwise, returns a random port in the range 49152 (0xc000) to 65279,
   * which lies inside the dynamic/private port range (49152 to 65535) that
   * cannot be registered with IANA (see http://bit.ly/dynports).
   */
  private int selectClientPort() {
    if (defaultClientPort > 0) {
      return defaultClientPort;
    }
    return 0xc000 + new Random().nextInt(0x3f00);
  }

  /**
   * Sets the tick time passed to each server; values &lt;= 0 mean
   * "use the built-in default".
   */
  public void setTickTime(int tickTime) {
    this.tickTime = tickTime;
  }

  /**
   * @return number of registered servers minus one (so -1 when no server is
   *         registered)
   */
  public int getBackupZooKeeperServerNum() {
    return zooKeeperServers.size() - 1;
  }

  /** @return number of servers currently registered with this cluster. */
  public int getZooKeeperServerNum() {
    return zooKeeperServers.size();
  }

  // XXX: From o.a.zk.t.ClientBase
  private static void setupTestEnv() {
    // during the tests we run with 100K prealloc in the logs.
    // on windows systems prealloc of 64M was seen to take ~15seconds
    // resulting in test failure (client timeout on first session).
    // set env and directly in order to handle static init/gc issues
    System.setProperty("zookeeper.preAllocSize", "100");
    FileTxnLog.setPreallocSize(100 * 1024);
  }

  /**
   * Starts a single server under {@code baseDir}.
   *
   * @see #startup(File, int)
   */
  public int startup(File baseDir) throws IOException, InterruptedException {
    return startup(baseDir, 1);
  }

  /**
   * Shuts down anything this instance already started, then starts
   * {@code numZooKeeperServers} servers, each in its own
   * {@code zookeeper_<i>} directory under {@code baseDir}.
   *
   * <p>If a server fails to come up, its connection factory is shut down and
   * the servers that did start stay registered, so a later
   * {@link #shutdown()} stops them.
   *
   * @param baseDir parent directory for the per-server data directories
   * @param numZooKeeperServers number of servers to start
   * @return client port of the first (active) server, or -1 if
   *         {@code numZooKeeperServers} is not positive
   * @throws IOException if a directory cannot be created or a server does
   *         not answer within the connection timeout
   * @throws InterruptedException propagated from the ZooKeeper factory
   */
  public int startup(File baseDir, int numZooKeeperServers) throws IOException,
      InterruptedException {
    if (numZooKeeperServers <= 0) {
      return -1;
    }

    setupTestEnv();
    shutdown();

    int tentativePort = selectClientPort();
    int tickTimeToUse = this.tickTime > 0 ? this.tickTime : TICK_TIME;

    // running all the ZK servers
    for (int i = 0; i < numZooKeeperServers; i++) {
      File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
      recreateDir(dir);
      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
      NIOServerCnxnFactory standaloneServerFactory;
      while (true) {
        try {
          standaloneServerFactory = new NIOServerCnxnFactory();
          standaloneServerFactory.configure(
              new InetSocketAddress(tentativePort), MAX_CLIENT_CONNECTIONS);
        } catch (BindException e) {
          LOG.debug("Failed binding ZK Server to client port: "
              + tentativePort);
          // This port is already in use, try to use another.
          tentativePort++;
          continue;
        }
        break;
      }

      // Start up this ZK server
      standaloneServerFactory.startup(server);
      if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
        // Not registered in the lists yet, so shutdown() would never reach
        // it: release it here instead of leaking the bound port.
        standaloneServerFactory.shutdown();
        throw new IOException("Waiting for startup of standalone server");
      }

      // We have selected this port as a client port.
      clientPortList.add(tentativePort);
      standaloneServerFactoryList.add(standaloneServerFactory);
      zooKeeperServers.add(server);
      // Mark as started as soon as one server is registered so that
      // shutdown() cleans up even when a later server fails to start.
      started = true;
    }

    // set the first one to be active ZK; Others are backups
    activeZKServerIndex = 0;
    clientPort = clientPortList.get(activeZKServerIndex);
    LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
        "on client port: " + clientPort);
    return clientPort;
  }

  /**
   * Deletes {@code dir} if it exists and creates it again, empty.
   *
   * @throws IOException if the directory cannot be created
   */
  private void recreateDir(File dir) throws IOException {
    if (dir.exists()) {
      FileUtil.fullyDelete(dir);
    }
    try {
      // mkdirs() reports failure through its return value, not an exception.
      if (!dir.mkdirs() && !dir.isDirectory()) {
        throw new IOException("creating dir: " + dir);
      }
    } catch (SecurityException e) {
      throw new IOException("creating dir: " + dir, e);
    }
  }

  /**
   * Stops every registered server and forgets about them. Does nothing if
   * the cluster was never started.
   *
   * @throws IOException if a server is still answering on its port after the
   *         connection timeout; in that case the internal state is left
   *         untouched
   */
  public void shutdown() throws IOException {
    if (!started) {
      return;
    }
    // shut down all the zk servers
    for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
      NIOServerCnxnFactory standaloneServerFactory =
          standaloneServerFactoryList.get(i);
      int port = clientPortList.get(i);

      standaloneServerFactory.shutdown();
      if (!waitForServerDown(port, CONNECTION_TIMEOUT)) {
        throw new IOException("Waiting for shutdown of standalone server");
      }
    }

    // clear everything
    started = false;
    // Same "no active server" sentinel the constructor uses.
    activeZKServerIndex = -1;
    standaloneServerFactoryList.clear();
    clientPortList.clear();
    zooKeeperServers.clear();

    LOG.info("Shutdown MiniZK cluster with all ZK servers");
  }

  /**
   * Stops the active server and promotes the next registered server, if any.
   * After a successful promotion {@link #getClientPort()} returns the new
   * active server's port.
   *
   * @return client port of the newly active server, or -1 if the cluster is
   *         not started or no backup is left
   * @throws IOException if the killed server is still answering after the
   *         connection timeout
   * @throws InterruptedException declared for API compatibility
   */
  public int killCurrentActiveZooKeeperServer() throws IOException,
      InterruptedException {
    if (!started || activeZKServerIndex < 0) {
      return -1;
    }

    // Shutdown the current active one
    NIOServerCnxnFactory standaloneServerFactory =
        standaloneServerFactoryList.get(activeZKServerIndex);
    int killedPort = clientPortList.get(activeZKServerIndex);

    standaloneServerFactory.shutdown();
    if (!waitForServerDown(killedPort, CONNECTION_TIMEOUT)) {
      throw new IOException("Waiting for shutdown of standalone server");
    }

    // remove the current active zk server; the next entry slides into the
    // same index and thereby becomes the active one
    standaloneServerFactoryList.remove(activeZKServerIndex);
    clientPortList.remove(activeZKServerIndex);
    zooKeeperServers.remove(activeZKServerIndex);
    LOG.info("Kill the current active ZK servers in the cluster " +
        "on client port: " + killedPort);

    if (standaloneServerFactoryList.size() == 0) {
      // there is no backup servers;
      return -1;
    }
    // Update the field (not a shadowing local) so getClientPort() follows
    // the failover.
    clientPort = clientPortList.get(activeZKServerIndex);
    LOG.info("Activate a backup zk server in the cluster " +
        "on client port: " + clientPort);
    // return the next backup zk server's port
    return clientPort;
  }

  /**
   * Stops the backup server registered right after the active one. Does
   * nothing if the cluster is not started or has no backup.
   *
   * @throws IOException if the backup is still answering after the
   *         connection timeout
   * @throws InterruptedException declared for API compatibility
   */
  public void killOneBackupZooKeeperServer() throws IOException,
      InterruptedException {
    if (!started || activeZKServerIndex < 0 ||
        standaloneServerFactoryList.size() <= 1) {
      return;
    }

    int backupZKServerIndex = activeZKServerIndex + 1;
    // Shutdown the first backup (the active server is left running)
    NIOServerCnxnFactory standaloneServerFactory =
        standaloneServerFactoryList.get(backupZKServerIndex);
    int backupPort = clientPortList.get(backupZKServerIndex);

    standaloneServerFactory.shutdown();
    if (!waitForServerDown(backupPort, CONNECTION_TIMEOUT)) {
      throw new IOException("Waiting for shutdown of standalone server");
    }

    // remove this backup zk server
    standaloneServerFactoryList.remove(backupZKServerIndex);
    clientPortList.remove(backupZKServerIndex);
    zooKeeperServers.remove(backupZKServerIndex);
    LOG.info("Kill one backup ZK servers in the cluster " +
        "on client port: " + backupPort);
  }

  /**
   * Sleeps for one poll interval.
   *
   * @return {@code true} if the sleep was interrupted
   */
  private static boolean pauseBetweenProbes() {
    try {
      Thread.sleep(POLL_INTERVAL_MS);
      return false;
    } catch (InterruptedException e) {
      return true;
    }
  }

  /**
   * Polls {@code localhost:port} until a connection attempt fails, which is
   * taken to mean the server is gone.
   *
   * @return {@code true} once the port stops accepting the probe,
   *         {@code false} if it is still reachable after {@code timeout} ms
   */
  // XXX: From o.a.zk.t.ClientBase
  private static boolean waitForServerDown(int port, long timeout) {
    long start = System.currentTimeMillis();
    boolean interrupted = false;
    try {
      while (true) {
        try {
          Socket sock = new Socket("localhost", port);
          try {
            OutputStream outstream = sock.getOutputStream();
            outstream.write("stat".getBytes(ASCII));
            outstream.flush();
          } finally {
            sock.close();
          }
        } catch (IOException e) {
          return true;
        }

        if (System.currentTimeMillis() > start + timeout) {
          return false;
        }
        interrupted |= pauseBetweenProbes();
      }
    } finally {
      // Keep waiting through interrupts (as before), but do not lose the
      // interrupt: hand it back to the caller's thread on the way out.
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Polls {@code localhost:port} with the "stat" command until the reply
   * starts with "Zookeeper version:".
   *
   * @return {@code true} once the server answers, {@code false} if it has
   *         not done so after {@code timeout} ms
   */
  // XXX: From o.a.zk.t.ClientBase
  private static boolean waitForServerUp(int port, long timeout) {
    long start = System.currentTimeMillis();
    boolean interrupted = false;
    try {
      while (true) {
        try {
          Socket sock = new Socket("localhost", port);
          try {
            OutputStream outstream = sock.getOutputStream();
            outstream.write("stat".getBytes(ASCII));
            outstream.flush();

            BufferedReader reader = new BufferedReader(
                new InputStreamReader(sock.getInputStream(), ASCII));
            String line = reader.readLine();
            if (line != null && line.startsWith("Zookeeper version:")) {
              return true;
            }
          } finally {
            // Closing the socket also closes the streams obtained from it.
            sock.close();
          }
        } catch (IOException e) {
          // ignore as this is expected
          LOG.info("server localhost:" + port + " not up " + e);
        }

        if (System.currentTimeMillis() > start + timeout) {
          return false;
        }
        interrupted |= pauseBetweenProbes();
      }
    } finally {
      // See waitForServerDown: restore the interrupt status we swallowed.
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /** @return client port of the currently active server. */
  public int getClientPort() {
    return clientPort;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf new file mode 100644 index 0000000..7c97f66 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf @@ -0,0 +1,28 @@ +// This file tells Drill to consider this module when class path scanning. +// This file can also include any supplementary configuration information. +// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. +drill.exec: { + cluster-id: "drillbits1" + rpc: { + user.port : 31010, + bit.port : 32010 + }, + optimizer: { + implementation: "org.apache.drill.exec.opt.IdentityOptimizer" + }, + + zk: { + connect: "localhost:2181", + root: "/drill", + refresh: 500, + timeout: 1000, + retry: { + count: 7200, + delay: 500 + } + } + + network: { + start: 35000 + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/resources/simple_plan.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/simple_plan.json b/sandbox/prototype/exec/java-exec/src/test/resources/simple_plan.json new file mode 100644 index 0000000..2457b1f --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/resources/simple_plan.json @@ -0,0 +1,133 @@ +{ + head:{ + type:"apache_drill_logical_plan", + version:"1", + generator:{ + type:"manual", + info:"na" + } + }, + storage:{ + logs: { + type:"text", + file: "local://logs/*.log", + compress:"gzip", + line-delimiter:"\n", + record-maker:{ + type:"first-row", + delimiter:"," + } + }, + { + type:"mongo", + 
name:"users", + connection:"mongodb://blue:red@localhost/users" + }, + { + type:"mysql", + name:"mysql", + connection:"jdbc:mysql://localhost/main" + } + ], + query:[ + { + @id:"1", + op:"scan", + memo:"initial_scan", + storageengine:"local-logs", + selection: {} + }, + { + @id:"2", + input:"1", + memo:"transform1", + op:"transform", + transforms:[ + { + ref:"userId", + expr:"regex_like('activity.cookie', \"persistent=([^;]*)\")" + }, + { + ref:"session", + expr:"regex_like('activity.cookie', \"session=([^;]*)\")" + } + ] + }, + { + @id:"3", + input:"2", + memo:"transform2", + op:"transform", + transforms:[ + { + ref:"userId", + expr:"regex_like('activity.cookie', \"persistent=([^;]*)\")" + }, + { + ref:"session", + expr:"regex_like('activity.cookie', \"session=([^;]*)\")" + } + ] + }, + { + @id:"7", + input:"3", + op:"sequence", + do:[ + { + op:"transform", + memo:"seq_transform", + transforms:[ + { + ref:"happy", + expr:"regex_like('ep2', \"dink\")" + } + ] + } + , + { + op:"transform", + memo:"last_transform", + transforms:[ + { + ref:"abc", + expr:"123" + } + ] + } + ] + }, + { + @id:"10", + input:"3", + op:"transform", + memo:"t3", + transforms:[ + { + ref:"happy", + expr:"regex_like('ep2', \"dink\")" + } + ] + }, + { + @id:12, + op:"join", + type: "inner", + left:"7", + right:"10", + conditions: [{relationship:"==", left: "1", right: "1" }] + } + , + { + input: 12, + op: "store", + memo: "output sink", + target: { + file: "console:///stdout" + } + + } + + + ] +} \ No newline at end of file
