[CALCITE-1276] In Druid adapter, deduce tables and columns if not specified
Deduce tables by calling /druid/coordinator/v1/metadata/datasources; columns by running a "segmentMetadata" query. Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/435e2030 Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/435e2030 Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/435e2030 Branch: refs/heads/master Commit: 435e203028a872933c6d8bd6404ebc445552e599 Parents: 23c8e45 Author: Julian Hyde <[email protected]> Authored: Mon Jun 6 23:03:08 2016 -0700 Committer: Julian Hyde <[email protected]> Committed: Wed Jun 8 17:35:03 2016 -0700 ---------------------------------------------------------------------- .../org/apache/calcite/model/ModelHandler.java | 19 ++- .../org/apache/calcite/runtime/HttpUtils.java | 3 +- druid/pom.xml | 4 + .../adapter/druid/DruidConnectionImpl.java | 153 ++++++++++++++++++- .../calcite/adapter/druid/DruidQuery.java | 29 +++- .../calcite/adapter/druid/DruidSchema.java | 40 ++++- .../adapter/druid/DruidSchemaFactory.java | 41 ++++- .../calcite/adapter/druid/DruidTable.java | 65 +++++++- .../adapter/druid/DruidTableFactory.java | 61 ++++---- .../org/apache/calcite/test/DruidAdapterIT.java | 106 ++++++------- .../test/resources/druid-foodmart-model.json | 3 +- druid/src/test/resources/druid-wiki-model.json | 3 +- .../resources/druid-wiki-no-columns-model.json | 45 ++++++ .../resources/druid-wiki-no-tables-model.json | 33 ++++ site/_docs/druid_adapter.md | 3 +- 15 files changed, 490 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/core/src/main/java/org/apache/calcite/model/ModelHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/model/ModelHandler.java b/core/src/main/java/org/apache/calcite/model/ModelHandler.java index 8b451ee..8c7f5fa 100644 --- 
a/core/src/main/java/org/apache/calcite/model/ModelHandler.java +++ b/core/src/main/java/org/apache/calcite/model/ModelHandler.java @@ -212,7 +212,7 @@ public class ModelHandler { jsonSchema.factory); final Schema schema = schemaFactory.create( - parentSchema, jsonSchema.name, operandMap(jsonSchema.operand)); + parentSchema, jsonSchema.name, operandMap(jsonSchema, jsonSchema.operand)); final SchemaPlus schemaPlus = parentSchema.add(jsonSchema.name, schema); populateSchema(jsonSchema, schemaPlus); } catch (Exception e) { @@ -221,7 +221,8 @@ public class ModelHandler { } /** Adds extra entries to an operand to a custom schema. */ - protected Map<String, Object> operandMap(Map<String, Object> operand) { + protected Map<String, Object> operandMap(JsonSchema jsonSchema, + Map<String, Object> operand) { if (operand == null) { return ImmutableMap.of(); } @@ -238,6 +239,13 @@ public class ModelHandler { final File file = new File(modelUri); builder.put(extraOperand.camelName, file.getParentFile()); } + break; + case TABLES: + if (jsonSchema instanceof JsonCustomSchema) { + builder.put(extraOperand.camelName, + ((JsonCustomSchema) jsonSchema).tables); + } + break; } } } @@ -338,7 +346,7 @@ public class ModelHandler { jsonTable.factory); final Table table = tableFactory.create(schema, jsonTable.name, - operandMap(jsonTable.operand), null); + operandMap(null, jsonTable.operand), null); schema.add(jsonTable.name, table); } catch (Exception e) { throw new RuntimeException("Error instantiating " + jsonTable, e); @@ -428,7 +436,10 @@ public class ModelHandler { MODEL_URI("modelUri"), /** Base directory from which to read files. */ - BASE_DIRECTORY("baseDirectory"); + BASE_DIRECTORY("baseDirectory"), + + /** Tables defined in this schema. 
*/ + TABLES("tables"); public final String camelName; http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/core/src/main/java/org/apache/calcite/runtime/HttpUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/runtime/HttpUtils.java b/core/src/main/java/org/apache/calcite/runtime/HttpUtils.java index 011092c..4519281 100644 --- a/core/src/main/java/org/apache/calcite/runtime/HttpUtils.java +++ b/core/src/main/java/org/apache/calcite/runtime/HttpUtils.java @@ -126,7 +126,8 @@ public class HttpUtils { Map<String, String> headers, int cTimeout, int rTimeout) throws IOException { - return executeMethod("POST", url, data, headers, cTimeout, rTimeout); + return executeMethod(data == null ? "GET" : "POST", url, data, headers, + cTimeout, rTimeout); } public static InputStream executeMethod( http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/pom.xml ---------------------------------------------------------------------- diff --git a/druid/pom.xml b/druid/pom.xml index 22f0c1d..7470774 100644 --- a/druid/pom.xml +++ b/druid/pom.xml @@ -57,6 +57,10 @@ limitations under the License. 
</dependency> <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java index dd462b6..dccda9f 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java @@ -23,14 +23,19 @@ import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; import org.apache.calcite.prepare.CalcitePrepareImpl; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Holder; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.CollectionType; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -38,6 +43,7 @@ import java.io.InputStream; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; @@ -49,21 +55,26 @@ import static org.apache.calcite.runtime.HttpUtils.post; * Implementation of {@link 
DruidConnection}. */ class DruidConnectionImpl implements DruidConnection { - final String url; + private final String url; + private final String coordinatorUrl; - public DruidConnectionImpl(String url) { - this.url = url; + public DruidConnectionImpl(String url, String coordinatorUrl) { + this.url = Preconditions.checkNotNull(url); + this.coordinatorUrl = Preconditions.checkNotNull(coordinatorUrl); } public void request(QueryType queryType, String data, Sink sink, List<String> fieldNames, Page page) throws IOException { + final String url = this.url + "/druid/v2/?pretty"; + final Map<String, String> requestHeaders = + ImmutableMap.of("Content-Type", "application/json"); if (CalcitePrepareImpl.DEBUG) { System.out.println(data); } - final Map<String, String> requestHeaders = - ImmutableMap.of("Content-Type", "application/json"); - final InputStream in = post(url, data, requestHeaders, 10000, 1800000); - parse(queryType, in, sink, fieldNames, page); + try (InputStream in0 = post(url, data, requestHeaders, 10000, 1800000); + InputStream in = traceResponse(in0)) { + parse(queryType, in, sink, fieldNames, page); + } } /** Parses the output of a {@code topN} query, sending the results to a @@ -290,6 +301,77 @@ class DruidConnectionImpl implements DruidConnection { }; } + /** Reads segment metadata, and populates a list of columns and metrics. 
*/ + void metadata(String dataSourceName, List<String> intervals, + Map<String, SqlTypeName> fieldBuilder, Set<String> metricNameBuilder) { + final String url = this.url + "/druid/v2/?pretty"; + final Map<String, String> requestHeaders = + ImmutableMap.of("Content-Type", "application/json"); + final String data = DruidQuery.metadataQuery(dataSourceName, intervals); + if (CalcitePrepareImpl.DEBUG) { + System.out.println("Druid: " + data); + } + try (InputStream in0 = post(url, data, requestHeaders, 10000, 1800000); + InputStream in = traceResponse(in0)) { + final ObjectMapper mapper = new ObjectMapper(); + final CollectionType listType = + mapper.getTypeFactory().constructCollectionType(List.class, + JsonSegmentMetadata.class); + final List<JsonSegmentMetadata> list = mapper.readValue(in, listType); + in.close(); + for (JsonSegmentMetadata o : list) { + for (Map.Entry<String, JsonColumn> entry : o.columns.entrySet()) { + fieldBuilder.put(entry.getKey(), entry.getValue().sqlType()); + } + if (o.aggregators != null) { + for (Map.Entry<String, JsonAggregator> entry + : o.aggregators.entrySet()) { + fieldBuilder.put(entry.getKey(), entry.getValue().sqlType()); + metricNameBuilder.add(entry.getKey()); + } + } + } + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + /** Reads data source names from Druid. 
*/ + Set<String> tableNames() { + final Map<String, String> requestHeaders = + ImmutableMap.of("Content-Type", "application/json"); + final String data = null; + final String url = coordinatorUrl + "/druid/coordinator/v1/metadata/datasources"; + if (CalcitePrepareImpl.DEBUG) { + System.out.println("Druid: table names" + data + "; " + url); + } + try (InputStream in0 = post(url, data, requestHeaders, 10000, 1800000); + InputStream in = traceResponse(in0)) { + final ObjectMapper mapper = new ObjectMapper(); + final CollectionType listType = + mapper.getTypeFactory().constructCollectionType(List.class, + String.class); + final List<String> list = mapper.readValue(in, listType); + return ImmutableSet.copyOf(list); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + private InputStream traceResponse(InputStream in) { + if (CalcitePrepareImpl.DEBUG) { + try { + final byte[] bytes = AvaticaUtils.readFullyToBytes(in); + in.close(); + System.out.println("Response: " + new String(bytes)); + in = new ByteArrayInputStream(bytes); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + return in; + } + /** A {@link Sink} that is also {@link Runnable}. */ private interface RunnableQueueSink extends Sink, Runnable { } @@ -343,6 +425,63 @@ class DruidConnectionImpl implements DruidConnection { return "{" + pagingIdentifier + ": " + offset + "}"; } } + + + /** Result of a "segmentMetadata" call, populated by Jackson. */ + @SuppressWarnings({ "WeakerAccess", "unused" }) + private static class JsonSegmentMetadata { + public String id; + public List<String> intervals; + public Map<String, JsonColumn> columns; + public int size; + public int numRows; + public Map<String, JsonAggregator> aggregators; + } + + /** Element of the "columns" collection in the result of a + * "segmentMetadata" call, populated by Jackson. 
*/ + @SuppressWarnings({ "WeakerAccess", "unused" }) + private static class JsonColumn { + public String type; + public boolean hasMultipleValues; + public int size; + public Integer cardinality; + public String errorMessage; + + SqlTypeName sqlType() { + return sqlType(type); + } + + static SqlTypeName sqlType(String type) { + switch (type) { + case "LONG": + return SqlTypeName.BIGINT; + case "DOUBLE": + return SqlTypeName.DOUBLE; + case "FLOAT": + return SqlTypeName.REAL; + case "STRING": + return SqlTypeName.VARCHAR; + case "hyperUnique": + return SqlTypeName.VARBINARY; + default: + throw new AssertionError("unknown type " + type); + } + } + } + + /** Element of the "aggregators" collection in the result of a + * "segmentMetadata" call, populated by Jackson. */ + @SuppressWarnings({ "WeakerAccess", "unused" }) + private static class JsonAggregator { + public String type; + public String name; + public String fieldName; + + SqlTypeName sqlType() { + return JsonColumn.sqlType(type); + } + } } // End DruidConnectionImpl.java http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java index 0a9dbec..9a858f9 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java @@ -376,8 +376,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel { writeFieldIf(generator, "filter", jsonFilter); writeField(generator, "aggregations", aggregations); writeFieldIf(generator, "postAggregations", null); - writeField(generator, "intervals", - ImmutableList.of(druidTable.interval)); + writeField(generator, "intervals", druidTable.intervals); writeFieldIf(generator, "having", null); 
generator.writeEndObject(); @@ -389,8 +388,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel { generator.writeStringField("queryType", "select"); generator.writeStringField("dataSource", druidTable.dataSource); generator.writeStringField("descending", "false"); - writeField(generator, "intervals", - ImmutableList.of(druidTable.interval)); + writeField(generator, "intervals", druidTable.intervals); writeFieldIf(generator, "filter", jsonFilter); writeField(generator, "dimensions", translator.dimensions); writeField(generator, "metrics", translator.metrics); @@ -445,7 +443,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel { } } - private static void writeField(JsonGenerator generator, String fieldName, + static void writeField(JsonGenerator generator, String fieldName, Object o) throws IOException { generator.writeFieldName(fieldName); writeObject(generator, o); @@ -521,6 +519,24 @@ public class DruidQuery extends AbstractRelNode implements BindableRel { return Pair.of(aboveNodes, belowNodes); } + /** Generates a JSON string to query metadata about a data source. */ + static String metadataQuery(String dataSourceName, List<String> intervals) { + final StringWriter sw = new StringWriter(); + final JsonFactory factory = new JsonFactory(); + try { + final JsonGenerator generator = factory.createGenerator(sw); + generator.writeStartObject(); + generator.writeStringField("queryType", "segmentMetadata"); + generator.writeStringField("dataSource", dataSourceName); + writeFieldIf(generator, "intervals", intervals); + generator.writeEndObject(); + generator.close(); + } catch (IOException e) { + throw Throwables.propagate(e); + } + return sw.toString(); + } + /** Druid query specification. 
*/ public static class QuerySpec { final QueryType queryType; @@ -660,7 +676,8 @@ public class DruidQuery extends AbstractRelNode implements BindableRel { public void run() throws InterruptedException { try { final DruidConnectionImpl connection = - new DruidConnectionImpl(query.druidTable.schema.url); + new DruidConnectionImpl(query.druidTable.schema.url, + query.druidTable.schema.coordinatorUrl); final DruidConnectionImpl.Page page = new DruidConnectionImpl.Page(); int previousOffset; do { http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchema.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchema.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchema.java index bd3539d..28e7086 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchema.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchema.java @@ -18,30 +18,62 @@ package org.apache.calcite.adapter.druid; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.sql.type.SqlTypeName; import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.Map; +import java.util.Set; +import javax.annotation.Nonnull; /** * Schema mapped onto a Druid instance. */ public class DruidSchema extends AbstractSchema { final String url; + final String coordinatorUrl; + private final boolean discoverTables; /** * Creates a Druid schema. * - * @param url URL of query REST service + * @param url URL of query REST service, e.g. 
"http://localhost:8082" + * @param coordinatorUrl URL of coordinator REST service, + * e.g. "http://localhost:8081" + * @param discoverTables If true, ask Druid what tables exist; + * if false, only create tables explicitly in the model */ - public DruidSchema(String url) { + public DruidSchema(String url, String coordinatorUrl, + boolean discoverTables) { this.url = Preconditions.checkNotNull(url); + this.coordinatorUrl = Preconditions.checkNotNull(coordinatorUrl); + this.discoverTables = discoverTables; } @Override protected Map<String, Table> getTableMap() { - final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder(); - return builder.build(); + if (!discoverTables) { + return ImmutableMap.of(); + } + final DruidConnectionImpl connection = + new DruidConnectionImpl(url, coordinatorUrl); + return Maps.asMap(ImmutableSet.copyOf(connection.tableNames()), + CacheBuilder.<String, Table>newBuilder() + .build(new CacheLoader<String, Table>() { + public Table load(@Nonnull String tableName) throws Exception { + final Map<String, SqlTypeName> fieldMap = new LinkedHashMap<>(); + final Set<String> metricNameSet = new LinkedHashSet<>(); + connection.metadata(tableName, null, fieldMap, metricNameSet); + return DruidTable.create(DruidSchema.this, tableName, null, + fieldMap, metricNameSet, null, connection); + } + })); } } http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchemaFactory.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchemaFactory.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchemaFactory.java index f00b932..d35d0bf 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchemaFactory.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchemaFactory.java @@ -20,17 +20,52 @@ import org.apache.calcite.schema.Schema; import 
org.apache.calcite.schema.SchemaFactory; import org.apache.calcite.schema.SchemaPlus; +import java.util.List; import java.util.Map; /** * Schema factory that creates Druid schemas. + * + * <table> + * <caption>Druid schema operands</caption> + * <tr> + * <th>Operand</th> + * <th>Description</th> + * <th>Required</th> + * </tr> + * <tr> + * <td>url</td> + * <td>URL of Druid's query node. + * The default is "http://localhost:8082".</td> + * <td>No</td> + * </tr> + * <tr> + * <td>coordinatorUrl</td> + * <td>URL of Druid's coordinator node. + * The default is <code>url</code>, replacing "8082" with "8081", + * for example "http://localhost:8081".</td> + * <td>No</td> + * </tr> + * </table> */ public class DruidSchemaFactory implements SchemaFactory { + /** Default Druid URL. */ + public static final String DEFAULT_URL = "http://localhost:8082"; + public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { - Map map = (Map) operand; - String url = (String) map.get("url"); - return new DruidSchema(url); + final Map map = (Map) operand; + final String url = map.get("url") instanceof String + ? (String) map.get("url") + : DEFAULT_URL; + final String coordinatorUrl = map.get("coordinatorUrl") instanceof String + ? 
(String) map.get("coordinatorUrl") + : url.replace(":8082", ":8081"); + // "tables" is a hidden attribute, copied in from the enclosing custom + // schema + final boolean containsTables = map.get("tables") instanceof List + && ((List) map.get("tables")).size() > 0; + return new DruidSchema(url, coordinatorUrl, !containsTables); } } http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java index 78dcc0d..48258c1 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java @@ -25,25 +25,33 @@ import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Table; import org.apache.calcite.schema.TranslatableTable; import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.Util; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import java.util.List; +import java.util.Map; import java.util.Set; /** * Table mapped onto a Druid table. 
*/ public class DruidTable extends AbstractTable implements TranslatableTable { + protected static final String DEFAULT_INTERVAL = + "1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z"; final DruidSchema schema; final String dataSource; final RelProtoDataType protoRowType; final ImmutableSet<String> metricFieldNames; - final String interval; + final List<String> intervals; final String timestampFieldName; /** @@ -53,18 +61,50 @@ public class DruidTable extends AbstractTable implements TranslatableTable { * @param dataSource Druid data source name * @param protoRowType Field names and types * @param metricFieldNames Names of fields that are metrics - * @param interval Default interval if query does not constrain the time + * @param intervals Default interval if query does not constrain the time, or null * @param timestampFieldName Name of the column that contains the time */ public DruidTable(DruidSchema schema, String dataSource, - RelProtoDataType protoRowType, Set<String> metricFieldNames, String interval, + RelProtoDataType protoRowType, Set<String> metricFieldNames, List<String> intervals, String timestampFieldName) { this.timestampFieldName = Preconditions.checkNotNull(timestampFieldName); this.schema = Preconditions.checkNotNull(schema); this.dataSource = Preconditions.checkNotNull(dataSource); this.protoRowType = protoRowType; this.metricFieldNames = ImmutableSet.copyOf(metricFieldNames); - this.interval = Preconditions.checkNotNull(interval); + this.intervals = Preconditions.checkNotNull( + Util.first(intervals, ImmutableList.of(DEFAULT_INTERVAL))); + } + + /** Creates a {@link DruidTable} + * + * @param druidSchema Druid schema + * @param dataSourceName Data source name in Druid, also table name + * @param intervals Intervals, or null to use default + * @param fieldMap Mutable map of fields (dimensions plus metrics); + * may be partially populated already + * @param metricNameSet Mutable set of metric names; + * may be partially populated already + * 
@param timestampColumnName Name of timestamp column, or null + * @param connection If not null, use this connection to find column + * definitions + * @return A table + */ + static Table create(DruidSchema druidSchema, String dataSourceName, + List<String> intervals, Map<String, SqlTypeName> fieldMap, + Set<String> metricNameSet, String timestampColumnName, + DruidConnectionImpl connection) { + if (connection != null) { + connection.metadata(dataSourceName, intervals, fieldMap, metricNameSet); + } + final ImmutableMap<String, SqlTypeName> fields = + ImmutableMap.copyOf(fieldMap); + if (timestampColumnName == null) { + timestampColumnName = Iterables.get(fieldMap.keySet(), 0); + } + return new DruidTable(druidSchema, dataSourceName, + new MapRelProtoDataType(fields), ImmutableSet.copyOf(metricNameSet), + intervals, Util.first(timestampColumnName, "__time")); } public RelDataType getRowType(RelDataTypeFactory typeFactory) { @@ -84,6 +124,23 @@ public class DruidTable extends AbstractTable implements TranslatableTable { ImmutableList.<RelNode>of(scan)); } + /** Creates a {@link RelDataType} from a map of + * field names and types. 
*/ + private static class MapRelProtoDataType implements RelProtoDataType { + private final ImmutableMap<String, SqlTypeName> fields; + + MapRelProtoDataType(ImmutableMap<String, SqlTypeName> fields) { + this.fields = fields; + } + + public RelDataType apply(RelDataTypeFactory typeFactory) { + final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); + for (Map.Entry<String, SqlTypeName> field : fields.entrySet()) { + builder.add(field.getKey(), field.getValue()).nullable(true); + } + return builder.build(); + } + } } // End DruidTable.java http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java index 756e558..2453fcb 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java @@ -17,17 +17,15 @@ package org.apache.calcite.adapter.druid; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.TableFactory; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Util; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableList; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -39,6 +37,11 @@ import java.util.Set; * <p>A table corresponds to what Druid calls a "data source". 
*/ public class DruidTableFactory implements TableFactory { + @SuppressWarnings("unused") + public static final DruidTableFactory INSTANCE = new DruidTableFactory(); + + private DruidTableFactory() {} + public Table create(SchemaPlus schema, String name, Map operand, RelDataType rowType) { final DruidSchema druidSchema = schema.unwrap(DruidSchema.class); @@ -46,17 +49,18 @@ public class DruidTableFactory implements TableFactory { final String dataSource = (String) operand.get("dataSource"); final Set<String> metricNameBuilder = new LinkedHashSet<>(); String timestampColumnName = (String) operand.get("timestampColumn"); - final ImmutableMap.Builder<String, SqlTypeName> fieldBuilder = - ImmutableMap.builder(); - if (operand.get("dimensions") != null) { + final Map<String, SqlTypeName> fieldBuilder = new LinkedHashMap<>(); + final Object dimensionsRaw = operand.get("dimensions"); + if (dimensionsRaw instanceof List) { //noinspection unchecked - final List<String> dimensions = (List<String>) operand.get("dimensions"); + final List<String> dimensions = (List<String>) dimensionsRaw; for (String dimension : dimensions) { fieldBuilder.put(dimension, SqlTypeName.VARCHAR); } } - if (operand.get("metrics") != null) { - final List metrics = (List) operand.get("metrics"); + final Object metricsRaw = operand.get("metrics"); + if (metricsRaw instanceof List) { + final List metrics = (List) metricsRaw; for (Object metric : metrics) { final SqlTypeName sqlTypeName; final String metricName; @@ -83,32 +87,23 @@ public class DruidTableFactory implements TableFactory { metricNameBuilder.add(metricName); } } - fieldBuilder.put(timestampColumnName, SqlTypeName.VARCHAR); - final ImmutableMap<String, SqlTypeName> fields = fieldBuilder.build(); - String interval = Util.first((String) operand.get("interval"), - "1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z"); - return new DruidTable(druidSchema, Util.first(dataSource, name), - new MapRelProtoDataType(fields), - 
ImmutableSet.copyOf(metricNameBuilder), interval, timestampColumnName); - } - - /** Creates a {@link org.apache.calcite.rel.type.RelDataType} from a map of - * field names and types. */ - private static class MapRelProtoDataType implements RelProtoDataType { - private final ImmutableMap<String, SqlTypeName> fields; - - public MapRelProtoDataType(ImmutableMap<String, SqlTypeName> fields) { - this.fields = fields; + if (timestampColumnName != null) { + fieldBuilder.put(timestampColumnName, SqlTypeName.VARCHAR); } - - public RelDataType apply(RelDataTypeFactory typeFactory) { - final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); - for (Map.Entry<String, SqlTypeName> field : fields.entrySet()) { - builder.add(field.getKey(), field.getValue()).nullable(true); - } - return builder.build(); + final String dataSourceName = Util.first(dataSource, name); + DruidConnectionImpl c; + if (dimensionsRaw == null || metricsRaw == null) { + c = new DruidConnectionImpl(druidSchema.url, druidSchema.url.replace(":8082", ":8081")); + } else { + c = null; } + final Object interval = operand.get("interval"); + final List<String> intervals = interval instanceof String + ? 
ImmutableList.of((String) interval) : null; + return DruidTable.create(druidSchema, dataSourceName, intervals, + fieldBuilder, metricNameBuilder, timestampColumnName, c); } + } // End DruidTableFactory.java http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java ---------------------------------------------------------------------- diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java index 34717b6..d59ef24 100644 --- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java +++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java @@ -22,18 +22,16 @@ import org.apache.calcite.util.Util; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import org.junit.Ignore; import org.junit.Test; +import java.net.URL; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Collections; import java.util.List; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -60,18 +58,24 @@ import static org.junit.Assert.assertTrue; * </ul> */ public class DruidAdapterIT { - /** Connection factory based on the "druid-foodmart" model. */ - public static final ImmutableMap<String, String> FOODMART = - ImmutableMap.of("model", - DruidAdapterIT.class.getResource("/druid-foodmart-model.json") - .getPath()); + /** URL of the "druid-foodmart" model. */ + public static final URL FOODMART = + DruidAdapterIT.class.getResource("/druid-foodmart-model.json"); - /** Connection factory based on the "druid-wiki" model + /** URL of the "druid-wiki" model * and the "wikiticker" data set. 
*/ - public static final ImmutableMap<String, String> WIKI = - ImmutableMap.of("model", - DruidAdapterIT.class.getResource("/druid-wiki-model.json") - .getPath()); + public static final URL WIKI = + DruidAdapterIT.class.getResource("/druid-wiki-model.json"); + + /** URL of the "druid-wiki-no-columns" model + * and the "wikiticker" data set. */ + public static final URL WIKI_AUTO = + DruidAdapterIT.class.getResource("/druid-wiki-no-columns-model.json"); + + /** URL of the "druid-wiki-no-tables" model + * and the "wikiticker" data set. */ + public static final URL WIKI_AUTO2 = + DruidAdapterIT.class.getResource("/druid-wiki-no-tables-model.json"); /** Whether to run Druid tests. Enabled by default, however test is only * included if "it" profile is activated ({@code -Pit}). To disable, @@ -100,48 +104,17 @@ public class DruidAdapterIT { }; } - /** Similar to {@link CalciteAssert#checkResultUnordered}, but filters strings - * before comparing them. */ - static Function<ResultSet, Void> checkResultUnordered( - final String... lines) { - return new Function<ResultSet, Void>() { - public Void apply(ResultSet resultSet) { - try { - final List<String> expectedList = Lists.newArrayList(lines); - Collections.sort(expectedList); - - final List<String> actualList = Lists.newArrayList(); - CalciteAssert.toStringList(resultSet, actualList); - for (int i = 0; i < actualList.size(); i++) { - String s = actualList.get(i); - actualList.set(i, - s.replaceAll("\\.0;", ";").replaceAll("\\.0$", "")); - } - Collections.sort(actualList); - - assertThat(actualList, equalTo(expectedList)); - return null; - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - }; - } - - /** Creates a query against the {@link #FOODMART} data set. */ - private CalciteAssert.AssertQuery sql(String sql) { + /** Creates a query against the data set whose model file is at the given URL. 
*/ + private CalciteAssert.AssertQuery sql(String sql, URL url) { return CalciteAssert.that() .enable(enabled()) - .with(FOODMART) + .with(ImmutableMap.of("model", url.getPath())) .query(sql); } - /** Creates a query against the {@link #WIKI} data set. */ - private CalciteAssert.AssertQuery wiki(String sql) { - return CalciteAssert.that() - .enable(enabled()) - .with(WIKI) - .query(sql); + /** Creates a query against the {@link #FOODMART} data set. */ + private CalciteAssert.AssertQuery sql(String sql) { + return sql(sql, FOODMART); } /** Tests a query against the {@link #WIKI} data set. @@ -152,8 +125,36 @@ public class DruidAdapterIT { final String explain = "PLAN=" + "EnumerableInterpreter\n" + " DruidQuery(table=[[wiki, wiki]], filter=[=(CAST($12):VARCHAR(13) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", 'Jeremy Corbyn')], groups=[{4}], aggs=[[]])\n"; + checkSelectDistinctWiki(WIKI, "wiki") + .explainContains(explain); + } + + @Test public void testSelectDistinctWikiNoColumns() { + final String explain = "PLAN=" + + "EnumerableInterpreter\n" + + " DruidQuery(table=[[wiki, wiki]], filter=[=(CAST($18):VARCHAR(13) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", 'Jeremy Corbyn')], groups=[{8}], aggs=[[]])\n"; + checkSelectDistinctWiki(WIKI_AUTO, "wiki") + .explainContains(explain); + } + + @Test public void testSelectDistinctWikiNoTables() { + // Compared to testSelectDistinctWiki, table name is different (because it + // is the raw dataSource name from Druid) and the field offsets are + // different. This is expected. + final String explain = "PLAN=" + + "EnumerableInterpreter\n" + + " DruidQuery(table=[[wiki, wikiticker]], filter=[=(CAST($17):VARCHAR(13) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", 'Jeremy Corbyn')], groups=[{7}], aggs=[[]])\n"; + checkSelectDistinctWiki(WIKI_AUTO2, "wikiticker") + .explainContains(explain); + + // Because no tables are declared, foodmart is automatically present. 
+ sql("select count(*) as c from \"foodmart\"", WIKI_AUTO2) + .returnsUnordered("C=86829"); + } + + private CalciteAssert.AssertQuery checkSelectDistinctWiki(URL url, String tableName) { final String sql = "select distinct \"countryName\"\n" - + "from \"wiki\"\n" + + "from \"" + tableName + "\"\n" + "where \"page\" = 'Jeremy Corbyn'"; final String druidQuery = "{'queryType':'groupBy'," + "'dataSource':'wikiticker','granularity':'all'," @@ -161,10 +162,9 @@ public class DruidAdapterIT { + "'filter':{'type':'selector','dimension':'page','value':'Jeremy Corbyn'}," + "'aggregations':[{'type':'longSum','name':'unit_sales','fieldName':'unit_sales'}]," + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}"; - wiki(sql) + return sql(sql, url) .returnsUnordered("countryName=United Kingdom", "countryName=null") - .explainContains(explain) .queryContains(druidChecker(druidQuery)); } http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/test/resources/druid-foodmart-model.json ---------------------------------------------------------------------- diff --git a/druid/src/test/resources/druid-foodmart-model.json b/druid/src/test/resources/druid-foodmart-model.json index 68102f0..8e0e14e 100644 --- a/druid/src/test/resources/druid-foodmart-model.json +++ b/druid/src/test/resources/druid-foodmart-model.json @@ -23,7 +23,8 @@ "name": "foodmart", "factory": "org.apache.calcite.adapter.druid.DruidSchemaFactory", "operand": { - "url": "http://localhost:8082/druid/v2/?pretty" + "url": "http://localhost:8082", + "coordinatorUrl": "http://localhost:8081" }, "tables": [ { http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/test/resources/druid-wiki-model.json ---------------------------------------------------------------------- diff --git a/druid/src/test/resources/druid-wiki-model.json b/druid/src/test/resources/druid-wiki-model.json index eb13494..f345788 100644 --- a/druid/src/test/resources/druid-wiki-model.json +++ 
b/druid/src/test/resources/druid-wiki-model.json @@ -23,7 +23,8 @@ "name": "wiki", "factory": "org.apache.calcite.adapter.druid.DruidSchemaFactory", "operand": { - "url": "http://localhost:8082/druid/v2/?pretty" + "url": "http://localhost:8082", + "coordinatorUrl": "http://localhost:8081" }, "tables": [ { http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/test/resources/druid-wiki-no-columns-model.json ---------------------------------------------------------------------- diff --git a/druid/src/test/resources/druid-wiki-no-columns-model.json b/druid/src/test/resources/druid-wiki-no-columns-model.json new file mode 100644 index 0000000..c9231c4 --- /dev/null +++ b/druid/src/test/resources/druid-wiki-no-columns-model.json @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Druid model that has one table defined, and table has no columns, so adapter + * needs to discover columns. 
+ */ +{ + "version": "1.0", + "defaultSchema": "wiki", + "schemas": [ + { + "type": "custom", + "name": "wiki", + "factory": "org.apache.calcite.adapter.druid.DruidSchemaFactory", + "operand": { + "url": "http://localhost:8082", + "coordinatorUrl": "http://localhost:8081" + }, + "tables": [ + { + "name": "wiki", + "factory": "org.apache.calcite.adapter.druid.DruidTableFactory", + "operand": { + "dataSource": "wikiticker", + "interval": "1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z", + "timestampColumn": "time" + } + } + ] + } + ] +} http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/test/resources/druid-wiki-no-tables-model.json ---------------------------------------------------------------------- diff --git a/druid/src/test/resources/druid-wiki-no-tables-model.json b/druid/src/test/resources/druid-wiki-no-tables-model.json new file mode 100644 index 0000000..5b22ef9 --- /dev/null +++ b/druid/src/test/resources/druid-wiki-no-tables-model.json @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Druid model where there are no table definitions. 
+ */ +{ + "version": "1.0", + "defaultSchema": "wiki", + "schemas": [ + { + "type": "custom", + "name": "wiki", + "factory": "org.apache.calcite.adapter.druid.DruidSchemaFactory", + "operand": { + "url": "http://localhost:8082", + "coordinatorUrl": "http://localhost:8081" + } + } + ] +} http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/site/_docs/druid_adapter.md ---------------------------------------------------------------------- diff --git a/site/_docs/druid_adapter.md b/site/_docs/druid_adapter.md index 3a330a6..9de2fb9 100644 --- a/site/_docs/druid_adapter.md +++ b/site/_docs/druid_adapter.md @@ -51,7 +51,8 @@ A basic example of a model file is given below: "name": "wiki", "factory": "org.apache.calcite.adapter.druid.DruidSchemaFactory", "operand": { - "url": "http://localhost:8082/druid/v2/?pretty" + "url": "http://localhost:8082", + "coordinatorUrl": "http://localhost:8081" }, "tables": [ {
