Repository: hive
Updated Branches:
  refs/heads/master 240da8403 -> e55ccd291
HIVE-19026: Add support for more ingestion formats - Druid Kafka Indexing (Nishant Bangarwa reviewed by Ashutosh Chauhan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e55ccd29 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e55ccd29 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e55ccd29 Branch: refs/heads/master Commit: e55ccd29108f400c17f2d80d81278f81be128f9e Parents: 240da84 Author: Nishant <nishant.mon...@gmail.com> Authored: Wed Nov 14 18:13:09 2018 +0530 Committer: Nishant <nishant.mon...@gmail.com> Committed: Wed Nov 14 18:13:09 2018 +0530 ---------------------------------------------------------------------- data/scripts/kafka_init_data.csv | 10 + .../hadoop/hive/druid/conf/DruidConstants.java | 76 ++++++ .../hive/druid/json/AvroBytesDecoder.java | 37 +++ .../hadoop/hive/druid/json/AvroParseSpec.java | 104 ++++++++ .../druid/json/AvroStreamInputRowParser.java | 98 +++++++ .../json/InlineSchemaAvroBytesDecoder.java | 52 ++++ .../clientpositive/druidkafkamini_avro.q | 99 +++++++ .../queries/clientpositive/druidkafkamini_csv.q | 37 +++ .../clientpositive/druidkafkamini_delimited.q | 38 +++ .../druid/druidkafkamini_avro.q.out | 263 +++++++++++++++++++ .../druid/druidkafkamini_csv.q.out | 138 ++++++++++ .../druid/druidkafkamini_delimited.q.out | 140 ++++++++++ 12 files changed, 1092 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/data/scripts/kafka_init_data.csv ---------------------------------------------------------------------- diff --git a/data/scripts/kafka_init_data.csv b/data/scripts/kafka_init_data.csv new file mode 100644 index 0000000..5dc094e --- /dev/null +++ b/data/scripts/kafka_init_data.csv @@ -0,0 +1,10 @@ +"2013-08-31T01:02:33Z", "Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143 +"2013-08-31T03:32:45Z","Striker Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330 +"2013-08-31T07:11:21Z","Cherno Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111 +"2013-08-31T11:58:39Z","Crimson Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900 +"2013-08-31T12:41:27Z","Coyote Tango","ja","stringer","true","false","true","false","wikipedia","Asia","Japan","Kanto","Tokyo",1,10,-9 +"2013-09-01T01:02:33Z","Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143 +"2013-09-01T03:32:45Z","Striker Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330 +"2013-09-01T07:11:21Z","Cherno Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111 +"2013-09-01T11:58:39Z","Crimson Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900 +"2013-09-01T12:41:27Z","Coyote Tango","ja","stringer","true","false","true","false","wikipedia","Asia","Japan","Kanto","Tokyo",1,10,-9 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java ---------------------------------------------------------------------- diff 
--git a/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java new file mode 100644 index 0000000..242f7be --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid.conf; + +public class DruidConstants { + + public static final String DRUID_QUERY_FETCH = "druid.query.fetch"; + + public static final String DRUID_ROLLUP = "druid.rollup"; + + public static final String DRUID_QUERY_GRANULARITY = "druid.query.granularity"; + + public static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory"; + + public static final String DRUID_SEGMENT_INTERMEDIATE_DIRECTORY = "druid.storage.storageDirectory.intermediate"; + + public static final String DRUID_SEGMENT_VERSION = "druid.segment.version"; + + public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory"; + + public static final String KAFKA_TOPIC = "kafka.topic"; + + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + + public static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion."; + + public static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX = DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer."; + + /* Kafka ingestion state - valid values - START/STOP/RESET */ + public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion"; + + // Druid storage timestamp column name + public static final String DEFAULT_TIMESTAMP_COLUMN = "__time"; + + public static final String DRUID_TIMESTAMP_FORMAT = "druid.timestamp.format"; + + // Used when the field name in data ingested via streaming ingestion does not match + // the Druid default timestamp column, i.e. `__time` + public static final String DRUID_TIMESTAMP_COLUMN = "druid.timestamp.column"; + + // Druid JSON timestamp column name for GroupBy results + public static final String EVENT_TIMESTAMP_COLUMN = "timestamp"; + + // Druid ParseSpec type - JSON/CSV/TSV/AVRO + public static final String DRUID_PARSE_SPEC_FORMAT = "druid.parseSpec.format"; + + public static final String AVRO_SCHEMA_LITERAL = "avro.schema.literal"; + + // value delimiter for Druid columns + public static final String DRUID_PARSE_SPEC_DELIMITER = "druid.parseSpec.delimiter"; + + // list delimiter for multi-valued columns + public static final String DRUID_PARSE_SPEC_LIST_DELIMITER = "druid.parseSpec.listDelimiter"; + + // order of columns for the delimited and CSV parse specs.
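+ // e.g. "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta" (illustrative value, taken from the qtests in this patch)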
+ public static final String DRUID_PARSE_SPEC_COLUMNS = "druid.parseSpec.columns"; + + public static final String DRUID_PARSE_SPEC_SKIP_HEADER_ROWS = "druid.parseSpec.skipHeaderRows"; + + public static final String DRUID_PARSE_SPEC_HAS_HEADER_ROWS = "druid.parseSpec.hasHeaderRows"; +} http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java new file mode 100644 index 0000000..3a1dbf7 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.avro.generic.GenericRecord; + +import java.nio.ByteBuffer; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "schema_inline", value = InlineSchemaAvroBytesDecoder.class) +}) +public interface AvroBytesDecoder +{ +} http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java new file mode 100644 index 0000000..af71f9a --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.ParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.java.util.common.parsers.JSONPathSpec; +import io.druid.java.util.common.parsers.Parser; + +import java.util.Objects; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public class AvroParseSpec extends ParseSpec +{ + + @JsonIgnore + private final JSONPathSpec flattenSpec; + + @JsonCreator + public AvroParseSpec( + @JsonProperty("timestampSpec") TimestampSpec timestampSpec, + @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, + @JsonProperty("flattenSpec") JSONPathSpec flattenSpec + ) + { + super( + timestampSpec != null ? timestampSpec : new TimestampSpec(null, null, null), + dimensionsSpec != null ? dimensionsSpec : new DimensionsSpec(null, null, null) + ); + + this.flattenSpec = flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT; + } + + @JsonProperty + public JSONPathSpec getFlattenSpec() + { + return flattenSpec; + } + + @Override + public Parser<String, Object> makeParser() + { + // makeParser is only used by StringInputRowParser, which cannot parse avro anyway. + throw new UnsupportedOperationException("makeParser not supported"); + } + + @Override + public ParseSpec withTimestampSpec(TimestampSpec spec) + { + return new AvroParseSpec(spec, getDimensionsSpec(), flattenSpec); + } + + @Override + public ParseSpec withDimensionsSpec(DimensionsSpec spec) + { + return new AvroParseSpec(getTimestampSpec(), spec, flattenSpec); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + final AvroParseSpec that = (AvroParseSpec) o; + return Objects.equals(flattenSpec, that.flattenSpec); + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), flattenSpec); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java new file mode 100644 index 0000000..d6e6624 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.druid.data.input.ByteBufferInputRowParser; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.ParseSpec; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public class AvroStreamInputRowParser implements ByteBufferInputRowParser +{ + private final ParseSpec parseSpec; + private final AvroBytesDecoder avroBytesDecoder; + + @JsonCreator + public AvroStreamInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec, + @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder + ) + { + this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec"); + this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder"); + } + + @Override + public List<InputRow> parseBatch(ByteBuffer input) + { + throw new UnsupportedOperationException("This class is only used for JSON serde"); + } + + @JsonProperty + @Override + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @JsonProperty + public AvroBytesDecoder getAvroBytesDecoder() + { + return avroBytesDecoder; + } + + @Override + public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec) + { + return new AvroStreamInputRowParser( + parseSpec, + avroBytesDecoder + ); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AvroStreamInputRowParser that = (AvroStreamInputRowParser) o; + return Objects.equals(parseSpec, that.parseSpec) && + Objects.equals(avroBytesDecoder, that.avroBytesDecoder); + } + + @Override + public int hashCode() + { + return Objects.hash(parseSpec, avroBytesDecoder); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java new file mode 100644 index 0000000..72d6cbb --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import java.util.Map; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder +{ + private final Map<String, Object> schema; + + @JsonCreator + public InlineSchemaAvroBytesDecoder( + @JsonProperty("schema") Map<String, Object> schema + ) + { + Preconditions.checkArgument(schema != null, "schema must be provided"); + + this.schema = schema; + } + + @JsonProperty + public Map<String, Object> getSchema() + { + return schema; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/ql/src/test/queries/clientpositive/druidkafkamini_avro.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_avro.q b/ql/src/test/queries/clientpositive/druidkafkamini_avro.q new file mode 100644 index 0000000..183491c --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidkafkamini_avro.q @@ -0,0 +1,99 @@ +SET hive.vectorized.execution.enabled=false; + +CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_avro_table", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.timestamp.column" = "timestamp", + "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss", + "druid.parseSpec.format" = "avro", + 'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : 
"isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] + }' + ); + +ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'); + +!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor; + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 60; + +DESCRIBE druid_kafka_test_avro; +DESCRIBE EXTENDED druid_kafka_test_avro; + +Select count(*) FROM druid_kafka_test_avro; + +Select page FROM druid_kafka_test_avro; + +DROP TABLE druid_kafka_test_avro; http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/ql/src/test/queries/clientpositive/druidkafkamini_csv.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_csv.q b/ql/src/test/queries/clientpositive/druidkafkamini_csv.q new file mode 100644 index 0000000..34be462 --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidkafkamini_csv.q @@ -0,0 +1,37 @@ +SET hive.vectorized.execution.enabled=false; + +CREATE EXTERNAL TABLE druid_kafka_test_csv(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "csv", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta" + ); + +ALTER TABLE druid_kafka_test_csv SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'); + +!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor; + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 60; + +DESCRIBE druid_kafka_test_csv; +DESCRIBE EXTENDED druid_kafka_test_csv; + +Select count(*) FROM druid_kafka_test_csv; + +Select page FROM druid_kafka_test_csv; + +DROP TABLE druid_kafka_test_csv; http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q b/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q new file mode 100644 index 0000000..91e279d --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q @@ -0,0 +1,38 @@ +SET hive.vectorized.execution.enabled=false; + +CREATE EXTERNAL TABLE druid_kafka_test_delimited(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 
'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "delimited", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta", + "druid.parseSpec.delimiter"="," + ); + +ALTER TABLE druid_kafka_test_delimited SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'); + +!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor; + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 60; + +DESCRIBE druid_kafka_test_delimited; +DESCRIBE EXTENDED druid_kafka_test_delimited; + +Select count(*) FROM druid_kafka_test_delimited; + +Select page FROM druid_kafka_test_delimited; + +DROP TABLE druid_kafka_test_delimited; http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out new file mode 100644 index 0000000..d33dd4c --- /dev/null +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out @@ -0,0 +1,263 @@ +PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_avro_table", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.timestamp.column" = "timestamp", + "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss", + "druid.parseSpec.format" = "avro", + 'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + 
"type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] + }' + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_kafka_test_avro +POSTHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_avro_table", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.timestamp.column" = "timestamp", + "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss", + "druid.parseSpec.format" = "avro", + 'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] + }' + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test_avro +PREHOOK: query: ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: default@druid_kafka_test_avro +POSTHOOK: query: ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: default@druid_kafka_test_avro +["default.druid_kafka_test_avro"] +PREHOOK: query: DESCRIBE druid_kafka_test_avro +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: query: DESCRIBE druid_kafka_test_avro +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_avro +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string 
from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_avro +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_avro +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_avro +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer + +#### A masked pattern was here #### +StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test_avro +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +#### A masked pattern was here #### +aggregateLag=0 +#### A masked pattern was here #### +PREHOOK: query: Select count(*) FROM druid_kafka_test_avro +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test_avro +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: hdfs://### HDFS PATH ### +11 +PREHOOK: query: Select page FROM druid_kafka_test_avro +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test_avro +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: hdfs://### HDFS PATH ### +page is 0 +page is 100 +page is 200 +page is 300 +page is 400 +page is 500 +page is 600 +page is 700 +page is 800 +page is 900 +page is 1000 +PREHOOK: query: DROP TABLE druid_kafka_test_avro +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: default@druid_kafka_test_avro +POSTHOOK: query: DROP TABLE druid_kafka_test_avro +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: default@druid_kafka_test_avro http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out new file mode 100644 index 0000000..2f5817a --- /dev/null +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out @@ -0,0 +1,138 @@ +PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_csv(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = 
"wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "csv", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta" + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_kafka_test_csv +POSTHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_csv(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "csv", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta" + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test_csv +PREHOOK: query: ALTER TABLE druid_kafka_test_csv SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test_csv +PREHOOK: Output: default@druid_kafka_test_csv +POSTHOOK: query: ALTER TABLE druid_kafka_test_csv SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: Output: default@druid_kafka_test_csv +["default.druid_kafka_test_csv"] +PREHOOK: query: DESCRIBE druid_kafka_test_csv +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: query: DESCRIBE druid_kafka_test_csv +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_csv +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_csv +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_csv +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_csv +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string 
from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer + +#### A masked pattern was here #### +StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test_csv +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +#### A masked pattern was here #### +aggregateLag=0 +#### A masked pattern was here #### +PREHOOK: query: Select count(*) FROM druid_kafka_test_csv +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_csv +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test_csv +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select page FROM druid_kafka_test_csv +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_csv +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test_csv +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: Output: hdfs://### HDFS PATH ### +Gypsy Danger +Striker Eureka +Cherno Alpha +Crimson Typhoon +Coyote Tango +Gypsy Danger +Striker Eureka +Cherno Alpha +Crimson Typhoon +Coyote Tango +PREHOOK: query: DROP TABLE druid_kafka_test_csv +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_kafka_test_csv +PREHOOK: Output: default@druid_kafka_test_csv +POSTHOOK: query: DROP TABLE druid_kafka_test_csv +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: Output: default@druid_kafka_test_csv http://git-wip-us.apache.org/repos/asf/hive/blob/e55ccd29/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out new file mode 100644 index 0000000..f6a417b --- /dev/null +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out @@ -0,0 +1,140 @@ +PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_delimited(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "delimited", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta", + "druid.parseSpec.delimiter"="," + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_kafka_test_delimited +POSTHOOK: query: CREATE EXTERNAL 
TABLE druid_kafka_test_delimited(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "delimited", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta", + "druid.parseSpec.delimiter"="," + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test_delimited +PREHOOK: query: ALTER TABLE druid_kafka_test_delimited SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test_delimited +PREHOOK: Output: default@druid_kafka_test_delimited +POSTHOOK: query: ALTER TABLE druid_kafka_test_delimited SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: Output: default@druid_kafka_test_delimited +["default.druid_kafka_test_delimited"] +PREHOOK: query: DESCRIBE druid_kafka_test_delimited +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: query: DESCRIBE druid_kafka_test_delimited +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_delimited +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_delimited +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_delimited +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_delimited +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer + +#### A masked pattern was here #### +StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test_delimited +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +#### A masked pattern was here #### +aggregateLag=0 +#### A masked pattern was here 
#### +PREHOOK: query: Select count(*) FROM druid_kafka_test_delimited +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_delimited +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test_delimited +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select page FROM druid_kafka_test_delimited +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_delimited +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test_delimited +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: Output: hdfs://### HDFS PATH ### + "Gypsy Danger" +"Striker Eureka" +"Cherno Alpha" +"Crimson Typhoon" +"Coyote Tango" +"Gypsy Danger" +"Striker Eureka" +"Cherno Alpha" +"Crimson Typhoon" +"Coyote Tango" +PREHOOK: query: DROP TABLE druid_kafka_test_delimited +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_kafka_test_delimited +PREHOOK: Output: default@druid_kafka_test_delimited +POSTHOOK: query: DROP TABLE druid_kafka_test_delimited +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: Output: default@druid_kafka_test_delimited
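----------------------------------------------------------------------

A note on the Jackson POJOs added above (AvroBytesDecoder, AvroParseSpec, AvroStreamInputRowParser, InlineSchemaAvroBytesDecoder): per their javadoc they are copied from Druid purely for JSON serde of the Kafka ingestion spec that Hive submits to Druid, which is why parseBatch() and makeParser() simply throw UnsupportedOperationException. A minimal sketch of how the "schema_inline" subtype wiring resolves, assuming only a plain Jackson ObjectMapper with no Druid modules registered (the class name AvroBytesDecoderSerdeCheck and the trimmed schema literal are hypothetical, for illustration only):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hive.druid.json.AvroBytesDecoder;
import org.apache.hadoop.hive.druid.json.InlineSchemaAvroBytesDecoder;

public class AvroBytesDecoderSerdeCheck {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // "type" is the discriminator declared by @JsonTypeInfo on the
    // AvroBytesDecoder interface; @JsonSubTypes maps the name
    // "schema_inline" to InlineSchemaAvroBytesDecoder.
    String json = "{\"type\": \"schema_inline\", "
        + "\"schema\": {\"type\": \"record\", \"name\": \"Wikipedia\"}}";
    AvroBytesDecoder decoder = mapper.readValue(json, AvroBytesDecoder.class);
    // Jackson instantiates the inline-schema implementation via its @JsonCreator.
    System.out.println(decoder instanceof InlineSchemaAvroBytesDecoder); // true
    System.out.println(((InlineSchemaAvroBytesDecoder) decoder).getSchema().get("name")); // Wikipedia
  }
}

If the "type" value does not match a registered subtype name, readValue fails with a type-id resolution error, which would be the visible failure mode if the copied @JsonSubTypes registration ever drifted from what Druid expects.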