CAMEL-10327, new component apache-drill
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/195736c4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/195736c4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/195736c4 Branch: refs/heads/master Commit: 195736c46b6156836ffedd4fe39d8e5e30d28057 Parents: 443b073 Author: Fabrizio Spataro <fabrizio.spat...@bizmate.it> Authored: Fri Oct 14 11:19:33 2016 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Mon Oct 17 13:11:28 2016 +0200 ---------------------------------------------------------------------- components-starter/pom.xml | 1 + components/camel-drill/pom.xml | 10 ++- .../src/main/docs/drill-component.adoc | 21 +++--- .../camel/component/drill/DrillComponent.java | 18 ++++- .../camel/component/drill/DrillConstants.java | 2 +- .../camel/component/drill/DrillEndpoint.java | 70 ++++++++++++-------- .../camel/component/drill/DrillProducer.java | 11 +-- .../camel/component/drill/EndpointTest.java | 48 ++++++++++++++ .../camel/component/drill/ProducerTest.java | 18 +++-- 9 files changed, 146 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/195736c4/components-starter/pom.xml ---------------------------------------------------------------------- diff --git a/components-starter/pom.xml b/components-starter/pom.xml index 65b221e..844882a 100644 --- a/components-starter/pom.xml +++ b/components-starter/pom.xml @@ -102,6 +102,7 @@ <module>camel-dns-starter</module> <module>camel-docker-starter</module> <module>camel-dozer-starter</module> + <module>camel-drill-starter</module> <module>camel-dropbox-starter</module> <module>camel-eclipse-starter</module> <module>camel-ehcache-starter</module> http://git-wip-us.apache.org/repos/asf/camel/blob/195736c4/components/camel-drill/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-drill/pom.xml b/components/camel-drill/pom.xml index 634bdb3..1d2b47d 100644 --- a/components/camel-drill/pom.xml +++ b/components/camel-drill/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.camel</groupId> <artifactId>components</artifactId> - <version>2.18.0-SNAPSHOT</version> + <version>2.19.0-SNAPSHOT</version> </parent> <artifactId>camel-drill</artifactId> @@ -42,8 +42,14 @@ </dependency> <dependency> <groupId>org.apache.drill.exec</groupId> - <artifactId>drill-jdbc</artifactId> + <artifactId>drill-jdbc-all</artifactId> <version>${apache-drill-version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.springframework</groupId> http://git-wip-us.apache.org/repos/asf/camel/blob/195736c4/components/camel-drill/src/main/docs/drill-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-drill/src/main/docs/drill-component.adoc b/components/camel-drill/src/main/docs/drill-component.adoc index 1f227b0..6da5f4a 100644 --- a/components/camel-drill/src/main/docs/drill-component.adoc +++ b/components/camel-drill/src/main/docs/drill-component.adoc @@ -27,7 +27,7 @@ URI format [source,java] -------------------------------- -drill://host[:port][?options] +drill://host[?options] -------------------------------- You can append query options to the URI in the following format, @@ -37,7 +37,7 @@ You can append query options to the URI in the following format, Drill Producer ^^^^^^^^^^^^^ -The producer execute *CamelDrillQuery* header and put results into body. +The producer execute query using *CamelDrillQuery* header and put results into body. [[Drill-Options]] Options @@ -50,17 +50,22 @@ The DRILL component has no options. // endpoint options: START -The DRILL component supports 6 endpoint options which are listed below: +The DRILL component supports 5 endpoint options which are listed below: {% raw %} [width="100%",cols="2,1,1m,1m,5",options="header"] |======================================================================= | Name | Group | Default | Java Type | Description -| host | producer | | String | *Required* ZooKeeper host name or IP address. Use local instead of a host name or IP address to connect to the local Drillbit -| port | producer | | Integer | ZooKeeper port number -| clusterId | producer | drillbits1 | String | Cluster ID https://drill.apache.org/docs/using-the-jdbc-driver/determining-the-cluster-id -| directory | producer | /Drill | String | Drill directory in ZooKeeper -| schema | producer | | String | Storage plugin https://drill.apache.org/docs/storage-plugin-registration +| port | producer | 2181 | Integer | Connection port +| clusterId | producer | | String | Cluster ID https://drill.apache.org/docs/using-the-jdbc-driver/determining-the-cluster-id +| directory | producer | | String | Drill directory in ZooKeeper +| mode | producer | zk | String | Connection mode: + + - zk: Zookeeper + + - drillbit: Drillbit direct connection + + https://drill.apache.org/docs/using-the-jdbc-driver/ |======================================================================= {% endraw %} // endpoint options: END http://git-wip-us.apache.org/repos/asf/camel/blob/195736c4/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillComponent.java b/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillComponent.java index d798cb3..dfb0108 100644 --- a/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillComponent.java +++ b/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillComponent.java @@ -18,23 +18,37 @@ package org.apache.camel.component.drill; import java.util.Map; +import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.impl.UriEndpointComponent; /** - * Represents the component that manages {@link DrillEndpoint}. It holds the list - * of named direct endpoints. + * Represents the component that manages {@link DrillEndpoint}. It holds the + * list of named direct endpoints. */ public class DrillComponent extends UriEndpointComponent { + protected enum DrillConnectionMode { + ZK, DRILLBIT + } + public DrillComponent() { super(DrillEndpoint.class); } + public DrillComponent(final CamelContext context) { + super(context, DrillEndpoint.class); + } + @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { DrillEndpoint endpoint = new DrillEndpoint(uri, this); setProperties(endpoint, parameters); + endpoint.setHost(remaining); + + // check mode + DrillConnectionMode.valueOf(endpoint.getMode().toUpperCase()); + return endpoint; } } http://git-wip-us.apache.org/repos/asf/camel/blob/195736c4/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillConstants.java b/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillConstants.java index 39c569f..136b39b 100644 --- a/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillConstants.java +++ b/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillConstants.java @@ -18,7 +18,7 @@ package org.apache.camel.component.drill; public final class DrillConstants { public static final String DRILL_QUERY = "CamelDrillQuery"; - + public static final String DRILL_DRIVER = "org.apache.drill.jdbc.Driver"; private DrillConstants() { http://git-wip-us.apache.org/repos/asf/camel/blob/195736c4/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillEndpoint.java b/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillEndpoint.java index eeb55ec..93c174b 100644 --- a/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillEndpoint.java +++ b/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillEndpoint.java @@ -21,9 +21,12 @@ import java.sql.SQLException; import java.util.List; import java.util.Map; +import oadd.org.apache.commons.lang.StringUtils; + import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.component.drill.DrillComponent.DrillConnectionMode; import org.apache.camel.impl.DefaultPollingEndpoint; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; @@ -33,28 +36,30 @@ import org.springframework.jdbc.core.ColumnMapRowMapper; import org.springframework.jdbc.core.RowMapperResultSetExtractor; - /** - * The drill component gives you the ability to quering into apache drill cluster. + * The drill component gives you the ability to quering into apache drill + * cluster. */ -@UriEndpoint(scheme = "drill", title = "DRILL", syntax = "drill:host:port", producerOnly = true, label = "database,sql") +@UriEndpoint(scheme = "drill", title = "DRILL", syntax = "drill:host", producerOnly = true, label = "database,sql") public class DrillEndpoint extends DefaultPollingEndpoint { - @UriPath(description = "ZooKeeper host name or IP address") @Metadata(required = "true") + @UriPath(description = "Host name or IP address") + @Metadata(required = "true") private String host; - @UriPath(description = "ZooKeeper port number") @Metadata(required = "false", defaultValue = "2181") + @UriParam(description = "Port number") + @Metadata(required = "false", defaultValue = "2181") private Integer port = 2181; - @UriParam(description = "Storage plugin", defaultValue = "") - private String schema = ""; - @UriParam(description = "Drill directory in ZooKeeper", defaultValue = "/Drill") - private String directory = "/Drill"; - @UriParam(defaultValue = "drillbits1") - private String clusterId = "drillbits1"; + @UriParam(description = "Drill directory", defaultValue = "") + private String directory = ""; + @UriParam(defaultValue = "") + private String clusterId = ""; + @UriParam(defaultValue = "zk") + private String mode = "zk"; /** * creates a drill endpoint * - * @param uri the endpoint uri + * @param uri the endpoint uri * @param component the component */ public DrillEndpoint(String uri, DrillComponent component) { @@ -75,10 +80,24 @@ public class DrillEndpoint extends DefaultPollingEndpoint { } public String toJDBCUri() { - //jdbc:drill:zk=<zk name>[:<port>][,<zk name2>[:<port>]...<directory>/<cluster ID>;[schema=<storage plugin>] - return "jdbc:drill:zk=" + host + ":" + port + directory + "/" + clusterId; + String url = "jdbc:drill:"; + if (mode.toUpperCase().equals(DrillConnectionMode.DRILLBIT.name())) { + // TODO JIRA BUG connection mode + url += mode + "=" + host; + } else { + url += mode + "=" + host + ":" + port; + } + + if (StringUtils.isNotBlank(directory)) { + url += "/" + directory; + } + if (StringUtils.isNotBlank(clusterId)) { + url += "/" + clusterId; + } + + return url; } - + @SuppressWarnings("unchecked") public List<?> queryForList(ResultSet rs) throws SQLException { ColumnMapRowMapper rowMapper = new ColumnMapRowMapper(); @@ -86,13 +105,14 @@ public class DrillEndpoint extends DefaultPollingEndpoint { List<Map<String, Object>> data = mapper.extractData(rs); return data; } - + public String getHost() { return host; } - + /** - * ZooKeeper host name or IP address. Use local instead of a host name or IP address to connect to the local Drillbit + * ZooKeeper host name or IP address. Use local instead of a host name or IP + * address to connect to the local Drillbit * * @param host */ @@ -132,7 +152,6 @@ public class DrillEndpoint extends DefaultPollingEndpoint { /** * Cluster ID - * * https://drill.apache.org/docs/using-the-jdbc-driver/#determining-the-cluster-id * * @param clusterId @@ -142,18 +161,17 @@ public class DrillEndpoint extends DefaultPollingEndpoint { } /** - * Storage plugin - * - * https://drill.apache.org/docs/storage-plugin-registration + * Connection mode: zk: Zookeeper drillbit: Drillbit direct connection + * https://drill.apache.org/docs/using-the-jdbc-driver/ * * @return */ - public String getSchema() { - return schema; + public String getMode() { + return mode; } - public void setSchema(String schema) { - this.schema = schema; + public void setMode(String mode) { + this.mode = mode; } } http://git-wip-us.apache.org/repos/asf/camel/blob/195736c4/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillProducer.java b/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillProducer.java index f069e2c..9fff0d7 100644 --- a/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillProducer.java +++ b/components/camel-drill/src/main/java/org/apache/camel/component/drill/DrillProducer.java @@ -24,8 +24,6 @@ import java.sql.Statement; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A drill producer @@ -61,14 +59,14 @@ public class DrillProducer extends DefaultProducer { @Override public void process(final Exchange exchange) throws Exception { final String query = exchange.getIn().getHeader(DrillConstants.DRILL_QUERY, String.class); - + // check query Statement st = null; ResultSet rs = null; try { st = connection.createStatement(); rs = st.executeQuery(query); - + exchange.getIn().setBody(endpoint.queryForList(rs)); } finally { try { @@ -84,6 +82,11 @@ public class DrillProducer extends DefaultProducer { private void createJDBCConnection() throws ClassNotFoundException, SQLException { Class.forName(DrillConstants.DRILL_DRIVER); + + // if(log.isDebugEnabled()) { + log.info("connection url: {}", endpoint.toJDBCUri()); + // } + this.connection = DriverManager.getConnection(endpoint.toJDBCUri()); } } http://git-wip-us.apache.org/repos/asf/camel/blob/195736c4/components/camel-drill/src/test/java/org/apache/camel/component/drill/EndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-drill/src/test/java/org/apache/camel/component/drill/EndpointTest.java b/components/camel-drill/src/test/java/org/apache/camel/component/drill/EndpointTest.java new file mode 100644 index 0000000..ad95a4b --- /dev/null +++ b/components/camel-drill/src/test/java/org/apache/camel/component/drill/EndpointTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.drill; + +import org.apache.camel.Endpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class EndpointTest extends CamelTestSupport { + + private static final String HOST = "my.host.me"; + private static final Integer PORT = 4000; + private static final String DIRECTORY = "directory"; + private static final String CLUSTERID = "clusterId"; + private static final String MODE = "zk"; + + @Test + public void testZKJdbcURL() throws Exception { + Endpoint endpoint = context.getEndpoint("drill://" + HOST + "?port=" + PORT + "&directory=" + DIRECTORY + "&clusterId=" + CLUSTERID + "&mode=" + MODE); + + final String uri = "jdbc:drill:zk=" + HOST + ":" + PORT + "/" + DIRECTORY + "/" + CLUSTERID; + + assertTrue(endpoint instanceof DrillEndpoint); + + assertEquals(HOST, ((DrillEndpoint)endpoint).getHost()); + assertEquals(PORT, ((DrillEndpoint)endpoint).getPort()); + assertEquals(DIRECTORY, ((DrillEndpoint)endpoint).getDirectory()); + assertEquals(CLUSTERID, ((DrillEndpoint)endpoint).getClusterId()); + assertEquals(MODE, ((DrillEndpoint)endpoint).getMode()); + + assertEquals(uri, ((DrillEndpoint)endpoint).toJDBCUri()); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/195736c4/components/camel-drill/src/test/java/org/apache/camel/component/drill/ProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-drill/src/test/java/org/apache/camel/component/drill/ProducerTest.java b/components/camel-drill/src/test/java/org/apache/camel/component/drill/ProducerTest.java index 3cf118f..d46d505 100644 --- a/components/camel-drill/src/test/java/org/apache/camel/component/drill/ProducerTest.java +++ b/components/camel-drill/src/test/java/org/apache/camel/component/drill/ProducerTest.java @@ -19,21 +19,23 @@ package org.apache.camel.component.drill; import java.util.concurrent.TimeUnit; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.drill.DrillComponent.DrillConnectionMode; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Ignore; import org.junit.Test; -//@Ignore("CAMEL-10327: Set host and schema to test drill producer.") +@Ignore("CAMEL-10327: Set host, mode and query to test drill producer (direct connection mode).") public class ProducerTest extends CamelTestSupport { - private final String host = "localhost"; - private final String schema = "mysql"; - + private final String host = "bizzy"; + private final String mode = DrillConnectionMode.DRILLBIT.name().toLowerCase(); + private final String query = "select * from mongo.view.events limit 100"; + @Test public void testProducer() throws Exception { template.sendBody("direct:in", ""); - + MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMinimumMessageCount(1); @@ -44,12 +46,8 @@ public class ProducerTest extends CamelTestSupport { protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() { - from("direct:in") - .to("drill://" + host + "?schema=" + schema) - .log("${body}") - .to("mock:result"); + from("direct:in").setHeader(DrillConstants.DRILL_QUERY, constant(query)).to("drill://" + host + "?mode=" + mode).log("${body}").to("mock:result"); } }; } } -