http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java new file mode 100644 index 0000000..0b87976 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid.serde; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable; +import org.apache.hadoop.io.NullWritable; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Iterators; + +import io.druid.query.Result; +import io.druid.query.topn.DimensionAndMetricValueExtractor; +import io.druid.query.topn.TopNQuery; +import io.druid.query.topn.TopNResultValue; + +/** + * Record reader for results for Druid TopNQuery. 
+ */ +public class DruidTopNQueryRecordReader + extends DruidQueryRecordReader<TopNQuery, Result<TopNResultValue>> { + + private Result<TopNResultValue> current; + private Iterator<DimensionAndMetricValueExtractor> values = Iterators.emptyIterator(); + + @Override + protected TopNQuery createQuery(String content) throws IOException { + return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, TopNQuery.class); + } + + @Override + protected List<Result<TopNResultValue>> createResultsList(InputStream content) throws IOException { + return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content, + new TypeReference<List<Result<TopNResultValue>>>(){}); + } + + @Override + public boolean nextKeyValue() { + if (values.hasNext()) { + return true; + } + if (results.hasNext()) { + current = results.next(); + values = current.getValue().getValue().iterator(); + return true; + } + return false; + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public DruidWritable getCurrentValue() throws IOException, InterruptedException { + // Create new value + DruidWritable value = new DruidWritable(); + value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + if (values.hasNext()) { + value.getValue().putAll(values.next().getBaseObject()); + return value; + } + return value; + } + + @Override + public boolean next(NullWritable key, DruidWritable value) { + if (nextKeyValue()) { + // Update value + value.getValue().clear(); + value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + if (values.hasNext()) { + value.getValue().putAll(values.next().getBaseObject()); + } + return true; + } + return false; + } + + @Override + public float getProgress() { + return results.hasNext() || values.hasNext() ? 0 : 1; + } + +}
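For orientation: the reader above flattens each Druid TopN result bucket (one timestamp plus a list of dimension/metric entries) into one row per entry, injecting the bucket timestamp under the timestamp column ("__time" in the segment metadata further down, referenced as DruidTable.DEFAULT_TIMESTAMP_COLUMN). The following is a small standalone sketch of that flattening using only java.util types rather than the Druid result classes; the class and method names are illustrative and not part of the patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative flattening of one TopN result bucket: every dimension/metric map in the
// bucket becomes an output row, with the bucket timestamp added as the __time column.
public class TopNFlattenSketch {

  static List<Map<String, Object>> flatten(long timestampMillis,
      List<Map<String, Object>> bucketEntries) {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (Map<String, Object> entry : bucketEntries) {
      Map<String, Object> row = new HashMap<>(entry);
      row.put("__time", timestampMillis); // DruidTable.DEFAULT_TIMESTAMP_COLUMN
      rows.add(row);
    }
    return rows;
  }

  public static void main(String[] args) {
    Map<String, Object> entry = new HashMap<>();
    entry.put("sample_dim", "dim1_val");
    entry.put("count", 111L);
    // One bucket at 2013-08-31T00:00:00.000Z with a single entry -> one row with __time set.
    System.out.println(flatten(1377907200000L, Arrays.asList(entry)));
  }
}

This mirrors what getCurrentValue()/next() do per call: the timestamp is written first and the current entry's base object is copied on top, one output row per DimensionAndMetricValueExtractor.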
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java new file mode 100644 index 0000000..77ffcd4 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidWritable.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid.serde; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.io.Writable; + +import com.google.common.base.Objects; + +/** + * Writable for Druid results. + */ +public class DruidWritable implements Writable { + + private final Map<String, Object> value; + + public DruidWritable() { + value = new HashMap<>(); + } + + public DruidWritable(Map<String, Object> value) { + this.value = value; + } + + public Map<String, Object> getValue() { + return value; + } + + @Override + public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + return Objects.hashCode(value); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + return Objects.equal(value, ((DruidWritable) o).value); + } + + @Override + public String toString() { + return "DruidWritable{value=" + value + '}'; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java new file mode 100644 index 0000000..2b4df78 --- /dev/null +++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid; + +import java.util.List; + +import org.apache.hadoop.hive.druid.serde.DruidSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.util.StringUtils; + +import com.fasterxml.jackson.core.type.TypeReference; + +import io.druid.query.metadata.metadata.SegmentAnalysis; +import io.druid.query.metadata.metadata.SegmentMetadataQuery; + +/** + * Druid SerDe to be used in tests. + */ +public class QTestDruidSerDe extends DruidSerDe { + + // Request : + // "{\"queryType\":\"segmentMetadata\",\"dataSource\":{\"type\":\"table\",\"name\":\"wikipedia\"}," + // + "\"intervals\":{\"type\":\"intervals\"," + // + "\"intervals\":[\"-146136543-09-08T00:30:34.096-07:52:58/146140482-04-24T08:36:27.903-07:00\"]}," + // + "\"toInclude\":{\"type\":\"all\"},\"merge\":true,\"context\":null,\"analysisTypes\":[]," + // + "\"usingDefaultInterval\":true,\"lenientAggregatorMerge\":false,\"descending\":false}"; + private static final String RESPONSE = + "[ {\r\n " + + " \"id\" : \"merged\",\r\n " + + " \"intervals\" : [ \"2010-01-01T00:00:00.000Z/2015-12-31T00:00:00.000Z\" ],\r\n " + + " \"columns\" : {\r\n " + + " \"__time\" : { \"type\" : \"LONG\", \"hasMultipleValues\" : false, \"size\" : 407240380, \"cardinality\" : null, \"errorMessage\" : null },\r\n " + + " \"robot\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n " + + " \"namespace\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : true, \"size\" : 100000, \"cardinality\" : 1504, \"errorMessage\" : null },\r\n " + + " \"anonymous\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n " + + " \"unpatrolled\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n " + + " \"page\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n " + + " \"language\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n " + + " \"newpage\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n " + + " \"user\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n " + + " \"count\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n " + + " \"added\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n " + + " \"delta\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n " + + " \"variation\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null 
},\r\n " + + " \"deleted\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null }\r\n " + + " },\r\n " + + " \"aggregators\" : {\r\n " + + " \"count\" : { \"type\" : \"longSum\", \"name\" : \"count\", \"fieldName\" : \"count\" },\r\n " + + " \"added\" : { \"type\" : \"doubleSum\", \"name\" : \"added\", \"fieldName\" : \"added\" },\r\n " + + " \"delta\" : { \"type\" : \"doubleSum\", \"name\" : \"delta\", \"fieldName\" : \"delta\" },\r\n " + + " \"variation\" : { \"type\" : \"doubleSum\", \"name\" : \"variation\", \"fieldName\" : \"variation\" },\r\n " + + " \"deleted\" : { \"type\" : \"doubleSum\", \"name\" : \"deleted\", \"fieldName\" : \"deleted\" }\r\n " + + " },\r\n " + + " \"queryGranularity\" : {\r\n \"type\": \"none\"\r\n },\r\n " + + " \"size\" : 300000,\r\n " + + " \"numRows\" : 5000000\r\n} ]"; + + /* Submits the request and returns */ + @Override + protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query) + throws SerDeException { + // Retrieve results + List<SegmentAnalysis> resultsList; + try { + resultsList = DruidStorageHandlerUtils.JSON_MAPPER.readValue(RESPONSE, + new TypeReference<List<SegmentAnalysis>>() {}); + } catch (Exception e) { + throw new SerDeException(StringUtils.stringifyException(e)); + } + return resultsList.get(0); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandler.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandler.java new file mode 100644 index 0000000..0a44aaa --- /dev/null +++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandler.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid; + +import org.apache.hadoop.hive.serde2.SerDe; + +/** + * Storage handler for Druid to be used in tests. It cannot connect to + * Druid, and thus it cannot execute queries. + */ +@SuppressWarnings("deprecation") +public class QTestDruidStorageHandler extends DruidStorageHandler { + + @Override + public Class<? 
extends SerDe> getSerDeClass() { + return QTestDruidSerDe.class; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java new file mode 100644 index 0000000..9c5c65c --- /dev/null +++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java @@ -0,0 +1,576 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.sql.Timestamp; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader; +import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader; +import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader; +import org.apache.hadoop.hive.druid.serde.DruidSerDe; +import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader; +import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader; +import org.apache.hadoop.hive.druid.serde.DruidWritable; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.druid.data.input.Row; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Query; +import io.druid.query.Result; +import io.druid.query.groupby.GroupByQuery; +import io.druid.query.select.SelectQuery; +import io.druid.query.select.SelectResultValue; +import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.query.topn.TopNQuery; +import io.druid.query.topn.TopNResultValue; + +/** + * Basic tests for Druid 
SerDe. The examples are taken from Druid 0.9.1.1 + * documentation. + */ +public class TestDruidSerDe { + + // Timeseries query + private static final String TIMESERIES_QUERY = + "{ \"queryType\": \"timeseries\", " + + " \"dataSource\": \"sample_datasource\", " + + " \"granularity\": \"day\", " + + " \"descending\": \"true\", " + + " \"filter\": { " + + " \"type\": \"and\", " + + " \"fields\": [ " + + " { \"type\": \"selector\", \"dimension\": \"sample_dimension1\", \"value\": \"sample_value1\" }, " + + " { \"type\": \"or\", " + + " \"fields\": [ " + + " { \"type\": \"selector\", \"dimension\": \"sample_dimension2\", \"value\": \"sample_value2\" }, " + + " { \"type\": \"selector\", \"dimension\": \"sample_dimension3\", \"value\": \"sample_value3\" } " + + " ] " + + " } " + + " ] " + + " }, " + + " \"aggregations\": [ " + + " { \"type\": \"longSum\", \"name\": \"sample_name1\", \"fieldName\": \"sample_fieldName1\" }, " + + " { \"type\": \"doubleSum\", \"name\": \"sample_name2\", \"fieldName\": \"sample_fieldName2\" } " + + " ], " + + " \"postAggregations\": [ " + + " { \"type\": \"arithmetic\", " + + " \"name\": \"sample_divide\", " + + " \"fn\": \"/\", " + + " \"fields\": [ " + + " { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name1\", \"fieldName\": \"sample_name1\" }, " + + " { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name2\", \"fieldName\": \"sample_name2\" } " + + " ] " + + " } " + + " ], " + + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}"; + // Timeseries query results + private static final String TIMESERIES_QUERY_RESULTS = + "[ " + + "{ " + + " \"timestamp\": \"2012-01-01T00:00:00.000Z\", " + + " \"result\": { \"sample_name1\": 0, \"sample_name2\": 1.0, \"sample_divide\": 2.2222 } " + + "}, " + + "{ " + + " \"timestamp\": \"2012-01-02T00:00:00.000Z\", " + + " \"result\": { \"sample_name1\": 2, \"sample_name2\": 3.32, \"sample_divide\": 4 } " + + "}]"; + // Timeseries query results as records + private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new LongWritable(0), new FloatWritable(1.0F), new FloatWritable(2.2222F) } , + new Object[] { new TimestampWritable(new Timestamp(1325462400000L)), new LongWritable(2), new FloatWritable(3.32F), new FloatWritable(4F)} + }; + + // TopN query + private static final String TOPN_QUERY = + "{ \"queryType\": \"topN\", " + + " \"dataSource\": \"sample_data\", " + + " \"dimension\": \"sample_dim\", " + + " \"threshold\": 5, " + + " \"metric\": \"count\", " + + " \"granularity\": \"all\", " + + " \"filter\": { " + + " \"type\": \"and\", " + + " \"fields\": [ " + + " { " + + " \"type\": \"selector\", " + + " \"dimension\": \"dim1\", " + + " \"value\": \"some_value\" " + + " }, " + + " { " + + " \"type\": \"selector\", " + + " \"dimension\": \"dim2\", " + + " \"value\": \"some_other_val\" " + + " } " + + " ] " + + " }, " + + " \"aggregations\": [ " + + " { " + + " \"type\": \"longSum\", " + + " \"name\": \"count\", " + + " \"fieldName\": \"count\" " + + " }, " + + " { " + + " \"type\": \"doubleSum\", " + + " \"name\": \"some_metric\", " + + " \"fieldName\": \"some_metric\" " + + " } " + + " ], " + + " \"postAggregations\": [ " + + " { " + + " \"type\": \"arithmetic\", " + + " \"name\": \"sample_divide\", " + + " \"fn\": \"/\", " + + " \"fields\": [ " + + " { " + + " \"type\": \"fieldAccess\", " + + " \"name\": \"some_metric\", " + + " \"fieldName\": \"some_metric\" " + + " }, " + + " { " + + " 
\"type\": \"fieldAccess\", " + + " \"name\": \"count\", " + + " \"fieldName\": \"count\" " + + " } " + + " ] " + + " } " + + " ], " + + " \"intervals\": [ " + + " \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" " + + " ]}"; + // TopN query results + private static final String TOPN_QUERY_RESULTS = + "[ " + + " { " + + " \"timestamp\": \"2013-08-31T00:00:00.000Z\", " + + " \"result\": [ " + + " { " + + " \"sample_dim\": \"dim1_val\", " + + " \"count\": 111, " + + " \"some_metric\": 10669, " + + " \"sample_divide\": 96.11711711711712 " + + " }, " + + " { " + + " \"sample_dim\": \"another_dim1_val\", " + + " \"count\": 88, " + + " \"some_metric\": 28344, " + + " \"sample_divide\": 322.09090909090907 " + + " }, " + + " { " + + " \"sample_dim\": \"dim1_val3\", " + + " \"count\": 70, " + + " \"some_metric\": 871, " + + " \"sample_divide\": 12.442857142857143 " + + " }, " + + " { " + + " \"sample_dim\": \"dim1_val4\", " + + " \"count\": 62, " + + " \"some_metric\": 815, " + + " \"sample_divide\": 13.14516129032258 " + + " }, " + + " { " + + " \"sample_dim\": \"dim1_val5\", " + + " \"count\": 60, " + + " \"some_metric\": 2787, " + + " \"sample_divide\": 46.45 " + + " } " + + " ] " + + " }]"; + // TopN query results as records + private static final Object[][] TOPN_QUERY_RESULTS_RECORDS = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val"), new LongWritable(111), new FloatWritable(10669F), new FloatWritable(96.11711711711712F) } , + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("another_dim1_val"), new LongWritable(88), new FloatWritable(28344F), new FloatWritable(322.09090909090907F) } , + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val3"), new LongWritable(70), new FloatWritable(871F), new FloatWritable(12.442857142857143F) } , + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val4"), new LongWritable(62), new FloatWritable(815F), new FloatWritable(13.14516129032258F) } , + new Object[] { new TimestampWritable(new Timestamp(1377907200000L)), new Text("dim1_val5"), new LongWritable(60), new FloatWritable(2787F), new FloatWritable(46.45F) } + }; + + // GroupBy query + private static final String GROUP_BY_QUERY = + "{ " + + " \"queryType\": \"groupBy\", " + + " \"dataSource\": \"sample_datasource\", " + + " \"granularity\": \"day\", " + + " \"dimensions\": [\"country\", \"device\"], " + + " \"limitSpec\": {" + + " \"type\": \"default\"," + + " \"limit\": 5000," + + " \"columns\": [\"country\", \"data_transfer\"] }, " + + " \"filter\": { " + + " \"type\": \"and\", " + + " \"fields\": [ " + + " { \"type\": \"selector\", \"dimension\": \"carrier\", \"value\": \"AT&T\" }, " + + " { \"type\": \"or\", " + + " \"fields\": [ " + + " { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Apple\" }, " + + " { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Samsung\" } " + + " ] " + + " } " + + " ] " + + " }, " + + " \"aggregations\": [ " + + " { \"type\": \"longSum\", \"name\": \"total_usage\", \"fieldName\": \"user_count\" }, " + + " { \"type\": \"doubleSum\", \"name\": \"data_transfer\", \"fieldName\": \"data_transfer\" } " + + " ], " + + " \"postAggregations\": [ " + + " { \"type\": \"arithmetic\", " + + " \"name\": \"avg_usage\", " + + " \"fn\": \"/\", " + + " \"fields\": [ " + + " { \"type\": \"fieldAccess\", \"fieldName\": \"data_transfer\" }, " + + " { \"type\": \"fieldAccess\", \"fieldName\": \"total_usage\" } " 
+ + " ] " + + " } " + + " ], " + + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ], " + + " \"having\": { " + + " \"type\": \"greaterThan\", " + + " \"aggregation\": \"total_usage\", " + + " \"value\": 100 " + + " }}"; + // GroupBy query results + private static final String GROUP_BY_QUERY_RESULTS = + "[ " + + " { " + + " \"version\" : \"v1\", " + + " \"timestamp\" : \"2012-01-01T00:00:00.000Z\", " + + " \"event\" : { " + + " \"country\" : \"India\", " + + " \"device\" : \"phone\", " + + " \"total_usage\" : 88, " + + " \"data_transfer\" : 29.91233453, " + + " \"avg_usage\" : 60.32 " + + " } " + + " }, " + + " { " + + " \"version\" : \"v1\", " + + " \"timestamp\" : \"2012-01-01T00:00:12.000Z\", " + + " \"event\" : { " + + " \"country\" : \"Spain\", " + + " \"device\" : \"pc\", " + + " \"total_usage\" : 16, " + + " \"data_transfer\" : 172.93494959, " + + " \"avg_usage\" : 6.333333 " + + " } " + + " }]"; + // GroupBy query results as records + private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1325376000000L)), new Text("India"), new Text("phone"), new LongWritable(88), new FloatWritable(29.91233453F), new FloatWritable(60.32F) } , + new Object[] { new TimestampWritable(new Timestamp(1325376012000L)), new Text("Spain"), new Text("pc"), new LongWritable(16), new FloatWritable(172.93494959F), new FloatWritable(6.333333F) } + }; + + // Select query + private static final String SELECT_QUERY = + "{ \"queryType\": \"select\", " + + " \"dataSource\": \"wikipedia\", \"descending\": \"false\", " + + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\",\"newpage\",\"user\"], " + + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], " + + " \"granularity\": \"all\", " + + " \"intervals\": [ \"2013-01-01/2013-01-02\" ], " + + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }"; + // Select query results + private static final String SELECT_QUERY_RESULTS = + "[{ " + + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " + + " \"result\" : { " + + " \"pagingIdentifiers\" : { " + + " \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4 }, " + + " \"events\" : [ { " + + " \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " + + " \"offset\" : 0, " + + " \"event\" : { " + + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " + + " \"robot\" : \"1\", " + + " \"namespace\" : \"article\", " + + " \"anonymous\" : \"0\", " + + " \"unpatrolled\" : \"0\", " + + " \"page\" : \"11._korpus_(NOVJ)\", " + + " \"language\" : \"sl\", " + + " \"newpage\" : \"0\", " + + " \"user\" : \"EmausBot\", " + + " \"count\" : 1.0, " + + " \"added\" : 39.0, " + + " \"delta\" : 39.0, " + + " \"variation\" : 39.0, " + + " \"deleted\" : 0.0 " + + " } " + + " }, { " + + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " + + " \"offset\" : 1, " + + " \"event\" : { " + + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " + + " \"robot\" : \"0\", " + + " \"namespace\" : \"article\", " + + " \"anonymous\" : \"0\", " + + " \"unpatrolled\" : \"0\", " + + " \"page\" : \"112_U.S._580\", " + + " \"language\" : \"en\", " + + " \"newpage\" : \"1\", " + + " \"user\" : \"MZMcBride\", " + + " \"count\" : 1.0, " + + " \"added\" : 70.0, " + + " \"delta\" : 70.0, " + + " \"variation\" : 70.0, " + + " 
\"deleted\" : 0.0 " + + " } " + + " }, { " + + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " + + " \"offset\" : 2, " + + " \"event\" : { " + + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " + + " \"robot\" : \"0\", " + + " \"namespace\" : \"article\", " + + " \"anonymous\" : \"0\", " + + " \"unpatrolled\" : \"0\", " + + " \"page\" : \"113_U.S._243\", " + + " \"language\" : \"en\", " + + " \"newpage\" : \"1\", " + + " \"user\" : \"MZMcBride\", " + + " \"count\" : 1.0, " + + " \"added\" : 77.0, " + + " \"delta\" : 77.0, " + + " \"variation\" : 77.0, " + + " \"deleted\" : 0.0 " + + " } " + + " }, { " + + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " + + " \"offset\" : 3, " + + " \"event\" : { " + + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " + + " \"robot\" : \"0\", " + + " \"namespace\" : \"article\", " + + " \"anonymous\" : \"0\", " + + " \"unpatrolled\" : \"0\", " + + " \"page\" : \"113_U.S._73\", " + + " \"language\" : \"en\", " + + " \"newpage\" : \"1\", " + + " \"user\" : \"MZMcBride\", " + + " \"count\" : 1.0, " + + " \"added\" : 70.0, " + + " \"delta\" : 70.0, " + + " \"variation\" : 70.0, " + + " \"deleted\" : 0.0 " + + " } " + + " }, { " + + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\", " + + " \"offset\" : 4, " + + " \"event\" : { " + + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " + + " \"robot\" : \"0\", " + + " \"namespace\" : \"article\", " + + " \"anonymous\" : \"0\", " + + " \"unpatrolled\" : \"0\", " + + " \"page\" : \"113_U.S._756\", " + + " \"language\" : \"en\", " + + " \"newpage\" : \"1\", " + + " \"user\" : \"MZMcBride\", " + + " \"count\" : 1.0, " + + " \"added\" : 68.0, " + + " \"delta\" : 68.0, " + + " \"variation\" : 68.0, " + + " \"deleted\" : 0.0 " + + " } " + + " } ] }} ]"; + // Select query results as records + private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][] { + new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("1"), new Text("article"), new Text("0"), new Text("0"), + new Text("11._korpus_(NOVJ)"), new Text("sl"), new Text("0"), new Text("EmausBot"), + new FloatWritable(1.0F), new FloatWritable(39.0F), new FloatWritable(39.0F), new FloatWritable(39.0F), new FloatWritable(0.0F) } , + new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"), + new Text("112_U.S._580"), new Text("en"), new Text("1"), new Text("MZMcBride"), + new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(0.0F) } , + new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"), + new Text("113_U.S._243"), new Text("en"), new Text("1"), new Text("MZMcBride"), + new FloatWritable(1.0F), new FloatWritable(77.0F), new FloatWritable(77.0F), new FloatWritable(77.0F), new FloatWritable(0.0F) } , + new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new Text("0"), new Text("article"), new Text("0"), new Text("0"), + new Text("113_U.S._73"), new Text("en"), new Text("1"), new Text("MZMcBride"), + new FloatWritable(1.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(70.0F), new FloatWritable(0.0F) } , + new Object[] { new TimestampWritable(new Timestamp(1356998400000L)), new 
Text("0"), new Text("article"), new Text("0"), new Text("0"), + new Text("113_U.S._756"), new Text("en"), new Text("1"), new Text("MZMcBride"), + new FloatWritable(1.0F), new FloatWritable(68.0F), new FloatWritable(68.0F), new FloatWritable(68.0F), new FloatWritable(0.0F) } + }; + + + /** + * Test the default behavior of the objects and object inspectors. + * @throws IOException + * @throws IllegalAccessException + * @throws IllegalArgumentException + * @throws SecurityException + * @throws NoSuchFieldException + * @throws JsonMappingException + * @throws JsonParseException + * @throws InvocationTargetException + * @throws NoSuchMethodException + */ + @Test + public void testDruidSerDe() + throws SerDeException, JsonParseException, JsonMappingException, + NoSuchFieldException, SecurityException, IllegalArgumentException, + IllegalAccessException, IOException, InterruptedException, + NoSuchMethodException, InvocationTargetException { + // Create, initialize, and test the SerDe + DruidSerDe serDe = new DruidSerDe(); + Configuration conf = new Configuration(); + Properties tbl; + // Timeseries query + tbl = createPropertiesQuery("sample_datasource", Query.TIMESERIES, TIMESERIES_QUERY); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.TIMESERIES, TIMESERIES_QUERY, + TIMESERIES_QUERY_RESULTS, TIMESERIES_QUERY_RESULTS_RECORDS); + // TopN query + tbl = createPropertiesQuery("sample_data", Query.TOPN, TOPN_QUERY); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.TOPN, TOPN_QUERY, + TOPN_QUERY_RESULTS, TOPN_QUERY_RESULTS_RECORDS); + // GroupBy query + tbl = createPropertiesQuery("sample_datasource", Query.GROUP_BY, GROUP_BY_QUERY); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.GROUP_BY, GROUP_BY_QUERY, + GROUP_BY_QUERY_RESULTS, GROUP_BY_QUERY_RESULTS_RECORDS); + // Select query + tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY); + SerDeUtils.initializeSerDe(serDe, conf, tbl, null); + deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY, + SELECT_QUERY_RESULTS, SELECT_QUERY_RESULTS_RECORDS); + } + + private static Properties createPropertiesQuery(String dataSource, String queryType, String jsonQuery) { + Properties tbl = new Properties(); + + // Set the configuration parameters + tbl.setProperty(Constants.DRUID_DATA_SOURCE, dataSource); + tbl.setProperty(Constants.DRUID_QUERY_JSON, jsonQuery); + tbl.setProperty(Constants.DRUID_QUERY_TYPE, queryType); + return tbl; + } + + private static void deserializeQueryResults(DruidSerDe serDe, String queryType, String jsonQuery, + String resultString, Object[][] records) throws SerDeException, JsonParseException, + JsonMappingException, IOException, NoSuchFieldException, SecurityException, + IllegalArgumentException, IllegalAccessException, InterruptedException, + NoSuchMethodException, InvocationTargetException { + + // Initialize + Query<?> query = null; + DruidQueryRecordReader<?,?> reader = null; + List<?> resultsList = null; + ObjectMapper mapper = new DefaultObjectMapper(); + switch (queryType) { + case Query.TIMESERIES: + query = mapper.readValue(jsonQuery, TimeseriesQuery.class); + reader = new DruidTimeseriesQueryRecordReader(); + resultsList = mapper.readValue(resultString, + new TypeReference<List<Result<TimeseriesResultValue>>>() {}); + break; + case Query.TOPN: + query = mapper.readValue(jsonQuery, TopNQuery.class); + reader = new DruidTopNQueryRecordReader(); + resultsList = 
mapper.readValue(resultString, + new TypeReference<List<Result<TopNResultValue>>>() {}); + break; + case Query.GROUP_BY: + query = mapper.readValue(jsonQuery, GroupByQuery.class); + reader = new DruidGroupByQueryRecordReader(); + resultsList = mapper.readValue(resultString, + new TypeReference<List<Row>>() {}); + break; + case Query.SELECT: + query = mapper.readValue(jsonQuery, SelectQuery.class); + reader = new DruidSelectQueryRecordReader(); + resultsList = mapper.readValue(resultString, + new TypeReference<List<Result<SelectResultValue>>>() {}); + break; + } + + // Set query and fields access + Field field1 = DruidQueryRecordReader.class.getDeclaredField("query"); + field1.setAccessible(true); + field1.set(reader, query); + if (reader instanceof DruidGroupByQueryRecordReader) { + Method method1 = DruidGroupByQueryRecordReader.class.getDeclaredMethod("initExtractors"); + method1.setAccessible(true); + method1.invoke(reader); + } + Field field2 = DruidQueryRecordReader.class.getDeclaredField("results"); + field2.setAccessible(true); + + // Get the row structure + StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector(); + List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs(); + + // Check mapred + Iterator<?> results = resultsList.iterator(); + field2.set(reader, results); + DruidWritable writable = new DruidWritable(); + int pos = 0; + while (reader.next(NullWritable.get(), writable)) { + Object row = serDe.deserialize(writable); + Object[] expectedFieldsData = records[pos]; + assertEquals(expectedFieldsData.length, fieldRefs.size()); + for (int i = 0; i < fieldRefs.size(); i++) { + Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); + assertEquals("Field " + i, expectedFieldsData[i], fieldData); + } + pos++; + } + assertEquals(pos, records.length); + + // Check mapreduce + results = resultsList.iterator(); + field2.set(reader, results); + pos = 0; + while (reader.nextKeyValue()) { + Object row = serDe.deserialize(reader.getCurrentValue()); + Object[] expectedFieldsData = records[pos]; + assertEquals(expectedFieldsData.length, fieldRefs.size()); + for (int i = 0; i < fieldRefs.size(); i++) { + Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); + assertEquals("Field " + i, expectedFieldsData[i], fieldData); + } + pos++; + } + assertEquals(pos, records.length); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java new file mode 100644 index 0000000..b20168d --- /dev/null +++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.joda.time.Interval; +import org.junit.Test; + +import junit.framework.TestCase; + +public class TestHiveDruidQueryBasedInputFormat extends TestCase { + + @SuppressWarnings("unchecked") + @Test + public void testCreateSplitsIntervals() throws Exception { + HiveDruidQueryBasedInputFormat input = new HiveDruidQueryBasedInputFormat(); + + Method method1 = HiveDruidQueryBasedInputFormat.class.getDeclaredMethod("createSplitsIntervals", + List.class, int.class); + method1.setAccessible(true); + + List<Interval> intervals; + List<List<Interval>> resultList; + List<List<Interval>> expectedResultList; + + // Test 1 : single split, create 4 + intervals = new ArrayList<>(); + intervals.add(new Interval(1262304000000L, 1293840000000L)); + resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4); + expectedResultList = new ArrayList<>(); + expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1270188000000L))); + expectedResultList.add(Arrays.asList(new Interval(1270188000000L, 1278072000000L))); + expectedResultList.add(Arrays.asList(new Interval(1278072000000L, 1285956000000L))); + expectedResultList.add(Arrays.asList(new Interval(1285956000000L, 1293840000000L))); + assertEquals(expectedResultList, resultList); + + // Test 2 : two splits, create 4 + intervals = new ArrayList<>(); + intervals.add(new Interval(1262304000000L, 1293840000000L)); + intervals.add(new Interval(1325376000000L, 1356998400000L)); + resultList = (List<List<Interval>>) method1.invoke(input, intervals, 4); + expectedResultList = new ArrayList<>(); + expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1278093600000L))); + expectedResultList.add(Arrays.asList(new Interval(1278093600000L, 1293840000000L), + new Interval(1325376000000L, 1325419200000L))); + expectedResultList.add(Arrays.asList(new Interval(1325419200000L, 1341208800000L))); + expectedResultList.add(Arrays.asList(new Interval(1341208800000L, 1356998400000L))); + assertEquals(expectedResultList, resultList); + + // Test 3 : two splits, create 5 + intervals = new ArrayList<>(); + intervals.add(new Interval(1262304000000L, 1293840000000L)); + intervals.add(new Interval(1325376000000L, 1356998400000L)); + resultList = (List<List<Interval>>) method1.invoke(input, intervals, 5); + expectedResultList = new ArrayList<>(); + expectedResultList.add(Arrays.asList(new Interval(1262304000000L, 1274935680000L))); + expectedResultList.add(Arrays.asList(new Interval(1274935680000L, 1287567360000L))); + expectedResultList.add(Arrays.asList(new Interval(1287567360000L, 1293840000000L), + new Interval(1325376000000L, 1331735040000L))); + expectedResultList.add(Arrays.asList(new Interval(1331735040000L, 1344366720000L))); + expectedResultList.add(Arrays.asList(new Interval(1344366720000L, 1356998400000L))); + assertEquals(expectedResultList, resultList); + + // Test 4 : three splits, different ranges, create 6 + intervals = new ArrayList<>(); + intervals.add(new 
Interval(1199145600000L, 1201824000000L)); // one month + intervals.add(new Interval(1325376000000L, 1356998400000L)); // one year + intervals.add(new Interval(1407283200000L, 1407888000000L)); // 7 days + resultList = (List<List<Interval>>) method1.invoke(input, intervals, 6); + expectedResultList = new ArrayList<>(); + expectedResultList.add(Arrays.asList(new Interval(1199145600000L, 1201824000000L), + new Interval(1325376000000L, 1328515200000L))); + expectedResultList.add(Arrays.asList(new Interval(1328515200000L, 1334332800000L))); + expectedResultList.add(Arrays.asList(new Interval(1334332800000L, 1340150400000L))); + expectedResultList.add(Arrays.asList(new Interval(1340150400000L, 1345968000000L))); + expectedResultList.add(Arrays.asList(new Interval(1345968000000L, 1351785600000L))); + expectedResultList.add(Arrays.asList(new Interval(1351785600000L, 1356998400000L), + new Interval(1407283200000L, 1407888000000L))); + assertEquals(expectedResultList, resultList); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/itests/qtest/pom.xml ---------------------------------------------------------------------- diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml index 7fc72b9..e762d0e 100644 --- a/itests/qtest/pom.xml +++ b/itests/qtest/pom.xml @@ -110,6 +110,19 @@ <version>${hadoop.version}</version> <optional>true</optional> </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-druid-handler</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-druid-handler</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> <!-- test inter-project --> <dependency> http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/packaging/pom.xml ---------------------------------------------------------------------- diff --git a/packaging/pom.xml b/packaging/pom.xml index 679dfe8..76e0cff 100644 --- a/packaging/pom.xml +++ b/packaging/pom.xml @@ -210,6 +210,11 @@ </dependency> <dependency> <groupId>org.apache.hive</groupId> + <artifactId>hive-druid-handler</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> <artifactId>hive-hwi</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4c41200..2fb78cd 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,7 @@ <module>cli</module> <module>common</module> <module>contrib</module> + <module>druid-handler</module> <module>hbase-handler</module> <module>hcatalog</module> <module>hplsql</module> @@ -130,6 +131,7 @@ <derby.version>10.10.2.0</derby.version> <dropwizard.version>3.1.0</dropwizard.version> <dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version> + <druid.version>0.9.1.1</druid.version> <guava.version>14.0.1</guava.version> <groovy.version>2.4.4</groovy.version> <hadoop.version>2.7.2</hadoop.version> http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 5722305..66cbdd3 
100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -60,6 +60,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.io.HdfsUtils; @@ -4468,12 +4469,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } public static boolean doesTableNeedLocation(Table tbl) { - // If we are ok with breaking compatibility of existing 3rd party StorageHandlers, + // TODO: If we are ok with breaking compatibility of existing 3rd party StorageHandlers, // this method could be moved to the HiveStorageHandler interface. boolean retval = true; if (tbl.getStorageHandler() != null) { - retval = !tbl.getStorageHandler().toString().equals( - "org.apache.hadoop.hive.hbase.HBaseStorageHandler"); + String sh = tbl.getStorageHandler().toString(); + retval = !sh.equals("org.apache.hadoop.hive.hbase.HBaseStorageHandler") + && !sh.equals(Constants.DRUID_HIVE_STORAGE_HANDLER_ID); } return retval; } http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index 60646ba..4710b8f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -31,14 +31,20 @@ import java.util.TreeSet; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.udf.UDFDateFloorDay; +import org.apache.hadoop.hive.ql.udf.UDFDateFloorHour; +import org.apache.hadoop.hive.ql.udf.UDFDateFloorMinute; +import org.apache.hadoop.hive.ql.udf.UDFDateFloorMonth; +import org.apache.hadoop.hive.ql.udf.UDFDateFloorQuarter; +import org.apache.hadoop.hive.ql.udf.UDFDateFloorSecond; +import org.apache.hadoop.hive.ql.udf.UDFDateFloorWeek; +import org.apache.hadoop.hive.ql.udf.UDFDateFloorYear; import org.apache.hadoop.hive.ql.udf.SettableUDF; import org.apache.hadoop.hive.ql.udf.UDAFPercentile; import org.apache.hadoop.hive.ql.udf.UDFAcos; @@ -141,6 +147,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hive.common.util.AnnotationUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * FunctionRegistry. 
@@ -289,6 +297,16 @@ public final class FunctionRegistry { system.registerGenericUDF("trunc", GenericUDFTrunc.class); system.registerGenericUDF("date_format", GenericUDFDateFormat.class); + // Special date formatting functions + system.registerUDF("floor_year", UDFDateFloorYear.class, false); + system.registerUDF("floor_quarter", UDFDateFloorQuarter.class, false); + system.registerUDF("floor_month", UDFDateFloorMonth.class, false); + system.registerUDF("floor_day", UDFDateFloorDay.class, false); + system.registerUDF("floor_week", UDFDateFloorWeek.class, false); + system.registerUDF("floor_hour", UDFDateFloorHour.class, false); + system.registerUDF("floor_minute", UDFDateFloorMinute.class, false); + system.registerUDF("floor_second", UDFDateFloorSecond.class, false); + system.registerGenericUDF("date_add", GenericUDFDateAdd.class); system.registerGenericUDF("date_sub", GenericUDFDateSub.class); system.registerGenericUDF("datediff", GenericUDFDateDiff.class); http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java index aeb4e7d..890aea1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HivePlannerContext.java @@ -19,21 +19,28 @@ package org.apache.hadoop.hive.ql.optimizer.calcite; import org.apache.calcite.plan.Context; import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveAlgorithmsConf; +import org.apache.hadoop.hive.ql.optimizer.calcite.druid.HiveDruidConf; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRulesRegistry; public class HivePlannerContext implements Context { - private HiveAlgorithmsConf config; + private HiveAlgorithmsConf algoConfig; + private HiveDruidConf druidConf; private HiveRulesRegistry registry; - public HivePlannerContext(HiveAlgorithmsConf config, HiveRulesRegistry registry) { - this.config = config; + public HivePlannerContext(HiveAlgorithmsConf algoConfig, HiveDruidConf druidConf, + HiveRulesRegistry registry) { + this.algoConfig = algoConfig; + this.druidConf = druidConf; this.registry = registry; } public <T> T unwrap(Class<T> clazz) { - if (clazz.isInstance(config)) { - return clazz.cast(config); + if (clazz.isInstance(algoConfig)) { + return clazz.cast(algoConfig); + } + if (clazz.isInstance(druidConf)) { + return clazz.cast(druidConf); } if (clazz.isInstance(registry)) { return clazz.cast(registry); http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidIntervalUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidIntervalUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidIntervalUtils.java new file mode 100644 index 0000000..82ab4d7 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidIntervalUtils.java @@ -0,0 +1,466 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.commons.lang.StringUtils;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+/**
+ * Utilities for generating intervals from RexNode.
+ *
+ * Based on Navis logic implemented on Hive data structures.
+ * See <a href="https://github.com/druid-io/druid/pull/2880">Druid PR-2880</a>
+ *
+ */
+@SuppressWarnings({"rawtypes","unchecked"})
+public class DruidIntervalUtils {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidIntervalUtils.class);
+
+
+  /**
+   * Given a list of predicates, it generates the equivalent Interval
+   * (if possible). It assumes that all the predicates in the input
+   * reference a single column : the timestamp column.
+   *
+   * @param conjs list of conditions to use for the transformation
+   * @return interval representing the conditions in the input list
+   */
+  public static List<Interval> createInterval(RelDataType type, List<RexNode> conjs) {
+    List<Range> ranges = new ArrayList<>();
+    for (RexNode child : conjs) {
+      List<Range> extractedRanges = extractRanges(type, child, false);
+      if (extractedRanges == null || extractedRanges.isEmpty()) {
+        // We could not extract, we bail out
+        return null;
+      }
+      if (ranges.isEmpty()) {
+        ranges.addAll(extractedRanges);
+        continue;
+      }
+      List<Range> overlapped = Lists.newArrayList();
+      for (Range current : ranges) {
+        for (Range interval : extractedRanges) {
+          if (current.isConnected(interval)) {
+            overlapped.add(current.intersection(interval));
+          }
+        }
+      }
+      ranges = overlapped;
+    }
+    List<Range> compactRanges = condenseRanges(ranges);
+    LOG.debug("Inferred ranges on interval : " + compactRanges);
+    return toInterval(compactRanges);
+  }
+
+  protected static List<Interval> toInterval(List<Range> ranges) {
+    List<Interval> intervals = Lists.transform(ranges, new Function<Range, Interval>() {
+      @Override
+      public Interval apply(Range range) {
+        if (!range.hasLowerBound() && !range.hasUpperBound()) {
+          return DruidTable.DEFAULT_INTERVAL;
+        }
+        long start = range.hasLowerBound() ? toLong(range.lowerEndpoint()) :
+                DruidTable.DEFAULT_INTERVAL.getStartMillis();
+        long end = range.hasUpperBound() ? toLong(range.upperEndpoint()) :
+                DruidTable.DEFAULT_INTERVAL.getEndMillis();
+        if (range.hasLowerBound() && range.lowerBoundType() == BoundType.OPEN) {
+          start++;
+        }
+        if (range.hasUpperBound() && range.upperBoundType() == BoundType.CLOSED) {
+          end++;
+        }
+        return new Interval(start, end);
+      }
+    });
+    LOG.info("Converted time ranges " + ranges + " to interval " + intervals);
+    return intervals;
+  }
+
+  protected static List<Range> extractRanges(RelDataType type, RexNode node,
+          boolean withNot) {
+    switch (node.getKind()) {
+      case EQUALS:
+      case LESS_THAN:
+      case LESS_THAN_OR_EQUAL:
+      case GREATER_THAN:
+      case GREATER_THAN_OR_EQUAL:
+      case BETWEEN:
+      case IN:
+        return leafToRanges(type, (RexCall) node, withNot);
+
+      case NOT:
+        return extractRanges(type, ((RexCall) node).getOperands().get(0), !withNot);
+
+      case OR:
+        RexCall call = (RexCall) node;
+        List<Range> intervals = Lists.newArrayList();
+        for (RexNode child : call.getOperands()) {
+          List<Range> extracted = extractRanges(type, child, withNot);
+          if (extracted != null) {
+            intervals.addAll(extracted);
+          }
+        }
+        return intervals;
+
+      default:
+        return null;
+    }
+  }
+
+  protected static List<Range> leafToRanges(RelDataType type, RexCall call,
+          boolean withNot) {
+    switch (call.getKind()) {
+      case EQUALS:
+      case LESS_THAN:
+      case LESS_THAN_OR_EQUAL:
+      case GREATER_THAN:
+      case GREATER_THAN_OR_EQUAL:
+      {
+        RexLiteral literal = null;
+        if (call.getOperands().get(0) instanceof RexInputRef &&
+                call.getOperands().get(1) instanceof RexLiteral) {
+          literal = extractLiteral(call.getOperands().get(1));
+        } else if (call.getOperands().get(0) instanceof RexInputRef &&
+                call.getOperands().get(1).getKind() == SqlKind.CAST) {
+          literal = extractLiteral(call.getOperands().get(1));
+        } else if (call.getOperands().get(1) instanceof RexInputRef &&
+                call.getOperands().get(0) instanceof RexLiteral) {
+          literal = extractLiteral(call.getOperands().get(0));
+        } else if (call.getOperands().get(1) instanceof RexInputRef &&
+                call.getOperands().get(0).getKind() == SqlKind.CAST) {
+          literal = extractLiteral(call.getOperands().get(0));
+        }
+        if (literal == null) {
+          return null;
+        }
+        Comparable value = literalToType(literal, type);
+        if (value == null) {
+          return null;
+        }
+        if (call.getKind() == SqlKind.LESS_THAN) {
+          return Arrays.<Range> asList(withNot ? Range.atLeast(value) : Range.lessThan(value));
+        } else if (call.getKind() == SqlKind.LESS_THAN_OR_EQUAL) {
+          return Arrays.<Range> asList(withNot ? Range.greaterThan(value) : Range.atMost(value));
+        } else if (call.getKind() == SqlKind.GREATER_THAN) {
+          return Arrays.<Range> asList(withNot ? Range.atMost(value) : Range.greaterThan(value));
+        } else if (call.getKind() == SqlKind.GREATER_THAN_OR_EQUAL) {
+          return Arrays.<Range> asList(withNot ? Range.lessThan(value) : Range.atLeast(value));
+        } else { //EQUALS
+          if (!withNot) {
+            return Arrays.<Range> asList(Range.closed(value, value));
+          }
+          return Arrays.<Range> asList(Range.lessThan(value), Range.greaterThan(value));
+        }
+      }
+      case BETWEEN:
+      {
+        RexLiteral literal1 = extractLiteral(call.getOperands().get(2));
+        if (literal1 == null) {
+          return null;
+        }
+        RexLiteral literal2 = extractLiteral(call.getOperands().get(3));
+        if (literal2 == null) {
+          return null;
+        }
+        Comparable value1 = literalToType(literal1, type);
+        Comparable value2 = literalToType(literal2, type);
+        if (value1 == null || value2 == null) {
+          return null;
+        }
+        boolean inverted = value1.compareTo(value2) > 0;
+        if (!withNot) {
+          return Arrays.<Range> asList(
+                  inverted ? Range.closed(value2, value1) : Range.closed(value1, value2));
+        }
+        return Arrays.<Range> asList(Range.lessThan(inverted ? value2 : value1),
+                Range.greaterThan(inverted ? value1 : value2));
+      }
+      case IN:
+      {
+        List<Range> ranges = Lists.newArrayList();
+        for (int i = 1; i < call.getOperands().size(); i++) {
+          RexLiteral literal = extractLiteral(call.getOperands().get(i));
+          if (literal == null) {
+            return null;
+          }
+          Comparable element = literalToType(literal, type);
+          if (element == null) {
+            return null;
+          }
+          if (withNot) {
+            ranges.addAll(
+                    Arrays.<Range> asList(Range.lessThan(element), Range.greaterThan(element)));
+          } else {
+            ranges.add(Range.closed(element, element));
+          }
+        }
+        return ranges;
+      }
+      default:
+        return null;
+    }
+  }
+
+  @SuppressWarnings("incomplete-switch")
+  protected static Comparable literalToType(RexLiteral literal, RelDataType type) {
+    // Extract
+    Object value = null;
+    switch (literal.getType().getSqlTypeName()) {
+      case DATE:
+      case TIME:
+      case TIMESTAMP:
+      case INTERVAL_YEAR_MONTH:
+      case INTERVAL_DAY_TIME:
+        value = literal.getValue();
+        break;
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+      case BIGINT:
+      case DOUBLE:
+      case DECIMAL:
+      case FLOAT:
+      case REAL:
+      case VARCHAR:
+      case CHAR:
+      case BOOLEAN:
+        value = literal.getValue3();
+    }
+    if (value == null) {
+      return null;
+    }
+
+    // Convert
+    switch (type.getSqlTypeName()) {
+      case BIGINT:
+        return toLong(value);
+      case INTEGER:
+        return toInt(value);
+      case FLOAT:
+        return toFloat(value);
+      case DOUBLE:
+        return toDouble(value);
+      case VARCHAR:
+      case CHAR:
+        return String.valueOf(value);
+      case TIMESTAMP:
+        return toTimestamp(value);
+    }
+    return null;
+  }
+
+  private static RexLiteral extractLiteral(RexNode node) {
+    RexNode target = node;
+    if (node.getKind() == SqlKind.CAST) {
+      target = ((RexCall)node).getOperands().get(0);
+    }
+    if (!(target instanceof RexLiteral)) {
+      return null;
+    }
+    return (RexLiteral) target;
+  }
+
+  private static Comparable toTimestamp(Object literal) {
+    if (literal instanceof Timestamp) {
+      return (Timestamp) literal;
+    }
+    if (literal instanceof Date) {
+      return new Timestamp(((Date) literal).getTime());
+    }
+    if (literal instanceof Number) {
+      return new Timestamp(((Number) literal).longValue());
+    }
+    if (literal instanceof String) {
+      String string = (String) literal;
+      if (StringUtils.isNumeric(string)) {
+        return new Timestamp(Long.valueOf(string));
+      }
+      try {
+        return Timestamp.valueOf(string);
+      } catch (NumberFormatException e) {
+        // ignore
+      }
+    }
+    return null;
+  }
+
+  private static Long toLong(Object literal) {
+    if (literal instanceof Number) {
+      return ((Number) literal).longValue();
+    }
+    if (literal instanceof Date) {
+      return ((Date) literal).getTime();
+    }
+    if (literal instanceof Timestamp) {
+      return ((Timestamp) literal).getTime();
+    }
+    if (literal instanceof String) {
+      try {
+        return Long.valueOf((String) literal);
+      } catch (NumberFormatException e) {
+        // ignore
+      }
+      try {
+        return DateFormat.getDateInstance().parse((String) literal).getTime();
+      } catch (ParseException e) {
+        // best effort. ignore
+      }
+    }
+    return null;
+  }
+
+
+  private static Integer toInt(Object literal) {
+    if (literal instanceof Number) {
+      return ((Number) literal).intValue();
+    }
+    if (literal instanceof String) {
+      try {
+        return Integer.valueOf((String) literal);
+      } catch (NumberFormatException e) {
+        // ignore
+      }
+    }
+    return null;
+  }
+
+  private static Float toFloat(Object literal) {
+    if (literal instanceof Number) {
+      return ((Number) literal).floatValue();
+    }
+    if (literal instanceof String) {
+      try {
+        return Float.valueOf((String) literal);
+      } catch (NumberFormatException e) {
+        // ignore
+      }
+    }
+    return null;
+  }
+
+  private static Double toDouble(Object literal) {
+    if (literal instanceof Number) {
+      return ((Number) literal).doubleValue();
+    }
+    if (literal instanceof String) {
+      try {
+        return Double.valueOf((String) literal);
+      } catch (NumberFormatException e) {
+        // ignore
+      }
+    }
+    return null;
+  }
+
+  protected static List<Range> condenseRanges(List<Range> ranges) {
+    if (ranges.size() <= 1) {
+      return ranges;
+    }
+
+    Comparator<Range> startThenEnd = new Comparator<Range>() {
+      @Override
+      public int compare(Range lhs, Range rhs) {
+        int compare = 0;
+        if (lhs.hasLowerBound() && rhs.hasLowerBound()) {
+          compare = lhs.lowerEndpoint().compareTo(rhs.lowerEndpoint());
+        } else if (!lhs.hasLowerBound() && rhs.hasLowerBound()) {
+          compare = -1;
+        } else if (lhs.hasLowerBound() && !rhs.hasLowerBound()) {
+          compare = 1;
+        }
+        if (compare != 0) {
+          return compare;
+        }
+        if (lhs.hasUpperBound() && rhs.hasUpperBound()) {
+          compare = lhs.upperEndpoint().compareTo(rhs.upperEndpoint());
+        } else if (!lhs.hasUpperBound() && rhs.hasUpperBound()) {
+          compare = -1;
+        } else if (lhs.hasUpperBound() && !rhs.hasUpperBound()) {
+          compare = 1;
+        }
+        return compare;
+      }
+    };
+
+    TreeSet<Range> sortedIntervals = Sets.newTreeSet(startThenEnd);
+    sortedIntervals.addAll(ranges);
+
+    List<Range> retVal = Lists.newArrayList();
+
+    Iterator<Range> intervalsIter = sortedIntervals.iterator();
+    Range currInterval = intervalsIter.next();
+    while (intervalsIter.hasNext()) {
+      Range next = intervalsIter.next();
+      if (currInterval.encloses(next)) {
+        continue;
+      }
+      if (mergeable(currInterval, next)) {
+        currInterval = currInterval.span(next);
+      } else {
+        retVal.add(currInterval);
+        currInterval = next;
+      }
+    }
+    retVal.add(currInterval);
+
+    return retVal;
+  }
+
+  protected static boolean mergeable(Range range1, Range range2) {
+    Comparable x1 = range1.upperEndpoint();
+    Comparable x2 = range2.lowerEndpoint();
+    int compare = x1.compareTo(x2);
+    return compare > 0 || (compare == 0 && range1.upperBoundType() == BoundType.CLOSED
+            && range2.lowerBoundType() == BoundType.CLOSED);
+  }
+
+  public static long extractTotalTime(List<Interval> intervals) {
+    long totalTime = 0;
+    for (Interval interval : intervals) {
+      totalTime += (interval.getEndMillis() - interval.getStartMillis());
+    }
+    return totalTime;
+  }
+
+}
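
A minimal, self-contained sketch (not part of the patch) of what createInterval/toInterval do with a conjunction of timestamp predicates such as ts >= '2010-01-01' AND ts < '2011-01-01': each leaf predicate becomes a Guava Range, connected ranges are intersected, and the result is turned into a half-open Joda-Time Interval with the open/closed bound adjustment. The class name IntervalSketch and the literal dates are illustrative assumptions; only Guava and Joda-Time are needed on the classpath.

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;

import com.google.common.collect.BoundType;
import com.google.common.collect.Range;

public class IntervalSketch {
  public static void main(String[] args) {
    long lower = new DateTime(2010, 1, 1, 0, 0, DateTimeZone.UTC).getMillis();
    long upper = new DateTime(2011, 1, 1, 0, 0, DateTimeZone.UTC).getMillis();

    // Each leaf predicate becomes a Guava Range over the literal value.
    Range<Long> atLeast = Range.atLeast(lower);   // ts >= 2010-01-01
    Range<Long> lessThan = Range.lessThan(upper); // ts <  2011-01-01

    // Conjunctions are combined by intersecting connected ranges.
    Range<Long> combined = atLeast.intersection(lessThan); // [lower..upper)

    // toInterval-style bound adjustment: an open lower bound moves up by 1 ms,
    // a closed upper bound is extended by 1 ms, so the half-open Joda Interval
    // covers exactly the same instants.
    long start = combined.lowerEndpoint()
        + (combined.lowerBoundType() == BoundType.OPEN ? 1 : 0);
    long end = combined.upperEndpoint()
        + (combined.upperBoundType() == BoundType.CLOSED ? 1 : 0);

    System.out.println(new Interval(start, end, DateTimeZone.UTC));
    // prints 2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z
  }
}

The one-millisecond adjustment matters because Druid intervals are end-exclusive: a value sitting exactly on a closed upper bound would otherwise be dropped from the scan.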
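
In the same spirit, a small sketch (again not part of the patch, with made-up Long endpoints and the hypothetical class name CondenseSketch) of the behaviour behind condenseRanges/mergeable, and of the EQUALS-with-NOT branch of leafToRanges, which splits a negated point predicate into the two surrounding open ranges; only Guava is required.

import java.util.Arrays;
import java.util.List;

import com.google.common.collect.Range;

public class CondenseSketch {
  public static void main(String[] args) {
    // Overlapping ranges (or touching ones, when both bounds are closed) are
    // merged with Range.span, as condenseRanges does after sorting by start
    // and then end; disconnected ranges stay separate.
    Range<Long> a = Range.closed(1L, 5L);
    Range<Long> b = Range.closed(3L, 8L);
    Range<Long> c = Range.closed(10L, 12L);
    System.out.println(a.span(b));        // merged into one range covering 1..8
    System.out.println(b.isConnected(c)); // false: kept as a separate interval

    // A negated point predicate (NOT ts = x) becomes the two open half-ranges
    // around the value, matching the EQUALS-with-NOT branch of leafToRanges.
    List<Range<Long>> notEquals = Arrays.asList(Range.lessThan(7L), Range.greaterThan(7L));
    System.out.println(notEquals);        // everything below 7 and everything above 7
  }
}

Keeping disconnected ranges separate is what lets NOT and IN predicates map onto several Druid intervals instead of being abandoned.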
