DRILL-5337: OpenTSDB storage plugin closes #774
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/496c97d1 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/496c97d1 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/496c97d1 Branch: refs/heads/master Commit: 496c97d14eb428a5aff74e82d662a0da6930e94f Parents: 29e0547 Author: Vlad Storona <vstor...@cybervisiontech.com> Authored: Fri Nov 25 20:28:02 2016 +0200 Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com> Committed: Mon Nov 13 11:04:54 2017 +0200 ---------------------------------------------------------------------- contrib/pom.xml | 1 + contrib/storage-opentsdb/README.md | 69 +++++ contrib/storage-opentsdb/pom.xml | 80 ++++++ .../drill/exec/store/openTSDB/Constants.java | 32 +++ .../exec/store/openTSDB/DrillOpenTSDBTable.java | 81 ++++++ .../store/openTSDB/OpenTSDBBatchCreator.java | 53 ++++ .../exec/store/openTSDB/OpenTSDBGroupScan.java | 169 ++++++++++++ .../store/openTSDB/OpenTSDBRecordReader.java | 258 +++++++++++++++++++ .../exec/store/openTSDB/OpenTSDBScanSpec.java | 42 +++ .../store/openTSDB/OpenTSDBStoragePlugin.java | 77 ++++++ .../openTSDB/OpenTSDBStoragePluginConfig.java | 77 ++++++ .../exec/store/openTSDB/OpenTSDBSubScan.java | 132 ++++++++++ .../apache/drill/exec/store/openTSDB/Util.java | 66 +++++ .../exec/store/openTSDB/client/OpenTSDB.java | 50 ++++ .../store/openTSDB/client/OpenTSDBTypes.java | 28 ++ .../exec/store/openTSDB/client/Schema.java | 124 +++++++++ .../exec/store/openTSDB/client/Service.java | 55 ++++ .../store/openTSDB/client/query/DBQuery.java | 148 +++++++++++ .../exec/store/openTSDB/client/query/Query.java | 187 ++++++++++++++ .../openTSDB/client/services/ServiceImpl.java | 174 +++++++++++++ .../exec/store/openTSDB/dto/ColumnDTO.java | 63 +++++ .../exec/store/openTSDB/dto/MetricDTO.java | 77 ++++++ .../openTSDB/schema/OpenTSDBSchemaFactory.java | 77 ++++++ .../resources/bootstrap-storage-plugins.json | 9 + .../src/main/resources/drill-module.conf | 21 ++ .../drill/store/openTSDB/TestDataHolder.java | 247 ++++++++++++++++++ .../store/openTSDB/TestOpenTSDBPlugin.java | 189 ++++++++++++++ distribution/pom.xml | 5 + 28 files changed, 2591 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 2014923..d4ad434 100644 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -38,6 +38,7 @@ <module>storage-mongo</module> <module>storage-jdbc</module> <module>storage-kudu</module> + <module>storage-opentsdb</module> <module>sqlline</module> <module>data</module> <module>gis</module> http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/README.md ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/README.md b/contrib/storage-opentsdb/README.md new file mode 100644 index 0000000..0c616b5 --- /dev/null +++ b/contrib/storage-opentsdb/README.md @@ -0,0 +1,69 @@ +# drill-storage-openTSDB + +Implementation of TSDB storage plugin. Plugin uses REST API to work with TSDB. + +For more information about openTSDB follow this link <http://opentsdb.net> + +There is list of required params: + +* metric - The name of a metric stored in the db. + +* start - The start time for the query. This can be a relative or absolute timestamp. + +* aggregator - The name of an aggregation function to use. + +optional param is: + +* downsample - An optional downsampling function to reduce the amount of data returned. + +* end - An end time for the query. If not supplied, the TSD will assume the local system time on the server. +This may be a relative or absolute timestamp. This param is optional, and if it isn't specified we will send null +to the db in this field, but in this case db will assume the local system time on the server. + +List of supported aggregators + +<http://opentsdb.net/docs/build/html/user_guide/query/aggregators.html> + +List of supported time + +<http://opentsdb.net/docs/build/html/user_guide/query/dates.html> + +Params must be specified in FROM clause of the query separated by commas. For example + +`openTSDB.(metric=metric_name, start=4d-ago, aggregator=sum)` + +Supported queries for now are listed below: + +``` +USE openTSDB +``` + +``` +SHOW tables +``` +Will print available metrics. Max number of the printed results is a Integer.MAX value + +``` +SELECT * FROM openTSDB. `(metric=warp.speed.test, start=47y-ago, aggregator=sum)` +``` +Return aggregated elements from `warp.speed.test` table since 47y-ago + +``` +SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)` +``` +Return aggregated elements from `warp.speed.test` table + +``` +SELECT `timestamp`, sum(`aggregated value`) FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)` GROUP BY `timestamp` +``` +Return aggregated and grouped value by standard drill functions from `warp.speed.test table`, but with the custom aggregator + +``` +SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, downsample=5m-avg)` +``` +Return aggregated data limited by downsample + +``` +SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, end=1407165403000)` +``` +Return aggregated data limited by end time \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/pom.xml b/contrib/storage-opentsdb/pom.xml new file mode 100644 index 0000000..aff1bfa --- /dev/null +++ b/contrib/storage-opentsdb/pom.xml @@ -0,0 +1,80 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>drill-contrib-parent</artifactId> + <groupId>org.apache.drill.contrib</groupId> + <version>1.12.0-SNAPSHOT</version> + </parent> + + <artifactId>drill-opentsdb-storage</artifactId> + + <name>contrib/opentsdb-storage-plugin</name> + + <dependencies> + <dependency> + <groupId>org.apache.drill.exec</groupId> + <artifactId>drill-java-exec</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.drill.exec</groupId> + <artifactId>drill-java-exec</artifactId> + <classifier>tests</classifier> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.drill</groupId> + <artifactId>drill-common</artifactId> + <classifier>tests</classifier> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.github.tomakehurst</groupId> + <artifactId>wiremock-standalone</artifactId> + <version>2.5.1</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.squareup.retrofit2</groupId> + <artifactId>retrofit</artifactId> + <version>2.1.0</version> + </dependency> + <dependency> + <groupId>com.squareup.retrofit2</groupId> + <artifactId>converter-jackson</artifactId> + <version>2.1.0</version> + </dependency> + <dependency> + <groupId>com.madhukaraphatak</groupId> + <artifactId>java-sizeof_2.11</artifactId> + <version>0.1</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java new file mode 100644 index 0000000..c812ff5 --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB; + +public interface Constants { + /** + * openTSDB required constants for API call + */ + public static final String DEFAULT_TIME = "47y-ago"; + public static final String SUM_AGGREGATOR = "sum"; + + public static final String TIME_PARAM = "start"; + public static final String END_TIME_PARAM = "end"; + public static final String METRIC_PARAM = "metric"; + public static final String AGGREGATOR_PARAM = "aggregator"; + public static final String DOWNSAMPLE_PARAM = "downsample"; +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/DrillOpenTSDBTable.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/DrillOpenTSDBTable.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/DrillOpenTSDBTable.java new file mode 100644 index 0000000..bdbb670 --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/DrillOpenTSDBTable.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB; + +import com.google.common.collect.Lists; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.planner.logical.DynamicDrillTable; +import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes; +import org.apache.drill.exec.store.openTSDB.client.Schema; +import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes.DOUBLE; +import static org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes.STRING; +import static org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes.TIMESTAMP; + +public class DrillOpenTSDBTable extends DynamicDrillTable { + + private static final Logger log = + LoggerFactory.getLogger(DrillOpenTSDBTable.class); + + private final Schema schema; + + public DrillOpenTSDBTable(String storageEngineName, OpenTSDBStoragePlugin plugin, Schema schema, OpenTSDBScanSpec scanSpec) { + super(plugin, storageEngineName, scanSpec); + this.schema = schema; + } + + @Override + public RelDataType getRowType(final RelDataTypeFactory typeFactory) { + List<String> names = Lists.newArrayList(); + List<RelDataType> types = Lists.newArrayList(); + convertToRelDataType(typeFactory, names, types); + return typeFactory.createStructType(types, names); + } + + private void convertToRelDataType(RelDataTypeFactory typeFactory, List<String> names, List<RelDataType> types) { + for (ColumnDTO column : schema.getColumns()) { + names.add(column.getColumnName()); + RelDataType type = getSqlTypeFromOpenTSDBType(typeFactory, column.getColumnType()); + type = typeFactory.createTypeWithNullability(type, column.isNullable()); + types.add(type); + } + } + + private RelDataType getSqlTypeFromOpenTSDBType(RelDataTypeFactory typeFactory, OpenTSDBTypes type) { + switch (type) { + case STRING: + return typeFactory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE); + case DOUBLE: + return typeFactory.createSqlType(SqlTypeName.DOUBLE); + case TIMESTAMP: + return typeFactory.createSqlType(SqlTypeName.TIMESTAMP); + default: + throw UserException.unsupportedError() + .message(String.format("%s is unsupported now. Currently supported types is %s, %s, %s", type, STRING, DOUBLE, TIMESTAMP)) + .build(log); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java new file mode 100644 index 0000000..935aaa5 --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB; + +import com.google.common.collect.Lists; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.ScanBatch; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.store.RecordReader; + +import java.util.List; + +public class OpenTSDBBatchCreator implements BatchCreator<OpenTSDBSubScan> { + + @Override + public CloseableRecordBatch getBatch(FragmentContext context, OpenTSDBSubScan subScan, + List<RecordBatch> children) throws ExecutionSetupException { + List<RecordReader> readers = Lists.newArrayList(); + List<SchemaPath> columns; + + for (OpenTSDBSubScan.OpenTSDBSubScanSpec scanSpec : subScan.getTabletScanSpecList()) { + try { + if ((columns = subScan.getColumns()) == null) { + columns = GroupScan.ALL_COLUMNS; + } + readers.add(new OpenTSDBRecordReader(subScan.getStorageEngine().getClient(), scanSpec, columns)); + } catch (Exception e) { + throw new ExecutionSetupException(e); + } + } + return new ScanBatch(subScan, context, readers); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java new file mode 100644 index 0000000..47c805a --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.madhukaraphatak.sizeof.SizeEstimator; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.openTSDB.OpenTSDBSubScan.OpenTSDBSubScanSpec; +import org.apache.drill.exec.store.openTSDB.client.services.ServiceImpl; +import org.apache.drill.exec.store.openTSDB.dto.MetricDTO; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.drill.exec.store.openTSDB.Util.fromRowData; + +@JsonTypeName("openTSDB-scan") +public class OpenTSDBGroupScan extends AbstractGroupScan { + + private OpenTSDBStoragePluginConfig storagePluginConfig; + private OpenTSDBScanSpec openTSDBScanSpec; + private OpenTSDBStoragePlugin storagePlugin; + + private List<SchemaPath> columns; + + @JsonCreator + public OpenTSDBGroupScan(@JsonProperty("openTSDBScanSpec") OpenTSDBScanSpec openTSDBScanSpec, + @JsonProperty("storage") OpenTSDBStoragePluginConfig openTSDBStoragePluginConfig, + @JsonProperty("columns") List<SchemaPath> columns, + @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException { + this((OpenTSDBStoragePlugin) pluginRegistry.getPlugin(openTSDBStoragePluginConfig), openTSDBScanSpec, columns); + } + + public OpenTSDBGroupScan(OpenTSDBStoragePlugin storagePlugin, + OpenTSDBScanSpec scanSpec, List<SchemaPath> columns) { + super((String) null); + this.storagePlugin = storagePlugin; + this.storagePluginConfig = storagePlugin.getConfig(); + this.openTSDBScanSpec = scanSpec; + this.columns = columns == null || columns.size() == 0 ? ALL_COLUMNS : columns; + } + + /** + * Private constructor, used for cloning. + * + * @param that The OpenTSDBGroupScan to clone + */ + private OpenTSDBGroupScan(OpenTSDBGroupScan that) { + super((String) null); + this.columns = that.columns; + this.openTSDBScanSpec = that.openTSDBScanSpec; + this.storagePlugin = that.storagePlugin; + this.storagePluginConfig = that.storagePluginConfig; + } + + @Override + public int getMaxParallelizationWidth() { + return 1; + } + + @Override + public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) { + } + + @Override + public OpenTSDBSubScan getSpecificScan(int minorFragmentId) { + List<OpenTSDBSubScanSpec> scanSpecList = Lists.newArrayList(); + scanSpecList.add(new OpenTSDBSubScanSpec(getTableName())); + return new OpenTSDBSubScan(storagePlugin, storagePluginConfig, scanSpecList, this.columns); + } + + @Override + public ScanStats getScanStats() { + ServiceImpl client = storagePlugin.getClient(); + Map<String, String> params = fromRowData(openTSDBScanSpec.getTableName()); + Set<MetricDTO> allMetrics = client.getAllMetrics(params); + long numMetrics = allMetrics.size(); + float approxDiskCost = 0; + if (numMetrics != 0) { + MetricDTO metricDTO = allMetrics.iterator().next(); + // This method estimates the sizes of Java objects (number of bytes of memory they occupy). + // more detailed information about how this estimation method work you can find in this article + // http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html + approxDiskCost = SizeEstimator.estimate(metricDTO) * numMetrics; + } + return new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, numMetrics, 1, approxDiskCost); + } + + @Override + @JsonIgnore + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { + Preconditions.checkArgument(children.isEmpty()); + return new OpenTSDBGroupScan(this); + } + + @Override + public String getDigest() { + return toString(); + } + + @Override + @JsonIgnore + public boolean canPushdownProjects(List<SchemaPath> columns) { + return true; + } + + @JsonIgnore + public String getTableName() { + return getOpenTSDBScanSpec().getTableName(); + } + + @JsonProperty + public OpenTSDBScanSpec getOpenTSDBScanSpec() { + return openTSDBScanSpec; + } + + @JsonProperty("storage") + public OpenTSDBStoragePluginConfig getStoragePluginConfig() { + return storagePluginConfig; + } + + @JsonProperty + public List<SchemaPath> getColumns() { + return columns; + } + + @Override + public GroupScan clone(List<SchemaPath> columns) { + OpenTSDBGroupScan newScan = new OpenTSDBGroupScan(this); + newScan.columns = columns; + return newScan; + } + + @Override + public String toString() { + return "OpenTSDBGroupScan [OpenTSDBScanSpec=" + openTSDBScanSpec + ", columns=" + columns + + "]"; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBRecordReader.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBRecordReader.java new file mode 100644 index 0000000..044c232 --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBRecordReader.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes; +import org.apache.drill.exec.store.openTSDB.client.Schema; +import org.apache.drill.exec.store.openTSDB.client.Service; +import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO; +import org.apache.drill.exec.store.openTSDB.dto.MetricDTO; +import org.apache.drill.exec.vector.NullableFloat8Vector; +import org.apache.drill.exec.vector.NullableTimeStampVector; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.ValueVector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.drill.exec.store.openTSDB.Constants.METRIC_PARAM; +import static org.apache.drill.exec.store.openTSDB.Util.fromRowData; + +public class OpenTSDBRecordReader extends AbstractRecordReader { + + private static final Logger log = LoggerFactory.getLogger(OpenTSDBRecordReader.class); + + // batch size should not exceed max allowed record count + private static final int TARGET_RECORD_COUNT = 4000; + + private static final Map<OpenTSDBTypes, MinorType> TYPES; + + private Service db; + + private Iterator<MetricDTO> tableIterator; + private OutputMutator output; + private ImmutableList<ProjectedColumnInfo> projectedCols; + + private Map<String, String> params; + + public OpenTSDBRecordReader(Service client, OpenTSDBSubScan.OpenTSDBSubScanSpec subScanSpec, + List<SchemaPath> projectedColumns) throws IOException { + setColumns(projectedColumns); + this.db = client; + this.params = + fromRowData(subScanSpec.getTableName()); + log.debug("Scan spec: {}", subScanSpec); + } + + @Override + public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { + this.output = output; + Set<MetricDTO> metrics = + db.getAllMetrics(params); + if (metrics == null) { + throw UserException.validationError() + .message(String.format("Table '%s' not found", params.get(METRIC_PARAM))) + .build(log); + } + this.tableIterator = metrics.iterator(); + } + + @Override + public int next() { + try { + return processOpenTSDBTablesData(); + } catch (SchemaChangeException e) { + throw new DrillRuntimeException(e); + } + } + + @Override + public void close() throws Exception { + } + + static { + TYPES = ImmutableMap.<OpenTSDBTypes, MinorType>builder() + .put(OpenTSDBTypes.STRING, MinorType.VARCHAR) + .put(OpenTSDBTypes.DOUBLE, MinorType.FLOAT8) + .put(OpenTSDBTypes.TIMESTAMP, MinorType.TIMESTAMP) + .build(); + } + + private static class ProjectedColumnInfo { + ValueVector vv; + ColumnDTO openTSDBColumn; + } + + private int processOpenTSDBTablesData() throws SchemaChangeException { + int rowCounter = 0; + while (tableIterator.hasNext() && rowCounter < TARGET_RECORD_COUNT) { + MetricDTO metricDTO = tableIterator.next(); + rowCounter = addRowResult(metricDTO, rowCounter); + } + return rowCounter; + } + + private int addRowResult(MetricDTO table, int rowCounter) throws SchemaChangeException { + setupProjectedColsIfItNull(); + for (String time : table.getDps().keySet()) { + String value = table.getDps().get(time); + setupDataToDrillTable(table, time, value, table.getTags(), rowCounter); + rowCounter++; + } + return rowCounter; + } + + private void setupProjectedColsIfItNull() throws SchemaChangeException { + if (projectedCols == null) { + initCols(new Schema(db, params.get(METRIC_PARAM))); + } + } + + private void setupDataToDrillTable(MetricDTO table, String timestamp, String value, Map<String, String> tags, int rowCount) { + for (ProjectedColumnInfo pci : projectedCols) { + switch (pci.openTSDBColumn.getColumnName()) { + case "metric": + setStringColumnValue(table.getMetric(), pci, rowCount); + break; + case "aggregate tags": + setStringColumnValue(table.getAggregateTags().toString(), pci, rowCount); + break; + case "timestamp": + setTimestampColumnValue(timestamp, pci, rowCount); + break; + case "aggregated value": + setDoubleColumnValue(value, pci, rowCount); + break; + default: + setStringColumnValue(tags.get(pci.openTSDBColumn.getColumnName()), pci, rowCount); + } + } + } + + private void setTimestampColumnValue(String timestamp, ProjectedColumnInfo pci, int rowCount) { + setTimestampColumnValue(timestamp != null ? Long.parseLong(timestamp) : Long.parseLong("0"), pci, rowCount); + } + + private void setDoubleColumnValue(String value, ProjectedColumnInfo pci, int rowCount) { + setDoubleColumnValue(value != null ? Double.parseDouble(value) : 0.0, pci, rowCount); + } + + private void setStringColumnValue(String data, ProjectedColumnInfo pci, int rowCount) { + if (data == null) { + data = "null"; + } + ByteBuffer value = ByteBuffer.wrap(data.getBytes(UTF_8)); + ((NullableVarCharVector.Mutator) pci.vv.getMutator()) + .setSafe(rowCount, value, 0, value.remaining()); + } + + private void setTimestampColumnValue(Long data, ProjectedColumnInfo pci, int rowCount) { + ((NullableTimeStampVector.Mutator) pci.vv.getMutator()) + .setSafe(rowCount, data * 1000); + } + + private void setDoubleColumnValue(Double data, ProjectedColumnInfo pci, int rowCount) { + ((NullableFloat8Vector.Mutator) pci.vv.getMutator()) + .setSafe(rowCount, data); + } + + private void initCols(Schema schema) throws SchemaChangeException { + ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder(); + + for (int i = 0; i < schema.getColumnCount(); i++) { + + ColumnDTO column = schema.getColumnByIndex(i); + final String name = column.getColumnName(); + final OpenTSDBTypes type = column.getColumnType(); + TypeProtos.MinorType minorType = TYPES.get(type); + + if (isMinorTypeNull(minorType)) { + String message = String.format( + "A column you queried has a data type that is not currently supported by the OpenTSDB storage plugin. " + + "The column's name was %s and its OpenTSDB data type was %s. ", name, type.toString()); + throw UserException.unsupportedError() + .message(message) + .build(log); + } + + ProjectedColumnInfo pci = getProjectedColumnInfo(column, name, minorType); + pciBuilder.add(pci); + } + projectedCols = pciBuilder.build(); + } + + private boolean isMinorTypeNull(MinorType minorType) { + return minorType == null; + } + + private ProjectedColumnInfo getProjectedColumnInfo(ColumnDTO column, String name, MinorType minorType) throws SchemaChangeException { + MajorType majorType = getMajorType(minorType); + + MaterializedField field = + MaterializedField.create(name, majorType); + + ValueVector vector = + getValueVector(minorType, majorType, field); + + return getProjectedColumnInfo(column, vector); + } + + private MajorType getMajorType(MinorType minorType) { + MajorType majorType; + majorType = Types.optional(minorType); + return majorType; + } + + private ValueVector getValueVector(MinorType minorType, MajorType majorType, MaterializedField field) throws SchemaChangeException { + final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass( + minorType, majorType.getMode()); + ValueVector vector = output.addField(field, clazz); + vector.allocateNew(); + return vector; + } + + private ProjectedColumnInfo getProjectedColumnInfo(ColumnDTO column, ValueVector vector) { + ProjectedColumnInfo pci = new ProjectedColumnInfo(); + pci.vv = vector; + pci.openTSDBColumn = column; + return pci; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBScanSpec.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBScanSpec.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBScanSpec.java new file mode 100644 index 0000000..f93758d --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBScanSpec.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class OpenTSDBScanSpec { + + private final String tableName; + + @JsonCreator + public OpenTSDBScanSpec(@JsonProperty("tableName") String tableName) { + this.tableName = tableName; + } + + public String getTableName() { + return tableName; + } + + @Override + public String toString() { + return "OpenTSDBScanSpec{" + + "tableName='" + tableName + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java new file mode 100644 index 0000000..176dff0 --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.drill.common.JSONOptions; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.AbstractStoragePlugin; +import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.store.openTSDB.client.services.ServiceImpl; +import org.apache.drill.exec.store.openTSDB.schema.OpenTSDBSchemaFactory; + +import java.io.IOException; + +public class OpenTSDBStoragePlugin extends AbstractStoragePlugin { + + private final DrillbitContext context; + + private final OpenTSDBStoragePluginConfig engineConfig; + private final OpenTSDBSchemaFactory schemaFactory; + + private final ServiceImpl db; + + public OpenTSDBStoragePlugin(OpenTSDBStoragePluginConfig configuration, DrillbitContext context, String name) throws IOException { + this.context = context; + this.schemaFactory = new OpenTSDBSchemaFactory(this, name); + this.engineConfig = configuration; + this.db = new ServiceImpl(configuration.getConnection()); + } + + @Override + public boolean supportsRead() { + return true; + } + + @Override + public OpenTSDBStoragePluginConfig getConfig() { + return engineConfig; + } + + @Override + public OpenTSDBGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException { + OpenTSDBScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<OpenTSDBScanSpec>() { + }); + return new OpenTSDBGroupScan(this, scanSpec, null); + } + + @Override + public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { + schemaFactory.registerSchemas(schemaConfig, parent); + } + + public ServiceImpl getClient() { + return db; + } + + DrillbitContext getContext() { + return this.context; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java new file mode 100644 index 0000000..1b67c1d --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.logical.StoragePluginConfigBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; + +@JsonTypeName(OpenTSDBStoragePluginConfig.NAME) +public class OpenTSDBStoragePluginConfig extends StoragePluginConfigBase { + + private static final Logger log = LoggerFactory.getLogger(OpenTSDBStoragePluginConfig.class); + + public static final String NAME = "openTSDB"; + + private final String connection; + + @JsonCreator + public OpenTSDBStoragePluginConfig(@JsonProperty("connection") String connection) throws IOException { + if (connection == null || connection.isEmpty()) { + throw UserException.validationError() + .message("Connection property must not be null. Check plugin configuration.") + .build(log); + } + this.connection = connection; + } + + public String getConnection() { + return connection; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + OpenTSDBStoragePluginConfig that = (OpenTSDBStoragePluginConfig) o; + return Objects.equals(connection, that.connection); + } + + @Override + public int hashCode() { + return connection != null ? connection.hashCode() : 0; + } + + @Override + public String toString() { + return "OpenTSDBStoragePluginConfig{" + + "connection='" + connection + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java new file mode 100644 index 0000000..4e93804 --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +@JsonTypeName("openTSDB-sub-scan") +public class OpenTSDBSubScan extends AbstractBase implements SubScan { + + private static final Logger log = + LoggerFactory.getLogger(OpenTSDBSubScan.class); + + public final OpenTSDBStoragePluginConfig storage; + + private final List<SchemaPath> columns; + private final OpenTSDBStoragePlugin openTSDBStoragePlugin; + private final List<OpenTSDBSubScanSpec> tabletScanSpecList; + + @JsonCreator + public OpenTSDBSubScan(@JacksonInject StoragePluginRegistry registry, + @JsonProperty("storage") OpenTSDBStoragePluginConfig storage, + @JsonProperty("tabletScanSpecList") LinkedList<OpenTSDBSubScanSpec> tabletScanSpecList, + @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException { + super((String) null); + openTSDBStoragePlugin = (OpenTSDBStoragePlugin) registry.getPlugin(storage); + this.tabletScanSpecList = tabletScanSpecList; + this.storage = storage; + this.columns = columns; + } + + public OpenTSDBSubScan(OpenTSDBStoragePlugin plugin, OpenTSDBStoragePluginConfig config, + List<OpenTSDBSubScanSpec> tabletInfoList, List<SchemaPath> columns) { + super((String) null); + openTSDBStoragePlugin = plugin; + storage = config; + this.tabletScanSpecList = tabletInfoList; + this.columns = columns; + } + + @Override + public int getOperatorType() { + return 0; + } + + @Override + public boolean isExecutable() { + return false; + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { + Preconditions.checkArgument(children.isEmpty()); + return new OpenTSDBSubScan(openTSDBStoragePlugin, storage, tabletScanSpecList, columns); + } + + @Override + public Iterator<PhysicalOperator> iterator() { + return Collections.emptyIterator(); + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitSubScan(this, value); + } + + public List<SchemaPath> getColumns() { + return columns; + } + + public List<OpenTSDBSubScanSpec> getTabletScanSpecList() { + return tabletScanSpecList; + } + + @JsonIgnore + public OpenTSDBStoragePlugin getStorageEngine() { + return openTSDBStoragePlugin; + } + + @JsonProperty("storage") + public OpenTSDBStoragePluginConfig getStorageConfig() { + return storage; + } + + public static class OpenTSDBSubScanSpec { + + private final String tableName; + + @JsonCreator + public OpenTSDBSubScanSpec(@JsonProperty("tableName") String tableName) { + this.tableName = tableName; + } + + public String getTableName() { + return tableName; + } + + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Util.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Util.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Util.java new file mode 100644 index 0000000..6e0ef05 --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Util.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB; + +import com.google.common.base.Splitter; +import org.apache.drill.common.exceptions.UserException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class Util { + + private static final Logger log = LoggerFactory.getLogger(Util.class); + + /** + * Parse FROM parameters to Map representation + * + * @param rowData with this syntax (metric=warp.speed.test) + * @return Map with params key: metric, value: warp.speed.test + */ + public static Map<String, String> fromRowData(String rowData) { + try { + String fromRowData = rowData.replaceAll("[()]", ""); + return Splitter.on(",").trimResults().omitEmptyStrings().withKeyValueSeparator("=").split(fromRowData); + } catch (IllegalArgumentException e) { + throw UserException.validationError() + .message(String.format("Syntax error in the query %s", rowData)) + .build(log); + } + } + + /** + * @param name Metric name + * @return Valid metric name + */ + public static String getValidTableName(String name) { + if (!isTableNameValid(name)) { + name = fromRowData(name).get("metric"); + } + return name; + } + + /** + * @param name Metric name + * @return true if name is valid + */ + public static boolean isTableNameValid(String name) { + return !name.contains("="); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDB.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDB.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDB.java new file mode 100644 index 0000000..1d561c2 --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDB.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB.client; + +import org.apache.drill.exec.store.openTSDB.client.query.DBQuery; +import org.apache.drill.exec.store.openTSDB.dto.MetricDTO; +import retrofit2.Call; +import retrofit2.http.Body; +import retrofit2.http.GET; +import retrofit2.http.POST; + +import java.util.Set; + +/** + * Client for API requests to openTSDB + */ +public interface OpenTSDB { + + /** + * Used for getting all metrics names from openTSDB + * + * @return Set<String> with all tables names + */ + @GET("api/suggest?type=metrics&max=" + Integer.MAX_VALUE) + Call<Set<String>> getAllTablesName(); + + /** + * Overloaded getTables for POST request to DB + * + * @param query Query for for selecting data + * @return Set<Table> with metrics from openTSDB + */ + @POST("api/query") + Call<Set<MetricDTO>> getTables(@Body DBQuery query); +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDBTypes.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDBTypes.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDBTypes.java new file mode 100644 index 0000000..2a6b802 --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDBTypes.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB.client; + +/** + * Types in openTSDB records, + * used for converting openTSDB data to Sql representation + */ +public enum OpenTSDBTypes { + STRING, + DOUBLE, + TIMESTAMP +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Schema.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Schema.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Schema.java new file mode 100644 index 0000000..2c8dc9f --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Schema.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB.client; + +import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.drill.exec.store.openTSDB.Constants.AGGREGATOR_PARAM; +import static org.apache.drill.exec.store.openTSDB.Constants.DEFAULT_TIME; +import static org.apache.drill.exec.store.openTSDB.Constants.METRIC_PARAM; +import static org.apache.drill.exec.store.openTSDB.Constants.SUM_AGGREGATOR; +import static org.apache.drill.exec.store.openTSDB.Constants.TIME_PARAM; +import static org.apache.drill.exec.store.openTSDB.Util.getValidTableName; +import static org.apache.drill.exec.store.openTSDB.client.Schema.DefaultColumns.AGGREGATED_VALUE; +import static org.apache.drill.exec.store.openTSDB.client.Schema.DefaultColumns.AGGREGATE_TAGS; +import static org.apache.drill.exec.store.openTSDB.client.Schema.DefaultColumns.METRIC; +import static org.apache.drill.exec.store.openTSDB.client.Schema.DefaultColumns.TIMESTAMP; + +/** + * Abstraction for representing structure of openTSDB table + */ +public class Schema { + + private static final Logger log = + LoggerFactory.getLogger(Schema.class); + + private final List<ColumnDTO> columns = new ArrayList<>(); + private final Service db; + private final String name; + + public Schema(Service db, String name) { + this.db = db; + this.name = name; + setupStructure(); + } + + private void setupStructure() { + columns.add(new ColumnDTO(METRIC.toString(), OpenTSDBTypes.STRING)); + columns.add(new ColumnDTO(AGGREGATE_TAGS.toString(), OpenTSDBTypes.STRING)); + columns.add(new ColumnDTO(TIMESTAMP.toString(), OpenTSDBTypes.TIMESTAMP)); + columns.add(new ColumnDTO(AGGREGATED_VALUE.toString(), OpenTSDBTypes.DOUBLE)); + columns.addAll(db.getUnfixedColumns(getParamsForQuery())); + } + + /** + * Return list with all columns names and its types + * + * @return List<ColumnDTO> + */ + public List<ColumnDTO> getColumns() { + return Collections.unmodifiableList(columns); + } + + /** + * Number of columns in table + * + * @return number of table columns + */ + public int getColumnCount() { + return columns.size(); + } + + /** + * @param columnIndex index of required column in table + * @return ColumnDTO + */ + public ColumnDTO getColumnByIndex(int columnIndex) { + return columns.get(columnIndex); + } + + // Create map with required params, for querying metrics. + // Without this params, we cannot make API request to db. + private Map<String, String> getParamsForQuery() { + HashMap<String, String> params = new HashMap<>(); + params.put(METRIC_PARAM, getValidTableName(name)); + params.put(AGGREGATOR_PARAM, SUM_AGGREGATOR); + params.put(TIME_PARAM, DEFAULT_TIME); + return params; + } + + /** + * Structure with constant openTSDB columns + */ + enum DefaultColumns { + + METRIC("metric"), + TIMESTAMP("timestamp"), + AGGREGATE_TAGS("aggregate tags"), + AGGREGATED_VALUE("aggregated value"); + + private String columnName; + + DefaultColumns(String name) { + this.columnName = name; + } + + @Override + public String toString() { + return columnName; + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Service.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Service.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Service.java new file mode 100644 index 0000000..0be7394 --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Service.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB.client; + +import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO; +import org.apache.drill.exec.store.openTSDB.dto.MetricDTO; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public interface Service { + /** + * + * Used for getting all Metrics from openTSDB. + * Must be present required params: metric, start, aggregator + * + * @param queryParam parameters for the API request + * @return Set<MetricDTO> all metrics + */ + Set<MetricDTO> getAllMetrics(Map<String, String> queryParam); + + /** + * + * Used for getting all metrics names from openTSDB + * + * @return Set<String> metric names + */ + Set<String> getAllMetricNames(); + + /** + * + * Used for getting all non fixed columns based on tags from openTSDB + * Must be present required params: metric, start, aggregator + * + * @param queryParam parameters for the API request + * @return List<ColumnDTO> columns based on tags + */ + List<ColumnDTO> getUnfixedColumns(Map<String, String> queryParam); +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/DBQuery.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/DBQuery.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/DBQuery.java new file mode 100644 index 0000000..e79d0ce --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/DBQuery.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB.client.query; + +import org.apache.drill.common.exceptions.UserException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; + +/** + * DBQuery is an abstraction of an openTSDB query, + * that used for extracting data from the storage system by POST request to DB. + * <p> + * An OpenTSDB query requires at least one sub query, + * a means of selecting which time series should be included in the result set. + */ +public class DBQuery { + + private static final Logger log = + LoggerFactory.getLogger(DBQuery.class); + /** + * The start time for the query. This can be a relative or absolute timestamp. + */ + private String start; + /** + * An end time for the query. If not supplied, the TSD will assume the local system time on the server. + * This may be a relative or absolute timestamp. This param is optional, and if it isn't specified we will send null + * to the db in this field, but in this case db will assume the local system time on the server. + */ + private String end; + /** + * One or more sub subQueries used to select the time series to return. + */ + private Set<Query> queries; + + private DBQuery(Builder builder) { + this.start = builder.start; + this.end = builder.end; + this.queries = builder.queries; + } + + public String getStart() { + return start; + } + + public String getEnd() { + return end; + } + + public Set<Query> getQueries() { + return queries; + } + + public static class Builder { + + private String start; + private String end; + private Set<Query> queries = new HashSet<>(); + + public Builder() { + } + + public Builder setStartTime(String startTime) { + if (startTime == null) { + throw UserException.validationError() + .message("start param must be specified") + .build(log); + } + this.start = startTime; + return this; + } + + public Builder setEndTime(String endTime) { + this.end = endTime; + return this; + } + + public Builder setQueries(Set<Query> queries) { + if (queries.isEmpty()) { + throw UserException.validationError() + .message("Required params such as metric, aggregator weren't specified. " + + "Add these params to the query") + .build(log); + } + this.queries = queries; + return this; + } + + public DBQuery build() { + return new DBQuery(this); + } + + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DBQuery dbQuery = (DBQuery) o; + + if (!start.equals(dbQuery.start)) { + return false; + } + if (!end.equals(dbQuery.end)) { + return false; + } + return queries.equals(dbQuery.queries); + } + + @Override + public int hashCode() { + int result = start.hashCode(); + result = 31 * result + end.hashCode(); + result = 31 * result + queries.hashCode(); + return result; + } + + @Override + public String toString() { + return "DBQuery{" + + "start='" + start + '\'' + + ", end='" + end + '\'' + + ", queries=" + queries + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/Query.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/Query.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/Query.java new file mode 100644 index 0000000..bdcd1c4 --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/Query.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB.client.query; + +import org.apache.drill.common.exceptions.UserException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Query is an abstraction of openTSDB subQuery + * and it is integral part of DBQuery + * <p> + * Each sub query can retrieve individual or groups of timeseries data, + * performing aggregation on each set. + */ +public class Query { + private static final Logger log = + LoggerFactory.getLogger(Query.class); + /** + * The name of an aggregation function to use. + */ + private String aggregator; + /** + * The name of a metric stored in the system + */ + private String metric; + /** + * Whether or not the data should be converted into deltas before returning. + * This is useful if the metric is a continuously incrementing counter + * and you want to view the rate of change between data points. + */ + private String rate; + /** + * An optional downsampling function to reduce the amount of data returned. + */ + private String downsample; + /** + * To drill down to specific timeseries or group results by tag, + * supply one or more map values in the same format as the query string. + */ + private Map<String, String> tags; + + private Query(Builder builder) { + this.aggregator = builder.aggregator; + this.metric = builder.metric; + this.rate = builder.rate; + this.downsample = builder.downsample; + this.tags = builder.tags; + } + + public String getAggregator() { + return aggregator; + } + + public String getMetric() { + return metric; + } + + public String getRate() { + return rate; + } + + public String getDownsample() { + return downsample; + } + + public Map<String, String> getTags() { + return tags; + } + + public static class Builder { + + private String aggregator; + private String metric; + private String rate; + private String downsample; + private Map<String, String> tags = new HashMap<>(); + + public Builder(String metric) { + this.metric = metric; + } + + public Builder setAggregator(String aggregator) { + if (aggregator == null) { + throw UserException.validationError() + .message("aggregator param must be specified") + .build(log); + } + this.aggregator = aggregator; + return this; + } + + public Builder setMetric(String metric) { + if (metric == null) { + throw UserException.validationError() + .message("metric param must be specified") + .build(log); + } + this.metric = metric; + return this; + } + + public Builder setRate(String rate) { + this.rate = rate; + return this; + } + + public Builder setDownsample(String downsample) { + this.downsample = downsample; + return this; + } + + public Builder setTags(Map<String, String> tags) { + this.tags = tags; + return this; + } + + public Query build() { + return new Query(this); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Query subQuery = (Query) o; + + if (aggregator != null ? !aggregator.equals(subQuery.aggregator) : subQuery.aggregator != null) { + return false; + } + if (metric != null ? !metric.equals(subQuery.metric) : subQuery.metric != null) { + return false; + } + if (rate != null ? !rate.equals(subQuery.rate) : subQuery.rate != null) { + return false; + } + if (downsample != null ? !downsample.equals(subQuery.downsample) : subQuery.downsample != null) { + return false; + } + return tags != null ? tags.equals(subQuery.tags) : subQuery.tags == null; + } + + @Override + public int hashCode() { + int result = aggregator != null ? aggregator.hashCode() : 0; + result = 31 * result + (metric != null ? metric.hashCode() : 0); + result = 31 * result + (rate != null ? rate.hashCode() : 0); + result = 31 * result + (downsample != null ? downsample.hashCode() : 0); + result = 31 * result + (tags != null ? tags.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "SubQuery{" + + "aggregator='" + aggregator + '\'' + + ", metric='" + metric + '\'' + + ", rate='" + rate + '\'' + + ", downsample='" + downsample + '\'' + + ", tags=" + tags + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java new file mode 100644 index 0000000..41730bd --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB.client.services; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.store.openTSDB.client.OpenTSDB; +import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes; +import org.apache.drill.exec.store.openTSDB.client.Service; +import org.apache.drill.exec.store.openTSDB.client.query.DBQuery; +import org.apache.drill.exec.store.openTSDB.client.query.Query; +import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO; +import org.apache.drill.exec.store.openTSDB.dto.MetricDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import retrofit2.Retrofit; +import retrofit2.converter.jackson.JacksonConverterFactory; + +import java.io.IOException; +import java.util.ArrayList; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.drill.exec.store.openTSDB.Constants.AGGREGATOR_PARAM; +import static org.apache.drill.exec.store.openTSDB.Constants.DOWNSAMPLE_PARAM; +import static org.apache.drill.exec.store.openTSDB.Constants.END_TIME_PARAM; +import static org.apache.drill.exec.store.openTSDB.Constants.METRIC_PARAM; +import static org.apache.drill.exec.store.openTSDB.Constants.TIME_PARAM; + +public class ServiceImpl implements Service { + + private static final Logger log = + LoggerFactory.getLogger(ServiceImpl.class); + + private final OpenTSDB client; + + public ServiceImpl(String connectionURL) { + this.client = new Retrofit.Builder() + .baseUrl(connectionURL) + .addConverterFactory(JacksonConverterFactory.create()) + .build() + .create(OpenTSDB.class); + } + + @Override + public Set<MetricDTO> getAllMetrics(Map<String, String> queryParams) { + return getAllMetricsByTags(queryParams); + } + + @Override + public Set<String> getAllMetricNames() { + return getTableNames(); + } + + @Override + public List<ColumnDTO> getUnfixedColumns(Map<String, String> queryParam) { + Set<MetricDTO> metrics = getAllMetricsByTags(queryParam); + List<ColumnDTO> unfixedColumns = new ArrayList<>(); + + for (MetricDTO metric : metrics) { + for (String tag : metric.getTags().keySet()) { + ColumnDTO tmp = new ColumnDTO(tag, OpenTSDBTypes.STRING); + if (!unfixedColumns.contains(tmp)) { + unfixedColumns.add(tmp); + } + } + } + return unfixedColumns; + } + + private Set<MetricDTO> getAllMetricsByTags(Map<String, String> queryParams) { + try { + return getAllMetricsFromDBByTags(queryParams); + } catch (IOException e) { + throw UserException.connectionError(e) + .message("Cannot connect to the db. " + + "Maybe you have incorrect connection params or db unavailable now") + .build(log); + } + } + + private Set<String> getTableNames() { + try { + return client.getAllTablesName().execute().body(); + } catch (IOException e) { + throw UserException.connectionError(e) + .message("Cannot connect to the db. " + + "Maybe you have incorrect connection params or db unavailable now") + .build(log); + } + } + + private Set<MetricDTO> getMetricsByTags(DBQuery base) throws IOException { + return client.getTables(base).execute().body(); + } + + private Set<MetricDTO> getAllMetricsFromDBByTags(Map<String, String> queryParams) throws IOException { + Map<String, String> tags = new HashMap<>(); + DBQuery baseQuery = getConfiguredDbQuery(tags, queryParams); + + Set<MetricDTO> metrics = getBaseMetric(baseQuery); + if (metrics == null || metrics.isEmpty()) { + throw UserException.validationError() + .message(String.format("Table '%s' not found. Please check your query and params", queryParams.get(METRIC_PARAM))) + .build(log); + } + Set<String> extractedTags = getTagsFromMetrics(metrics); + + return getMetricsByTags(extractedTags, queryParams); + } + + private Set<MetricDTO> getMetricsByTags(Set<String> extractedTags, Map<String, String> queryParams) throws IOException { + Set<MetricDTO> metrics = new HashSet<>(); + for (String value : extractedTags) { + metrics.addAll(getMetricsByTags(getConfiguredDbQuery(getTransformedTag(value), queryParams))); + } + return metrics; + } + + private DBQuery getConfiguredDbQuery(Map<String, String> tags, Map<String, String> queryParams) { + Query subQuery = new Query.Builder(queryParams.get(METRIC_PARAM)) + .setAggregator(queryParams.get(AGGREGATOR_PARAM)) + .setDownsample(queryParams.get(DOWNSAMPLE_PARAM)) + .setTags(tags).build(); + + Set<Query> queries = new HashSet<>(); + queries.add(subQuery); + + return new DBQuery.Builder() + .setStartTime(queryParams.get(TIME_PARAM)) + .setEndTime(queryParams.get(END_TIME_PARAM)) + .setQueries(queries) + .build(); + } + + private Set<MetricDTO> getBaseMetric(DBQuery base) throws IOException { + return getMetricsByTags(base); + } + + private Set<String> getTagsFromMetrics(Set<MetricDTO> metrics) { + Set<String> extractedTags = new HashSet<>(); + + for (MetricDTO table : metrics) { + extractedTags.addAll(table.getAggregateTags()); + extractedTags.addAll(table.getTags().keySet()); + } + + return extractedTags; + } + + private Map<String, String> getTransformedTag(String tag) { + Map<String, String> tags = new HashMap<>(); + tags.put(tag, "*"); + return tags; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/ColumnDTO.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/ColumnDTO.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/ColumnDTO.java new file mode 100644 index 0000000..03c5952 --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/ColumnDTO.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB.dto; + +import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes; + +import java.util.Objects; + +public class ColumnDTO { + + private final String columnName; + private final OpenTSDBTypes columnType; + + public ColumnDTO(String columnName, OpenTSDBTypes columnType) { + this.columnName = columnName; + this.columnType = columnType; + } + + public String getColumnName() { + return columnName; + } + + public OpenTSDBTypes getColumnType() { + return columnType; + } + + public boolean isNullable() { + return true; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ColumnDTO columnDTO = (ColumnDTO) o; + return Objects.equals(columnName, columnDTO.columnName) && + columnType == columnDTO.columnType; + } + + @Override + public int hashCode() { + return Objects.hash(columnName, columnType); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/MetricDTO.java ---------------------------------------------------------------------- diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/MetricDTO.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/MetricDTO.java new file mode 100644 index 0000000..7e6285f --- /dev/null +++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/MetricDTO.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.openTSDB.dto; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class MetricDTO { + + private String metric; + private Map<String, String> tags; + private List<String> aggregateTags; + private Map<String, String> dps; + + public String getMetric() { + return metric; + } + + public Map<String, String> getTags() { + return tags; + } + + public List<String> getAggregateTags() { + return aggregateTags; + } + + public Map<String, String> getDps() { + return dps; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MetricDTO metricDTO = (MetricDTO) o; + return Objects.equals(metric, metricDTO.metric) && + Objects.equals(tags, metricDTO.tags) && + Objects.equals(aggregateTags, metricDTO.aggregateTags) && + Objects.equals(dps, metricDTO.dps); + } + + @Override + public int hashCode() { + return Objects.hash(metric, tags, aggregateTags, dps); + } + + @Override + public String toString() { + return "Table{" + + "metric='" + metric + '\'' + + ", tags=" + tags + + ", aggregateTags=" + aggregateTags + + ", dps=" + dps + + '}'; + } +}