Repository: incubator-drill
Updated Branches:
  refs/heads/master b5ab447de -> 4871fd0de
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
new file mode 100644
index 0000000..1e47684
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.io.IOException;
+import java.util.List;
+
+public class HiveTextRecordReader extends HiveRecordReader {
+
+  public final byte delimiter;
+  public final List<Integer> columnIds;
+  private final int numCols;
+
+  public HiveTextRecordReader(Table table, Partition partition, InputSplit inputSplit, List<FieldReference> columns, FragmentContext context) throws ExecutionSetupException {
+    super(table, partition, inputSplit, columns, context);
+    String d = table.getSd().getSerdeInfo().getParameters().get("field.delim");
+    if (d != null) {
+      delimiter = d.getBytes()[0];
+    } else {
+      delimiter = (byte) 1;
+    }
+    assert delimiter > 0;
+    List<Integer> ids = Lists.newArrayList();
+    for (int i = 0; i < tableColumns.size(); i++) {
+      if (columnNames.contains(tableColumns.get(i))) {
+        ids.add(i);
+      }
+    }
+    columnIds = ids;
+    numCols = tableColumns.size();
+  }
+
+  public boolean setValue(PrimitiveObjectInspector.PrimitiveCategory pCat, ValueVector vv, int index, byte[] bytes, int start) {
+    switch(pCat) {
+      case BINARY:
+        throw new UnsupportedOperationException();
+      case BOOLEAN:
+        throw new UnsupportedOperationException();
+      case BYTE:
+        throw new UnsupportedOperationException();
+      case DECIMAL:
+        throw new UnsupportedOperationException();
+      case DOUBLE:
+        throw new UnsupportedOperationException();
+      case FLOAT:
+        throw new UnsupportedOperationException();
+      case INT: {
+        int value = 0;
+        byte b;
+        for (int i = start; (b = bytes[i]) != delimiter; i++) {
+          value = (value * 10) + b - 48;
+        }
+        ((IntVector) vv).getMutator().set(index, value); // No need to use setSafe for fixed length vectors
+        return true;
+      }
+      case LONG: {
+        long value = 0;
+        byte b;
+        for (int i = start; (b = bytes[i]) != delimiter; i++) {
+          value = (value * 10) + b - 48;
+        }
+        ((BigIntVector) vv).getMutator().set(index, value); // No need to use setSafe for fixed length vectors
+        return true;
+      }
+      case SHORT:
+        throw new UnsupportedOperationException();
+      case STRING: {
+        int end = start;
+        for (int i = start; i < bytes.length; i++) {
+          if (bytes[i] == delimiter) {
+            end = i;
+            break;
+          }
+          end = bytes.length;
+        }
+        return ((VarCharVector) vv).getMutator().setSafe(index, bytes, start, end - start);
+      }
+      case TIMESTAMP:
+        throw new UnsupportedOperationException();
+
+      default:
+        throw new UnsupportedOperationException("Could not determine type");
+    }
+  }
+
+  @Override
+  public int next() {
+    for (ValueVector vv : vectors) {
+      VectorAllocator.getAllocator(vv, 50).alloc(TARGET_RECORD_COUNT);
+    }
+    try {
+      int recordCount = 0;
+      if (redoRecord != null) {
+        // Re-process the record that failed in the previous call. This must mirror the
+        // main loop below: delimiter positions are offset by one slot, each value starts
+        // one byte past its delimiter, and the record index advances once written.
+        int length = ((Text) redoRecord).getLength();
+        byte[] bytes = ((Text) redoRecord).getBytes();
+        int[] delimPositions = new int[numCols + 1];
+        delimPositions[0] = -1;
+        int p = 1;
+        for (int i = 0; i < length; i++) {
+          if (bytes[i] == delimiter) {
+            delimPositions[p++] = i;
+          }
+        }
+        for (int i = 0; i < columnIds.size(); i++) {
+          int id = columnIds.get(i);
+          boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, bytes, delimPositions[id] + 1);
+          if (!success) {
+            throw new DrillRuntimeException(String.format("Failed to write value for column %s", columnNames.get(id)));
+          }
+        }
+        redoRecord = null;
+        recordCount++;
+      }
+      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) {
+        int length = ((Text) value).getLength();
+        byte[] bytes = ((Text) value).getBytes();
+        int[] delimPositions = new int[numCols + 1];
+        delimPositions[0] = -1;
+        int p = 1;
+        for (int i = 0; i < length; i++) {
+          if (bytes[i] == delimiter) {
+            delimPositions[p++] = i;
+          }
+        }
+        for (int i = 0; i < columnIds.size(); i++) {
+          int id = columnIds.get(i);
+          boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, bytes, delimPositions[id] + 1);
+          if (!success) {
+            redoRecord = value;
+            if (partition != null) populatePartitionVectors(recordCount);
+            return recordCount;
+          }
+        }
+        recordCount++;
+      }
+      if (partition != null) populatePartitionVectors(recordCount);
+      return recordCount;
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+}
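A note on the INT and LONG cases above: they parse ASCII digits straight out of the row buffer instead of materializing a String per field. A standalone sketch of the same technique (hypothetical class name, no Drill dependencies; unlike the reader code, this sketch bounds the scan so a final column with no trailing delimiter cannot read past the valid region, and like the original it does not handle signs):

    public class DelimitedIntParser {
      // Accumulate base-10 digits until the delimiter or the end of the valid region.
      static int parseInt(byte[] bytes, int start, int length, byte delimiter) {
        int value = 0;
        for (int i = start; i < length && bytes[i] != delimiter; i++) {
          value = (value * 10) + (bytes[i] - 48); // 48 == '0'
        }
        return value;
      }

      public static void main(String[] args) {
        byte[] row = "42|hello|7".getBytes();
        System.out.println(parseInt(row, 0, row.length, (byte) '|')); // prints 42
        System.out.println(parseInt(row, 9, row.length, (byte) '|')); // prints 7
      }
    }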
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
index 209961d..86be49e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
@@ -25,7 +25,6 @@ import org.apache.drill.exec.store.ClassPathFileSystem;
 import org.apache.drill.exec.store.SchemaProvider;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import com.beust.jcommander.internal.Lists;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
new file mode 100644
index 0000000..aa68752
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import com.beust.jcommander.internal.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.List;
+
+/**
+ * This is a tool for printing the content of record batches to screen. Used for debugging.
+ */
+public class BatchPrinter {
+  public static void printHyperBatch(VectorAccessible batch) {
+    List<String> columns = Lists.newArrayList();
+    List<ValueVector> vectors = Lists.newArrayList();
+    int numBatches = 0;
+    for (VectorWrapper vw : batch) {
+      columns.add(vw.getValueVectors()[0].getField().getName());
+      numBatches = vw.getValueVectors().length;
+    }
+    int width = columns.size();
+    for (int i = 0; i < numBatches; i++) {
+      int rows = batch.iterator().next().getValueVectors()[i].getMetadata().getValueCount();
+      for (int j = 0; j < rows; j++) {
+        for (VectorWrapper vw : batch) {
+          Object o = vw.getValueVectors()[i].getAccessor().getObject(j);
+          if (o instanceof byte[]) {
+            String value = new String((byte[]) o);
+            System.out.printf("| %-15s", value.length() <= 15 ? value : value.substring(0, 14));
+          } else {
+            String value = o.toString();
+            System.out.printf("| %-15s", value.length() <= 15 ? value : value.substring(0, 14));
+          }
+        }
+        System.out.printf("|\n");
+      }
+    }
+    System.out.printf("|\n");
+  }
+
+  public static void printBatch(VectorAccessible batch) {
+    List<String> columns = Lists.newArrayList();
+    List<ValueVector> vectors = Lists.newArrayList();
+    for (VectorWrapper vw : batch) {
+      columns.add(vw.getValueVector().getField().getName());
+      vectors.add(vw.getValueVector());
+    }
+    int width = columns.size();
+    int rows = vectors.get(0).getMetadata().getValueCount();
+    for (int row = 0; row < rows; row++) {
+      if (row % 50 == 0) {
+        System.out.println(StringUtils.repeat("-", width * 17 + 1));
+        for (String column : columns) {
+          // Truncate long column names to fit the 15-char cell, matching the value cells below.
+          System.out.printf("| %-15s", column.length() <= 15 ? column : column.substring(0, 14));
+        }
+        System.out.printf("|\n");
+        System.out.println(StringUtils.repeat("-", width * 17 + 1));
+      }
+      for (ValueVector vv : vectors) {
+        Object o = vv.getAccessor().getObject(row);
+        if (o instanceof byte[]) {
+          String value = new String((byte[]) o);
+          System.out.printf("| %-15s", value.length() <= 15 ? value : value.substring(0, 14));
+        } else {
+          String value = o.toString();
+          System.out.printf("| %-15s", value.length() <= 15 ? value : value.substring(0, 14));
+        }
+      }
+      System.out.printf("|\n");
+    }
+  }
+}
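Both printers format every cell with "%-15s" after truncating long values by hand. A minimal, hypothetical extraction of that idiom (plain JDK, no Drill dependencies):

    public class CellFormat {
      // Render one value as a fixed-width table cell, truncating long values
      // the way BatchPrinter does.
      static String cell(Object o) {
        String value = (o instanceof byte[]) ? new String((byte[]) o) : String.valueOf(o);
        return String.format("| %-15s", value.length() <= 15 ? value : value.substring(0, 14));
      }

      public static void main(String[] args) {
        System.out.println(cell("n_name") + cell("a rather long comment field") + "|");
      }
    }

Nearly the same effect is available in one step as String.format("| %-15.14s", value), since a precision on %s caps the number of characters emitted.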
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index c801163..4dd08c1 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -82,5 +82,6 @@ drill.exec: {
       delete: false,
       size: 100000000
     }
-  }
+  },
+  cache.hazel.subnets: ["*.*.*.*"]
 }
\ No newline at end of file
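drill-module.conf is HOCON, read through the Typesafe Config library that DrillConfig wraps, so the new key surfaces as a string list. A sketch of how the value could be read (the fragment below is hand-built for the example; a real Drillbit would get it from its DrillConfig):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import java.util.List;

    public class SubnetConfig {
      public static void main(String[] args) {
        // Hand-built HOCON fragment equivalent to the new drill-module.conf entry.
        Config config = ConfigFactory.parseString(
            "drill.exec: { cache.hazel.subnets: [\"*.*.*.*\"] }");
        List<String> subnets = config.getStringList("drill.exec.cache.hazel.subnets");
        System.out.println(subnets); // prints [*.*.*.*]
      }
    }

Hazelcast accepts wildcard patterns of this shape in its interface configuration, which is presumably what the subnets list feeds; "*.*.*.*" matches any address.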
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/test/java/org/apache/drill/exec/TestPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestPlan.java
new file mode 100644
index 0000000..71e6283
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestPlan.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+import org.apache.drill.exec.client.QuerySubmitter;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: sphillips
+ * Date: 1/24/14
+ * Time: 3:46 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public class TestPlan {
+
+  String location = "/Users/sphillips/hive-lineitem-orderkey";
+  String type = "physical";
+  String zkQuorum = null;
+  boolean local = true;
+  int bits = 1;
+
+
+  @Test
+  @Ignore
+  public void testSubmitPlan() throws Exception {
+    QuerySubmitter submitter = new QuerySubmitter();
+    submitter.submitQuery(location, type, zkQuorum, local, bits);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/TestHiveScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/TestHiveScan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/TestHiveScan.java
new file mode 100644
index 0000000..c6edc20
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/TestHiveScan.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: sphillips
+ * Date: 1/23/14
+ * Time: 5:22 AM
+ * To change this template use File | Settings | File Templates.
+ */
+public class TestHiveScan extends PopUnitTestBase {
+  @Ignore
+  @Test
+  public void twoBitTwoExchangeTwoEntryRun() throws Exception {
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+        Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
+        DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) {
+
+      bit1.run();
+      bit2.run();
+      client.connect();
+      List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+          Files.toString(FileUtils.getResourceAsFile("/hive/test.json"),
+              Charsets.UTF_8));
+      int count = 0;
+      for(QueryResultBatch b : results) {
+        if (b.getHeader().getRowCount() != 0)
+          count += b.getHeader().getRowCount();
+      }
+      assertEquals(100, count);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java
deleted file mode 100644
index aa68752..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.util;
-
-import com.beust.jcommander.internal.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.vector.ValueVector;
-
-import java.util.List;
-
-/**
- * This is a tool for printing the content of record batches to screen. Used for debugging.
- */
-public class BatchPrinter {
-  public static void printHyperBatch(VectorAccessible batch) {
-    List<String> columns = Lists.newArrayList();
-    List<ValueVector> vectors = Lists.newArrayList();
-    int numBatches = 0;
-    for (VectorWrapper vw : batch) {
-      columns.add(vw.getValueVectors()[0].getField().getName());
-      numBatches = vw.getValueVectors().length;
-    }
-    int width = columns.size();
-    for (int i = 0; i < numBatches; i++) {
-      int rows = batch.iterator().next().getValueVectors()[i].getMetadata().getValueCount();
-      for (int j = 0; j < rows; j++) {
-        for (VectorWrapper vw : batch) {
-          Object o = vw.getValueVectors()[i].getAccessor().getObject(j);
-          if (o instanceof byte[]) {
-            String value = new String((byte[]) o);
-            System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14));
-          } else {
-            String value = o.toString();
-            System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
-          }
-        }
-        System.out.printf("|\n");
-      }
-    }
-    System.out.printf("|\n");
-  }
-  public static void printBatch(VectorAccessible batch) {
-    List<String> columns = Lists.newArrayList();
-    List<ValueVector> vectors = Lists.newArrayList();
-    for (VectorWrapper vw : batch) {
-      columns.add(vw.getValueVector().getField().getName());
-      vectors.add(vw.getValueVector());
-    }
-    int width = columns.size();
-    int rows = vectors.get(0).getMetadata().getValueCount();
-    for (int row = 0; row < rows; row++) {
-      if (row%50 == 0) {
-        System.out.println(StringUtils.repeat("-", width * 17 + 1));
-        for (String column : columns) {
-          System.out.printf("| %-15s", width <= 15 ? column : column.substring(0, 14));
-        }
-        System.out.printf("|\n");
-        System.out.println(StringUtils.repeat("-", width*17 + 1));
-      }
-      for (ValueVector vv : vectors) {
-        Object o = vv.getAccessor().getObject(row);
-        if (o instanceof byte[]) {
-          String value = new String((byte[]) o);
-          System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14));
-        } else {
-          String value = o.toString();
-          System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
-        }
-      }
-      System.out.printf("|\n");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/test/resources/hive/test.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/hive/test.json b/exec/java-exec/src/test/resources/hive/test.json
new file mode 100644
index 0000000..a039d9e
--- /dev/null
+++ b/exec/java-exec/src/test/resources/hive/test.json
@@ -0,0 +1,75 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"hive-scan",
+      storageengine: { type: "hive"},
+      hive-table: {
+        "tableName" : "nation",
+        "dbName" : "default",
+        "owner" : "root",
+        "createTime" : 1386876893,
+        "lastAccessTime" : 0,
+        "retention" : 0,
+        "sd" : {
+          "cols" : [ {
+            "name" : "n_nationkey",
+            "type" : "bigint",
+            "comment" : null
+          }, {
+            "name" : "n_name",
+            "type" : "string",
+            "comment" : null
+          }, {
+            "name" : "n_regionkey",
+            "type" : "bigint",
+            "comment" : null
+          }, {
+            "name" : "n_comment",
+            "type" : "string",
+            "comment" : null
+          } ],
+          "location" : "maprfs:/user/hive/warehouse/nation",
+          "inputFormat" : "org.apache.hadoop.mapred.TextInputFormat",
+          "outputFormat" : "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
+          "compressed" : false,
+          "numBuckets" : 0,
+          "serDeInfo" : {
+            "name" : null,
+            "serializationLib" : "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
+            "parameters" : {
+              "serialization.format" : "|",
+              "field.delim" : "|"
+            }
+          },
+          "sortCols" : [ ],
+          "parameters" : { }
+        },
+        "partitionKeys" : [ ],
+        "parameters" : {
+          "numPartitions" : "0",
+          "numFiles" : "1",
+          "transient_lastDdlTime" : "1386877487",
+          "totalSize" : "2224",
+          "numRows" : "0",
+          "rawDataSize" : "0"
+        },
+        "viewOriginalText" : null,
+        "viewExpandedText" : null,
+        "tableType" : "MANAGED_TABLE"
+      }
+    },
+    {
+      @id: 2,
+      child: 1,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2121adc..f8ddbf2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -356,6 +356,52 @@
       <version>${dep.slf4j.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>0.12.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>jline</groupId>
+          <artifactId>jline</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-hbase-handler</artifactId>
+      <version>0.12.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <!-- Test Dependencies -->
     <dependency>
       <groupId>com.googlecode.jmockit</groupId>
@@ -594,6 +640,11 @@
         <version>1.0.3-mapr-3.0.0</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase</artifactId>
+        <version>0.94.13-mapr-1401-m7-3.0.2</version>
+      </dependency>
+      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.0.3-mapr-3.0.0</version>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java b/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java
index fefe7bc..4712302 100644
--- a/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java
+++ b/sqlparser/src/main/java/org/apache/drill/jdbc/DrillHandler.java
@@ -41,7 +41,7 @@ import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.SchemaProvider;
 import org.apache.drill.exec.store.SchemaProviderRegistry;
-import org.apache.drill.exec.store.hive.HiveSchemaProvider;
+import org.apache.drill.exec.store.hive.HiveStorageEngine.HiveSchemaProvider;
 import org.apache.drill.exec.store.json.JsonSchemaProvider;
 import org.apache.drill.exec.store.parquet.ParquetSchemaProvider;
 import org.apache.drill.sql.client.full.FileSystemSchema;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java b/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
index d444348..1b58805 100644
--- a/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
+++ b/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import jline.internal.Preconditions;
+import com.google.common.base.Preconditions;
 
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.exception.SchemaChangeException;
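The BatchLoaderMap change above swaps jline.internal.Preconditions for the Guava version; the jline copy was presumably only resolvable as a side effect of the jline artifact that the hive-exec exclusions now remove. Guava usage is identical; for reference, a self-contained example:

    import com.google.common.base.Preconditions;

    public class PreconditionsDemo {
      static int firstRow(int[] rows) {
        // checkNotNull throws NullPointerException, checkArgument throws
        // IllegalArgumentException, each with the supplied message.
        Preconditions.checkNotNull(rows, "rows may not be null");
        Preconditions.checkArgument(rows.length > 0, "need at least one row");
        return rows[0];
      }

      public static void main(String[] args) {
        System.out.println(firstRow(new int[] {7, 8, 9})); // prints 7
      }
    }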
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/sqlparser/src/main/java/org/apache/drill/sql/client/full/HiveDatabaseSchema.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/main/java/org/apache/drill/sql/client/full/HiveDatabaseSchema.java b/sqlparser/src/main/java/org/apache/drill/sql/client/full/HiveDatabaseSchema.java
index c12057a..c02a478 100644
--- a/sqlparser/src/main/java/org/apache/drill/sql/client/full/HiveDatabaseSchema.java
+++ b/sqlparser/src/main/java/org/apache/drill/sql/client/full/HiveDatabaseSchema.java
@@ -31,12 +31,15 @@ import net.hydromatic.optiq.impl.java.JavaTypeFactory;
 
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.store.SchemaProvider;
+import org.apache.drill.exec.store.hive.HiveReadEntry;
+import org.apache.drill.exec.store.hive.HiveStorageEngine;
 import org.apache.drill.exec.store.hive.HiveStorageEngineConfig;
 import org.apache.drill.jdbc.DrillTable;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
@@ -53,11 +56,11 @@ public class HiveDatabaseSchema implements Schema{
   private final String name;
   private final Expression expression;
   private final QueryProvider queryProvider;
-  private final SchemaProvider schemaProvider;
+  private final HiveStorageEngine.HiveSchemaProvider schemaProvider;
   private final DrillClient client;
   private final HiveStorageEngineConfig config;
 
-  public HiveDatabaseSchema(DrillClient client, HiveStorageEngineConfig config, SchemaProvider schemaProvider,
+  public HiveDatabaseSchema(DrillClient client, HiveStorageEngineConfig config, HiveStorageEngine.HiveSchemaProvider schemaProvider,
       JavaTypeFactory typeFactory, HiveSchema parentSchema, String name, Expression expression,
       QueryProvider queryProvider) {
     super();
@@ -121,7 +124,54 @@ public class HiveDatabaseSchema implements Schema{
     return Collections.EMPTY_LIST;
   }
 
-  static Map<PrimitiveObjectInspector.PrimitiveCategory, SqlTypeName> mapPrimHive2Sql = new HashMap<>();
+  private RelDataType getRelDataTypeFromHiveTypeString(String type) {
+    switch(type) {
+      case "boolean":
+        return typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+
+      case "tinyint":
+        return typeFactory.createSqlType(SqlTypeName.TINYINT);
+
+      case "smallint":
+        return typeFactory.createSqlType(SqlTypeName.SMALLINT);
+
+      case "int":
+        return typeFactory.createSqlType(SqlTypeName.INTEGER);
+
+      case "bigint":
+        return typeFactory.createSqlType(SqlTypeName.BIGINT);
+
+      case "float":
+        return typeFactory.createSqlType(SqlTypeName.FLOAT);
+
+      case "double":
+        return typeFactory.createSqlType(SqlTypeName.DOUBLE);
+
+      case "date":
+        return typeFactory.createSqlType(SqlTypeName.DATE);
+
+      case "timestamp":
+        return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+
+      case "binary":
+        return typeFactory.createSqlType(SqlTypeName.BINARY);
+
+      case "decimal":
+        return typeFactory.createSqlType(SqlTypeName.DECIMAL);
+
+      case "string":
+      case "varchar": {
+        return typeFactory.createTypeWithCharsetAndCollation(
+            typeFactory.createSqlType(SqlTypeName.VARCHAR), /*input type*/
+            Charset.forName("ISO-8859-1"), /*unicode char set*/
+            SqlCollation.IMPLICIT /* TODO: need to decide if implicit is the correct one */
+        );
+      }
+
+      default:
+        throw new RuntimeException("Unknown or unsupported hive type: " + type);
+    }
+  }
 
   private RelDataType getRelDataTypeFromHivePrimitiveType(PrimitiveObjectInspector poi) {
     switch(poi.getPrimitiveCategory()) {
@@ -161,7 +211,7 @@ public class HiveDatabaseSchema implements Schema{
       case VARCHAR: {
         return typeFactory.createTypeWithCharsetAndCollation(
             typeFactory.createSqlType(SqlTypeName.VARCHAR), /*input type*/
-            Charset.forName("UTF-16"), /*unicode char set*/
+            Charset.forName("ISO-8859-1"), /*unicode char set*/
             SqlCollation.IMPLICIT /* TODO: need to decide if implicit is the correct one */
         );
       }
@@ -189,22 +239,20 @@ public class HiveDatabaseSchema implements Schema{
   @SuppressWarnings("unchecked")
   @Override
   public <E> Table<E> getTable(String name, Class<E> elementType) {
-    try {
-      org.apache.hadoop.hive.ql.metadata.Table hiveTable =
-          parentSchema.getHiveDb().getTable(getName(), name, false /*throwException*/);
-
-      if (hiveTable == null) {
-        logger.debug("Table name {} is invalid", name);
-        return null;
-      }
+    Object selection = schemaProvider.getSelectionBaseOnName(String.format("%s.%s",this.name, name));
+    if(selection == null) return null;
+    org.apache.hadoop.hive.metastore.api.Table t = ((HiveReadEntry) selection).getTable();
+    if (t == null) {
+      logger.debug("Table name {} is invalid", name);
+      return null;
+    }
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = new org.apache.hadoop.hive.ql.metadata.Table(t);
 
-      Object selection = schemaProvider.getSelectionBaseOnName(name);
-      if(selection == null) return null;
 
-      final MethodCallExpression call = Expressions.call(getExpression(), //
-          BuiltinMethod.DATA_CONTEXT_GET_TABLE.method, //
-          Expressions.constant(name), //
-          Expressions.constant(Object.class));
+    final MethodCallExpression call = Expressions.call(getExpression(), //
+        BuiltinMethod.DATA_CONTEXT_GET_TABLE.method, //
+        Expressions.constant(name), //
+        Expressions.constant(Object.class));
 
     ArrayList<RelDataType> typeList = new ArrayList<>();
     ArrayList<String> fieldNameList = new ArrayList<>();
@@ -215,21 +263,22 @@ public class HiveDatabaseSchema implements Schema{
       typeList.add(getRelDataTypeFromHiveType(hiveField.getFieldObjectInspector()));
     }
 
-    final RelDataType rowType = typeFactory.createStructType(typeList, fieldNameList);
-    return (Table<E>) new DrillTable(
-        client,
-        this,
-        Object.class,
-        call,
-        rowType,
-        name,
-        null /*storageEngineName*/,
-        selection,
-        config /*storageEngineConfig*/);
-    } catch (HiveException ex) {
-      logger.error("getTable failed", ex);
-      return null;
-    }
+    for (FieldSchema field : hiveTable.getPartitionKeys()) {
+      fieldNameList.add(field.getName());
+      typeList.add(getRelDataTypeFromHiveTypeString(field.getType()));
+    }
+
+    final RelDataType rowType = typeFactory.createStructType(typeList, fieldNameList);
+    return (Table<E>) new DrillTable(
+        client,
+        this,
+        Object.class,
+        call,
+        rowType,
+        name,
+        null /*storageEngineName*/,
+        selection,
+        config /*storageEngineConfig*/);
   }
 
   @Override
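On the UTF-16 to ISO-8859-1 change above: the commit does not say why, but a plausible reading is that Drill's VarChar vectors carry raw bytes, and ISO-8859-1 is the common charset whose byte-to-char mapping round-trips losslessly, which UTF-16 does not. A self-contained demonstration (plain JDK):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class CharsetRoundTrip {
      public static void main(String[] args) {
        byte[] raw = {(byte) 0xE9, 'a', 'b'};  // 0xE9 is 'e' with acute accent in Latin-1

        // ISO-8859-1 maps every byte value to exactly one char, so the bytes survive.
        String latin1 = new String(raw, StandardCharsets.ISO_8859_1);
        System.out.println(Arrays.equals(raw, latin1.getBytes(StandardCharsets.ISO_8859_1))); // true

        // UTF-16 reinterprets the same bytes as two-byte code units (with a BOM on encode).
        String utf16 = new String(raw, StandardCharsets.UTF_16);
        System.out.println(Arrays.equals(raw, utf16.getBytes(StandardCharsets.UTF_16))); // false
      }
    }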
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/sqlparser/src/main/java/org/apache/drill/sql/client/full/HiveSchema.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/main/java/org/apache/drill/sql/client/full/HiveSchema.java b/sqlparser/src/main/java/org/apache/drill/sql/client/full/HiveSchema.java
index ffd6a6e..007c6d9 100644
--- a/sqlparser/src/main/java/org/apache/drill/sql/client/full/HiveSchema.java
+++ b/sqlparser/src/main/java/org/apache/drill/sql/client/full/HiveSchema.java
@@ -30,6 +30,7 @@ import org.apache.drill.common.logical.StorageEngineConfig;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.store.SchemaProvider;
 import org.apache.drill.exec.store.hive.HiveStorageEngineConfig;
+import org.apache.drill.exec.store.hive.HiveStorageEngine.HiveSchemaProvider;
 import org.apache.drill.jdbc.DrillTable;
 
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -47,7 +48,7 @@ public class HiveSchema implements Schema{
   private final String name;
   private final Expression expression;
   private final QueryProvider queryProvider;
-  private final SchemaProvider schemaProvider;
+  private final HiveSchemaProvider schemaProvider;
   private final DrillClient client;
   private final HiveStorageEngineConfig config;
   private Hive hiveDb;
@@ -62,7 +63,7 @@ public class HiveSchema implements Schema{
     this.name = name;
     this.expression = expression;
     this.queryProvider = queryProvider;
-    this.schemaProvider = schemaProvider;
+    this.schemaProvider = (HiveSchemaProvider) schemaProvider;
     this.config = (HiveStorageEngineConfig) config;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java b/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
index 4f9d210..5e5812c 100644
--- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
+++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
@@ -54,6 +54,7 @@ public class FullEngineTest {
    * @throws Exception
    */
   @Test(timeout=100000) // derby initialization is slow
+  @Ignore
   public void listHiveTables() throws Exception {
     JdbcAssert.withFull("hive-derby")
       .sql("select * from \"metadata\".\"TABLES\"")