DRILL-1113: Add configuration in DrillClient to encode complex/repeated types as JSON string
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2dcf8cb8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2dcf8cb8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2dcf8cb8 Branch: refs/heads/master Commit: 2dcf8cb8ab53379f077979ecf3fe38b34994bf7c Parents: 9abffa1 Author: Aditya Kishore <adi...@maprtech.com> Authored: Mon Jul 7 15:03:12 2014 -0700 Committer: Aditya Kishore <adi...@maprtech.com> Committed: Wed Jul 9 11:55:46 2014 -0700 ---------------------------------------------------------------------- .../org/apache/drill/hbase/BaseHBaseTest.java | 27 ------------------ .../drill/hbase/TestHBaseCFAsJSONString.java | 2 +- .../org/apache/drill/exec/ExecConstants.java | 2 +- .../apache/drill/exec/client/DrillClient.java | 8 ++++-- .../apache/drill/exec/rpc/user/UserClient.java | 17 ++---------- .../src/main/resources/drill-module.conf | 4 +++ .../java/org/apache/drill/BaseTestQuery.java | 29 ++++++++++++++++++++ 7 files changed, 42 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java index e6a5474..36cf15b 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java @@ -52,8 +52,6 @@ public class BaseHBaseTest extends BaseTestQuery { @Rule public TestName TEST_NAME = new TestName(); - private int[] columnWidths = new int[] { 8 }; - @Before public void printID() throws Exception { System.out.printf("Running %s#%s\n", getClass().getName(), TEST_NAME.getMethodName()); @@ -80,14 +78,6 @@ public class BaseHBaseTest extends BaseTestQuery { HBaseTestsSuite.tearDownCluster(); } - protected void setColumnWidth(int columnWidth) { - this.columnWidths = new int[] { columnWidth }; - } - - protected void setColumnWidths(int[] columnWidths) { - this.columnWidths = columnWidths; - } - protected String getPlanText(String planFile, String tableName) throws IOException { return Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8) .replaceFirst("\"hbase\\.zookeeper\\.property\\.clientPort\".*:.*\\d+", "\"hbase.zookeeper.property.clientPort\" : " + HBaseTestsSuite.getZookeeperPort()) @@ -111,23 +101,6 @@ public class BaseHBaseTest extends BaseTestQuery { printResultAndVerifyRowCount(results, expectedRowCount); } - protected int printResult(List<QueryResultBatch> results) throws SchemaChangeException { - int rowCount = 0; - RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); - for(QueryResultBatch result : results){ - rowCount += result.getHeader().getRowCount(); - loader.load(result.getHeader().getDef(), result.getData()); - if (loader.getRecordCount() <= 0) { - break; - } - VectorUtil.showVectorAccessibleContent(loader, columnWidths); - loader.clear(); - result.release(); - } - System.out.println("Total record count: " + rowCount); - return rowCount; - } - private void printResultAndVerifyRowCount(List<QueryResultBatch> results, int expectedRowCount) throws SchemaChangeException { int rowCount = printResult(results); if (expectedRowCount != -1) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java index 9cc0356..4b4b648 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java @@ -39,7 +39,7 @@ public class TestHBaseCFAsJSONString extends BaseHBaseTest { } @AfterClass - public static void closeClient() throws IOException { + public static void closeMyClient() throws IOException { if(client != null) client.close(); client = parent_client; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 7681dd5..c2f459e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -75,7 +75,7 @@ public interface ExecConstants { public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write"; public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak"; - + public static final String CLIENT_SUPPORT_COMPLEX_TYPES = "drill.client.supports-complex-types"; public static final String OUTPUT_FORMAT_OPTION = "store.format"; public static final OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet"); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 3a9d015..6690bf5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -71,7 +71,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ private final BufferAllocator allocator; private int reconnectTimes; private int reconnectDelay; - private boolean supportComplexTypes = true; + private boolean supportComplexTypes; private final boolean ownsZkConnection; private final boolean ownsAllocator; @@ -99,6 +99,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ this.clusterCoordinator = coordinator; this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES); this.reconnectDelay = config.getInt(ExecConstants.BIT_RETRY_DELAY); + this.supportComplexTypes = config.getBoolean(ExecConstants.CLIENT_SUPPORT_COMPLEX_TYPES); } public DrillConfig getConfig(){ @@ -161,7 +162,8 @@ public class DrillClient implements Closeable, ConnectionThrottle{ // just use the first endpoint for now DrillbitEndpoint endpoint = endpoints.iterator().next(); - this.client = new UserClient(allocator, TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-")); + this.client = new UserClient(supportComplexTypes, allocator, + TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-")); logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort()); connect(endpoint); connected = true; @@ -193,7 +195,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ private void connect(DrillbitEndpoint endpoint) throws RpcException { FutureHandler f = new FutureHandler(); try { - client.setSupportComplexTypes(supportComplexTypes).connect(f, endpoint, props); + client.connect(f, endpoint, props); f.checkedGet(); } catch (InterruptedException e) { throw new RpcException(e); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index ad885f6..d49a9fd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -46,8 +46,9 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand private boolean supportComplexTypes = true; - public UserClient(BufferAllocator alloc, EventLoopGroup eventLoopGroup) { + public UserClient(boolean supportComplexTypes, BufferAllocator alloc, EventLoopGroup eventLoopGroup) { super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER); + this.supportComplexTypes = supportComplexTypes; } public void submitQuery(UserResultsListener resultsListener, RunQuery query) { @@ -112,18 +113,4 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE); } - /** - * Sets whether the application is willing to accept complex types (Map, Arrays) in the returned result set. - * Default is {@code true}. If set to {@code false}, the complex types are returned as JSON encoded VARCHAR type. - * - * @throws IllegalStateException if called after a connection has been established. - */ - public UserClient setSupportComplexTypes(boolean supportComplexTypes) { - if (isActive()) { - throw new IllegalStateException("Attempted to modify connection property after connection has been established."); - } - this.supportComplexTypes = supportComplexTypes; - return this; - } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 8a69ec1..81a2bc2 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -19,6 +19,10 @@ drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl" +drill.client: { + supports-complex-types: true +} + drill.exec: { cluster-id: "drillbits1" rpc: { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index 7cfe51a..07a2075 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -32,10 +32,12 @@ import org.apache.drill.exec.client.DrillClient; import org.apache.drill.exec.client.PrintingResultsListener; import org.apache.drill.exec.client.QuerySubmitter; import org.apache.drill.exec.client.QuerySubmitter.Format; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryType; +import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.user.ConnectionThrottle; import org.apache.drill.exec.rpc.user.QueryResultBatch; @@ -55,6 +57,8 @@ import com.google.common.io.Resources; public class BaseTestQuery extends ExecTest{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class); + private int[] columnWidths = new int[] { 8 }; + private static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache"; @SuppressWarnings("serial") @@ -227,4 +231,29 @@ public class BaseTestQuery extends ExecTest{ } } + protected void setColumnWidth(int columnWidth) { + this.columnWidths = new int[] { columnWidth }; + } + + protected void setColumnWidths(int[] columnWidths) { + this.columnWidths = columnWidths; + } + + protected int printResult(List<QueryResultBatch> results) throws SchemaChangeException { + int rowCount = 0; + RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); + for(QueryResultBatch result : results){ + rowCount += result.getHeader().getRowCount(); + loader.load(result.getHeader().getDef(), result.getData()); + if (loader.getRecordCount() <= 0) { + break; + } + VectorUtil.showVectorAccessibleContent(loader, columnWidths); + loader.clear(); + result.release(); + } + System.out.println("Total record count: " + rowCount); + return rowCount; + } + }