[CALCITE-1281] Druid adapter wrongly returns all numeric values as int or float
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/ec49a0fa Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/ec49a0fa Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/ec49a0fa Branch: refs/heads/master Commit: ec49a0fa37195bb4b34945b53ce39b27d558d6ab Parents: 435e203 Author: Julian Hyde <[email protected]> Authored: Wed Jun 8 17:03:17 2016 -0700 Committer: Julian Hyde <[email protected]> Committed: Wed Jun 8 17:35:03 2016 -0700 ---------------------------------------------------------------------- .../adapter/druid/DruidConnectionImpl.java | 72 +++++++++++++++----- .../calcite/adapter/druid/DruidQuery.java | 28 +++++++- .../org/apache/calcite/test/DruidAdapterIT.java | 33 +++++++-- 3 files changed, 112 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/ec49a0fa/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java index dccda9f..7520d70 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java @@ -22,6 +22,7 @@ import org.apache.calcite.interpreter.Sink; import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.tree.Primitive; import org.apache.calcite.prepare.CalcitePrepareImpl; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Holder; @@ -40,6 +41,7 @@ import com.google.common.collect.ImmutableSet; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -58,13 +60,24 @@ class DruidConnectionImpl implements DruidConnection { private final String url; private final String coordinatorUrl; - public DruidConnectionImpl(String url, String coordinatorUrl) { + DruidConnectionImpl(String url, String coordinatorUrl) { this.url = Preconditions.checkNotNull(url); this.coordinatorUrl = Preconditions.checkNotNull(coordinatorUrl); } + /** Executes a query request. + * + * @param queryType Query type + * @param data Data to post + * @param sink Sink to which to send the parsed rows + * @param fieldNames Names of fields + * @param fieldTypes Types of fields (never null, but elements may be null) + * @param page Page definition (in/out) + * @throws IOException on error + */ public void request(QueryType queryType, String data, Sink sink, - List<String> fieldNames, Page page) throws IOException { + List<String> fieldNames, List<Primitive> fieldTypes, Page page) + throws IOException { final String url = this.url + "/druid/v2/?pretty"; final Map<String, String> requestHeaders = ImmutableMap.of("Content-Type", "application/json"); @@ -73,14 +86,14 @@ class DruidConnectionImpl implements DruidConnection { } try (InputStream in0 = post(url, data, requestHeaders, 10000, 1800000); InputStream in = traceResponse(in0)) { - parse(queryType, in, sink, fieldNames, page); + parse(queryType, in, sink, fieldNames, fieldTypes, page); } } /** Parses the output of a {@code topN} query, sending the results to a * {@link Sink}. */ private void parse(QueryType queryType, InputStream in, Sink sink, - List<String> fieldNames, Page page) { + List<String> fieldNames, List<Primitive> fieldTypes, Page page) { final JsonFactory factory = new JsonFactory(); final Row.RowBuilder rowBuilder = Row.newBuilder(fieldNames.size()); @@ -105,7 +118,7 @@ class DruidConnectionImpl implements DruidConnection { && parser.nextToken() == JsonToken.START_ARRAY) { while (parser.nextToken() == JsonToken.START_OBJECT) { // loop until token equal to "}" - parseFields(fieldNames, rowBuilder, parser); + parseFields(fieldNames, fieldTypes, rowBuilder, parser); sink.send(rowBuilder.build()); rowBuilder.reset(); } @@ -145,7 +158,7 @@ class DruidConnectionImpl implements DruidConnection { if (parser.nextToken() == JsonToken.FIELD_NAME && parser.getCurrentName().equals("event") && parser.nextToken() == JsonToken.START_OBJECT) { - parseFields(fieldNames, rowBuilder, parser); + parseFields(fieldNames, fieldTypes, rowBuilder, parser); sink.send(rowBuilder.build()); rowBuilder.reset(); } @@ -165,7 +178,7 @@ class DruidConnectionImpl implements DruidConnection { if (parser.nextToken() == JsonToken.FIELD_NAME && parser.getCurrentName().equals("event") && parser.nextToken() == JsonToken.START_OBJECT) { - parseFields(fieldNames, rowBuilder, parser); + parseFields(fieldNames, fieldTypes, rowBuilder, parser); sink.send(rowBuilder.build()); rowBuilder.reset(); } @@ -178,15 +191,15 @@ class DruidConnectionImpl implements DruidConnection { } } - private void parseFields(List<String> fieldNames, Row.RowBuilder rowBuilder, - JsonParser parser) throws IOException { + private void parseFields(List<String> fieldNames, List<Primitive> fieldTypes, + Row.RowBuilder rowBuilder, JsonParser parser) throws IOException { while (parser.nextToken() == JsonToken.FIELD_NAME) { - parseField(fieldNames, rowBuilder, parser); + parseField(fieldNames, fieldTypes, rowBuilder, parser); } } - private void parseField(List<String> fieldNames, Row.RowBuilder rowBuilder, - JsonParser parser) throws IOException { + private void parseField(List<String> fieldNames, List<Primitive> fieldTypes, + Row.RowBuilder rowBuilder, JsonParser parser) throws IOException { final String fieldName = parser.getCurrentName(); // Move to next token, which is name's value @@ -197,10 +210,35 @@ class DruidConnectionImpl implements DruidConnection { } switch (token) { case VALUE_NUMBER_INT: - rowBuilder.set(i, parser.getIntValue()); - break; case VALUE_NUMBER_FLOAT: - rowBuilder.set(i, parser.getDoubleValue()); + Primitive type = fieldTypes.get(i); + if (type == null) { + if (token == JsonToken.VALUE_NUMBER_INT) { + type = Primitive.INT; + } else { + type = Primitive.FLOAT; + } + } + switch (type) { + case BYTE: + rowBuilder.set(i, parser.getIntValue()); + break; + case SHORT: + rowBuilder.set(i, parser.getShortValue()); + break; + case INT: + rowBuilder.set(i, parser.getIntValue()); + break; + case LONG: + rowBuilder.set(i, parser.getLongValue()); + break; + case FLOAT: + rowBuilder.set(i, parser.getFloatValue()); + break; + case DOUBLE: + rowBuilder.set(i, parser.getDoubleValue()); + break; + } break; case VALUE_TRUE: rowBuilder.set(i, true); @@ -287,7 +325,9 @@ class DruidConnectionImpl implements DruidConnection { public void run() { try { final Page page = new Page(); - request(queryType, request, this, fieldNames, page); + final List<Primitive> fieldTypes = + Collections.nCopies(fieldNames.size(), null); + request(queryType, request, this, fieldNames, fieldTypes, page); enumerator.done.set(true); } catch (Throwable e) { enumerator.throwableHolder.set(e); http://git-wip-us.apache.org/repos/asf/calcite/blob/ec49a0fa/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java index 9a858f9..a3fdf78 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java @@ -24,6 +24,7 @@ import org.apache.calcite.interpreter.Interpreter; import org.apache.calcite.interpreter.Node; import org.apache.calcite.interpreter.Sink; import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.tree.Primitive; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; @@ -41,6 +42,7 @@ import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; @@ -674,6 +676,10 @@ public class DruidQuery extends AbstractRelNode implements BindableRel { } public void run() throws InterruptedException { + final List<Primitive> fieldTypes = new ArrayList<>(); + for (RelDataTypeField field : query.getRowType().getFieldList()) { + fieldTypes.add(getPrimitive(field)); + } try { final DruidConnectionImpl connection = new DruidConnectionImpl(query.druidTable.schema.url, @@ -685,12 +691,32 @@ public class DruidQuery extends AbstractRelNode implements BindableRel { final String queryString = querySpec.getQueryString(page.pagingIdentifier, page.offset); connection.request(querySpec.queryType, queryString, sink, - querySpec.fieldNames, page); + querySpec.fieldNames, fieldTypes, page); } while (page.pagingIdentifier != null && page.offset > previousOffset); } catch (IOException e) { throw Throwables.propagate(e); } } + + private Primitive getPrimitive(RelDataTypeField field) { + switch (field.getType().getSqlTypeName()) { + case BIGINT: + return Primitive.LONG; + case INTEGER: + return Primitive.INT; + case SMALLINT: + return Primitive.SHORT; + case TINYINT: + return Primitive.BYTE; + case REAL: + return Primitive.FLOAT; + case DOUBLE: + case FLOAT: + return Primitive.DOUBLE; + default: + return null; + } + } } /** Object that knows how to write itself to a http://git-wip-us.apache.org/repos/asf/calcite/blob/ec49a0fa/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java ---------------------------------------------------------------------- diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java index d59ef24..fee3e3d 100644 --- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java +++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java @@ -200,6 +200,29 @@ public class DruidAdapterIT { .explainContains(explain); } + /** Test case for + * <a href="https://issues.apache.org/jira/browse/CALCITE-1281">[CALCITE-1281] + * Druid adapter wrongly returns all numeric values as int or float</a>. */ + @Test public void testSelectCount() { + final String sql = "select count(*) as c from \"foodmart\""; + sql(sql) + .returns(new Function<ResultSet, Void>() { + public Void apply(ResultSet input) { + try { + assertThat(input.next(), is(true)); + assertThat(input.getInt(1), is(86829)); + assertThat(input.getLong(1), is(86829L)); + assertThat(input.getString(1), is("86829")); + assertThat(input.wasNull(), is(false)); + assertThat(input.next(), is(false)); + return null; + } catch (SQLException e) { + throw Throwables.propagate(e); + } + } + }); + } + @Test public void testSort() { // Note: We do not push down SORT yet final String explain = "PLAN=" @@ -455,7 +478,6 @@ public class DruidAdapterIT { "C=21610; state_province=OR"); } - @Ignore("TODO: fix invalid cast from Integer to Long") @Test public void testGroupByAvgSumCount() { final String sql = "select \"state_province\",\n" + " avg(\"unit_sales\") as a,\n" @@ -467,11 +489,14 @@ public class DruidAdapterIT { + "order by 1"; String druidQuery = "'aggregations':[" + "{'type':'longSum','name':'$f1','fieldName':'unit_sales'}," - + "{'type':'count','name':'$f2'}]"; + + "{'type':'count','name':'$f2','fieldName':'unit_sales'}," + + "{'type':'count','name':'C','fieldName':'store_sqft'}," + + "{'type':'count','name':'C0'}]," + + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}"; sql(sql) .limit(2) - .returnsUnordered("state_province=CA; A=3; S=74748; C=23190; C0=23190", - "state_province=OR; A=3; S=67659; C=19027; C0=19027") + .returnsUnordered("state_province=CA; A=3; S=74748; C=24441; C0=24441", + "state_province=OR; A=3; S=67659; C=21610; C0=21610") .queryContains(druidChecker(druidQuery)); }
