Repository: hive Updated Branches: refs/heads/master d2838990f -> d3fed078c
http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java ---------------------------------------------------------------------- diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java index 71259dc..8e79d4c 100644 --- a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java +++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java @@ -21,7 +21,6 @@ package org.apache.hive.druid; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,12 +50,13 @@ public class MiniDruidCluster extends AbstractService { ); private static final Map<String, String> COMMON_DRUID_CONF = ImmutableMap.of( - "druid.metadata.storage.type", "derby" + "druid.metadata.storage.type", "derby", + "druid.storage.type", "hdfs", + "druid.processing.buffer.sizeBytes", "213870912", + "druid.processing.numThreads", "2" ); private static final Map<String, String> COMMON_DRUID_HISTORICAL = ImmutableMap.of( - "druid.processing.buffer.sizeBytes", "213870912", - "druid.processing.numThreads", "2", "druid.server.maxSize", "130000000000" ); @@ -87,26 +87,13 @@ public class MiniDruidCluster extends AbstractService { } - public MiniDruidCluster(String name, String logDir, String dataDir, Integer zookeeperPort, String classpath) { + public MiniDruidCluster(String name, String logDir, String tmpDir, Integer zookeeperPort, String classpath) { super(name); - this.dataDirectory = new File(dataDir, "druid-data"); + this.dataDirectory = new File(tmpDir, "druid-data"); this.logDirectory = new File(logDir); - try { - if (dataDirectory.exists()) { - // need to clean data directory to ensure that there is no interference from old runs - // Cleaning is happening here to allow debugging in case of tests fail - // we don;t have to clean logs since it is an append mode - log.info("Cleaning the druid-data directory [{}]", dataDirectory.getAbsolutePath()); - FileUtils.deleteDirectory(dataDirectory); - } else { - log.info("Creating the druid-data directory [{}]", dataDirectory.getAbsolutePath()); - dataDirectory.mkdirs(); - } - } catch (IOException e) { - log.error("Failed to clean data directory"); - Throwables.propagate(e); - } + ensureCleanDirectory(dataDirectory); + String derbyURI = String .format("jdbc:derby://localhost:1527/%s/druid_derby/metadata.db;create=true", dataDirectory.getAbsolutePath() @@ -126,13 +113,14 @@ public class MiniDruidCluster extends AbstractService { .put("druid.indexer.logs.directory", indexingLogDir) .put("druid.zk.service.host", "localhost:" + zookeeperPort) .put("druid.coordinator.startDelay", "PT1S") + .put("druid.indexer.runner", "local") + .put("druid.storage.storageDirectory", getDeepStorageDir()) .build(); Map<String, String> historicalProperties = historicalMapBuilder.putAll(COMMON_DRUID_CONF) .putAll(COMMON_DRUID_HISTORICAL) .put("druid.zk.service.host", "localhost:" + zookeeperPort) .put("druid.segmentCache.locations", segmentsCache) .put("druid.storage.storageDirectory", getDeepStorageDir()) - .put("druid.storage.type", "hdfs") .build(); coordinator = new ForkingDruidNode("coordinator", classpath, coordinatorProperties, COORDINATOR_JVM_CONF, @@ -148,6 +136,24 @@ 
public class MiniDruidCluster extends AbstractService { } + private static void ensureCleanDirectory(File dir) { + try { + if (dir.exists()) { + // need to clean the directory to ensure that there is no interference from old runs + // Cleaning happens here to allow debugging in case a test fails + // we don't have to clean logs since they are written in append mode + log.info("Cleaning the druid directory [{}]", dir.getAbsolutePath()); + FileUtils.deleteDirectory(dir); + } else { + log.info("Creating the druid directory [{}]", dir.getAbsolutePath()); + dir.mkdirs(); + } + } catch (IOException e) { + log.error("Failed to clean druid directory"); + Throwables.propagate(e); + } + } + @Override protected void serviceStart() throws Exception { druidNodes.stream().forEach(node -> { @@ -191,4 +197,13 @@ public class MiniDruidCluster extends AbstractService { public String getDeepStorageDir() { return dataDirectory.getAbsolutePath() + File.separator + "deep-storage"; } + + public String getCoordinatorURI() { + return "localhost:8081"; + } + + public String getOverlordURI() { + // Overlord and coordinator both run in the same JVM. + return getCoordinatorURI(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java ---------------------------------------------------------------------- diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java new file mode 100644 index 0000000..d839fd2 --- /dev/null +++ b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java @@ -0,0 +1,122 @@ +package org.apache.hive.kafka; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + +import org.apache.hadoop.service.AbstractService; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; + +import com.google.common.base.Throwables; +import com.google.common.io.Files; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Properties; + +/** + * This class has the hooks to start and stop a single-node Kafka cluster.
+ * The Kafka broker is started on port 9092. + */ +public class SingleNodeKafkaCluster extends AbstractService { + private static final Logger log = LoggerFactory.getLogger(SingleNodeKafkaCluster.class); + + private final KafkaServerStartable serverStartable; + private final String zkString; + + public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort) { + super(name); + Properties properties = new Properties(); + this.zkString = String.format("localhost:%d", zkPort); + properties.setProperty("zookeeper.connect", zkString); + properties.setProperty("broker.id", String.valueOf(1)); + properties.setProperty("host.name", "localhost"); + properties.setProperty("port", Integer.toString(9092)); + properties.setProperty("log.dir", logDir); + // flush after every message so test data is immediately visible to consumers + properties.setProperty("log.flush.interval.messages", String.valueOf(1)); + properties.setProperty("offsets.topic.replication.factor", String.valueOf(1)); + properties.setProperty("offsets.topic.num.partitions", String.valueOf(1)); + properties.setProperty("transaction.state.log.replication.factor", String.valueOf(1)); + properties.setProperty("transaction.state.log.min.isr", String.valueOf(1)); + properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577"); + + this.serverStartable = new KafkaServerStartable(KafkaConfig.fromProps(properties)); + } + + @Override + protected void serviceStart() throws Exception { + serverStartable.startup(); + log.info("Kafka Server Started"); + } + + @Override + protected void serviceStop() throws Exception { + log.info("Stopping Kafka Server"); + serverStartable.shutdown(); + log.info("Kafka Server Stopped"); + } + + /** + * Creates a topic and inserts data from the specified datafile. + * Each line in the datafile is sent to Kafka as a single message. + * @param topicName name of the topic to create + * @param datafile file whose lines are sent as individual messages + */ + public void createTopicWithData(String topicName, File datafile) { + createTopic(topicName); + // set up the Kafka producer + Properties properties = new Properties(); + properties.put("bootstrap.servers", "localhost:9092"); + properties.put("acks", "1"); + properties.put("retries", "3"); + + try (KafkaProducer<String, String> producer = new KafkaProducer<>( + properties, + new StringSerializer(), + new StringSerializer() + )) { + List<String> events = Files.readLines(datafile, Charset.forName("UTF-8")); + for (String event : events) { + producer.send(new ProducerRecord<String, String>(topicName, event)); + } + } catch (IOException e) { + Throwables.propagate(e); + } + } + + public void createTopic(String topic) { + int sessionTimeoutMs = 1000; + ZkClient zkClient = new ZkClient( + this.zkString, sessionTimeoutMs, sessionTimeoutMs, + ZKStringSerializer$.MODULE$ + ); + ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkString, sessionTimeoutMs), false); + int numPartitions = 1; + int replicationFactor = 1; + Properties topicConfig = new Properties(); + AdminUtils.createTopic( + zkUtils, + topic, + numPartitions, + replicationFactor, + topicConfig, + RackAwareMode.Disabled$.MODULE$ + ); + // close the ZooKeeper connection used for topic creation + zkUtils.close(); + } + +}
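For context, a minimal sketch of how SingleNodeKafkaCluster can be driven through the AbstractService lifecycle; this snippet is not part of the patch, the log directory and data file are hypothetical, and a ZooKeeper server is assumed to already be listening on the given port:

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.kafka.SingleNodeKafkaCluster;

public class KafkaClusterSketch {
  public static void main(String[] args) throws Exception {
    // hypothetical log dir and ZooKeeper port; ZooKeeper must already be running there
    SingleNodeKafkaCluster kafka =
        new SingleNodeKafkaCluster("kafka", "/tmp/kafka-logs", 2181);
    kafka.init(new Configuration()); // AbstractService lifecycle: init, then start
    kafka.start();                   // broker comes up on localhost:9092
    try {
      // each line of the file is produced to the topic as one message
      kafka.createTopicWithData("test-topic", new File("/tmp/events.json"));
    } finally {
      kafka.stop();
    }
  }
}

This mirrors how QTestUtil wires the cluster into the qtest setup further below.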
http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java ---------------------------------------------------------------------- diff --git a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java new file mode 100644 index 0000000..4768975 --- /dev/null +++ b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.cli; + +import org.apache.hadoop.hive.cli.control.CliAdapter; +import org.apache.hadoop.hive.cli.control.CliConfigs; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.File; +import java.util.List; + +@RunWith(Parameterized.class) +public class TestMiniDruidKafkaCliDriver { + + static CliAdapter adapter = new CliConfigs.MiniDruidKafkaCliConfig().getCliAdapter(); + + @Parameters(name = "{0}") + public static List<Object[]> getParameters() throws Exception { + return adapter.getParameters(); + } + + @ClassRule + public static TestRule cliClassRule = adapter.buildClassRule(); + + @Rule + public TestRule cliTestRule = adapter.buildTestRule(); + + private String name; + private File qfile; + + public TestMiniDruidKafkaCliDriver(String name, File qfile) { + this.name = name; + this.qfile = qfile; + } + + @Test + public void testCliDriver() throws Exception { + adapter.runTest(name, qfile); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index ec25713..d2e077b 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1685,5 +1685,6 @@ druid.query.files=druidmini_test1.q,\ druidmini_expressions.q,\ druidmini_extractTime.q,\ druidmini_test_alter.q,\ druidmini_floorTime.q - + +druid.kafka.query.files=druidkafkamini_basic.q http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java index 7034c38..1e65569 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java @@ -187,6 +187,29 @@ public class CliConfigs { } + public static class MiniDruidKafkaCliConfig extends AbstractCliConfig { + public MiniDruidKafkaCliConfig() { + super(CoreCliDriver.class); + try { + setQueryDir("ql/src/test/queries/clientpositive"); + +
includesFrom(testConfigProps, "druid.kafka.query.files"); + + setResultsDir("ql/src/test/results/clientpositive/druid"); + setLogDir("itests/qtest/target/tmp/log"); + + setInitScript("q_test_druid_init.sql"); + setCleanupScript("q_test_cleanup_druid.sql"); + setHiveConfDir("data/conf/llap"); + setClusterType(MiniClusterType.druidKafka); + setMetastoreType(MetastoreType.sql); + setFsType(QTestUtil.FsType.hdfs); + } catch (Exception e) { + throw new RuntimeException("can't construct cliconfig", e); + } + } + } + public static class MiniLlapLocalCliConfig extends AbstractCliConfig { public MiniLlapLocalCliConfig() { http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 050f9d5..3cdad28 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -67,6 +67,7 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; +import junit.framework.TestSuite; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.io.IOUtils; @@ -87,12 +88,11 @@ import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream; import org.apache.hadoop.hive.common.io.SortPrintStream; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hive.druid.MiniDruidCluster; import org.apache.hadoop.hive.llap.LlapItUtils; import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Task; @@ -123,20 +123,23 @@ import org.apache.hadoop.hive.ql.dataset.Dataset; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.StreamPrinter; +import org.apache.hive.druid.MiniDruidCluster; +import org.apache.hive.kafka.SingleNodeKafkaCluster; import org.apache.logging.log4j.util.Strings; import org.apache.tools.ant.BuildException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import junit.framework.TestSuite; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * QTestUtil. 
@@ -203,6 +206,7 @@ public class QTestUtil { private final String cleanupScript; private MiniDruidCluster druidCluster; + private SingleNodeKafkaCluster kafkaCluster; public interface SuiteAddTestFunctor { public void addTestToSuite(TestSuite suite, Object setup, String tName); @@ -402,6 +406,8 @@ public class QTestUtil { conf.set("hive.druid.storage.storageDirectory", druidDeepStorage.toUri().getPath()); conf.set("hive.druid.metadata.db.type", "derby"); conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI()); + conf.set("hive.druid.coordinator.address.default", druidCluster.getCoordinatorURI()); + conf.set("hive.druid.overlord.address.default", druidCluster.getOverlordURI()); final Path scratchDir = fs .makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir")); fs.mkdirs(scratchDir); @@ -501,7 +507,9 @@ public class QTestUtil { llap(CoreClusterType.TEZ, FsType.hdfs), llap_local(CoreClusterType.TEZ, FsType.local), none(CoreClusterType.MR, FsType.local), - druid(CoreClusterType.TEZ, FsType.hdfs); + druid(CoreClusterType.TEZ, FsType.hdfs), + druidKafka(CoreClusterType.TEZ, FsType.hdfs), + kafka(CoreClusterType.TEZ, FsType.hdfs); private final CoreClusterType coreClusterType; @@ -537,8 +545,12 @@ public class QTestUtil { } else if (type.equals("llap_local")) { return llap_local; } else if (type.equals("druid")) { - return druid; - } else { + return druid; + } else if (type.equals("druid-kafka")) { + return druidKafka; + } else if (type.equals("kafka")) { + return kafka; + } else { return none; } } @@ -630,11 +641,7 @@ public class QTestUtil { ? new File(new File(dataDir).getAbsolutePath() + "/datasets") : new File(conf.get("test.data.set.files")); - // Use the current directory if it is not specified - String scriptsDir = conf.get("test.data.scripts"); - if (scriptsDir == null) { - scriptsDir = new File(".").getAbsolutePath() + "/data/scripts"; - } + String scriptsDir = getScriptsDir(); this.initScript = scriptsDir + File.separator + initScript; this.cleanupScript = scriptsDir + File.separator + cleanupScript; @@ -643,6 +650,14 @@ public class QTestUtil { init(); } + private String getScriptsDir() { + // Use the current directory if it is not specified + String scriptsDir = conf.get("test.data.scripts"); + if (scriptsDir == null) { + scriptsDir = new File(".").getAbsolutePath() + "/data/scripts"; + } + return scriptsDir; + } private void setupFileSystem(HadoopShims shims) throws IOException { @@ -678,7 +693,7 @@ public class QTestUtil { String uriString = fs.getUri().toString(); - if (clusterType == MiniClusterType.druid) { + if (clusterType == MiniClusterType.druid || clusterType == MiniClusterType.druidKafka) { final String tempDir = System.getProperty("test.tmp.dir"); druidCluster = new MiniDruidCluster("mini-druid", getLogDirectory(), @@ -699,6 +714,19 @@ public class QTestUtil { druidCluster.start(); } + if (clusterType == MiniClusterType.kafka || clusterType == MiniClusterType.druidKafka) { + kafkaCluster = new SingleNodeKafkaCluster("kafka", + getLogDirectory() + "/kafka-cluster", + setup.zkPort + ); + kafkaCluster.init(conf); + kafkaCluster.start(); + kafkaCluster.createTopicWithData( + "test-topic", + new File(getScriptsDir(), "kafka_init_data.json") + ); + } + if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) { if (confDir != null && !confDir.isEmpty()) { conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() @@ -735,6 +763,11 @@ public class QTestUtil { druidCluster.stop(); druidCluster = null; } + + if (kafkaCluster != null) { + kafkaCluster.stop(); + kafkaCluster = null; + } setup.tearDown(); if (sparkSession != null) { try {
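The topic seeding above reads data/scripts/kafka_init_data.json, which is not included in this diff; each line of that file becomes one Kafka message. Judging from the druid_kafka_test column names and the row values visible in the .q.out further below, a line would look roughly like the following (the field names are assumed to mirror the Hive columns, and the timestamp format and numeric values are hypothetical):

{"__time":"2013-08-31T01:02:33Z","page":"Gypsy Danger","user":"nuclear","language":"en","added":57,"deleted":200}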
http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 194a7d0..8fc9a75 100644 --- a/pom.xml +++ b/pom.xml @@ -127,7 +127,7 @@ <datanucleus-rdbms.version>4.1.19</datanucleus-rdbms.version> <datanucleus-jdo.version>3.2.0-m3</datanucleus-jdo.version> <commons-cli.version>1.2</commons-cli.version> - <commons-codec.version>1.4</commons-codec.version> + <commons-codec.version>1.7</commons-codec.version> <commons-collections.version>3.2.2</commons-collections.version> <commons-compress.version>1.9</commons-compress.version> <commons-exec.version>1.1</commons-exec.version> http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/ql/src/test/queries/clientpositive/druidkafkamini_basic.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q new file mode 100644 index 0000000..38662e3 --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q @@ -0,0 +1,74 @@ +CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, language string, added int, deleted int) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "test-topic", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT1S" + ); + +ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'); + +!curl -ss http://localhost:8081/druid/indexer/v1/supervisor; + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 50; + +DESCRIBE druid_kafka_test; +DESCRIBE EXTENDED druid_kafka_test; + +Select count(*) FROM druid_kafka_test; + +Select page FROM druid_kafka_test order by page; + +-- Reset Kafka ingestion. This resets the offsets, and since we are using useEarliestOffset,
+ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET'); + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 50; + +DESCRIBE druid_kafka_test; +DESCRIBE EXTENDED druid_kafka_test; + +Select count(*) FROM druid_kafka_test; + +Select page FROM druid_kafka_test order by page; + +-- Join against other normal tables +CREATE TABLE languages(shortname string, fullname string); + +INSERT INTO languages values +("en", "english"), +("ru", "russian"); + +EXPLAIN EXTENDED +SELECT a.fullname, b.`user` +FROM +( +(SELECT fullname, shortname +FROM languages) a +JOIN +(SELECT language, `user` +FROM druid_kafka_test) b + ON a.shortname = b.language +); + +SELECT a.fullname, b.`user` +FROM +( +(SELECT fullname, shortname +FROM languages) a +JOIN +(SELECT language, `user` +FROM druid_kafka_test) b + ON a.shortname = b.language +) order by b.`user`; + +DROP TABLE druid_kafka_test; +DROP TABLE druid_table_1; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out new file mode 100644 index 0000000..73eab7b --- /dev/null +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out @@ -0,0 +1,485 @@ +PREHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, language string, added int, deleted int) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "test-topic", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT1S" + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_kafka_test +POSTHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, language string, added int, deleted int) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "test-topic", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT1S" + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test +PREHOOK: query: ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: default@druid_kafka_test +POSTHOOK: query: ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: default@druid_kafka_test +["default.druid_kafka_test"] +PREHOOK: query: DESCRIBE druid_kafka_test +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test +POSTHOOK: query: DESCRIBE 
druid_kafka_test +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +added int from deserializer +deleted int from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +added int from deserializer +deleted int from deserializer + +#### A masked pattern was here #### +PREHOOK: query: Select count(*) FROM druid_kafka_test +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select page FROM druid_kafka_test order by page +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test order by page +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: hdfs://### HDFS PATH ### +Cherno Alpha +Cherno Alpha +Coyote Tango +Coyote Tango +Crimson Typhoon +Crimson Typhoon +Gypsy Danger +Gypsy Danger +Striker Eureka +Striker Eureka +PREHOOK: query: ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: default@druid_kafka_test +POSTHOOK: query: ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: default@druid_kafka_test +PREHOOK: query: DESCRIBE druid_kafka_test +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test +POSTHOOK: query: DESCRIBE druid_kafka_test +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +added int from deserializer +deleted int from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +added int from deserializer +deleted int from deserializer + +#### A masked pattern was here #### +PREHOOK: query: Select count(*) FROM druid_kafka_test +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select page FROM druid_kafka_test order by page +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test order by page +POSTHOOK: type: QUERY +POSTHOOK: Input: 
default@druid_kafka_test +POSTHOOK: Output: hdfs://### HDFS PATH ### +Cherno Alpha +Cherno Alpha +Coyote Tango +Coyote Tango +Crimson Typhoon +Crimson Typhoon +Gypsy Danger +Gypsy Danger +Striker Eureka +Striker Eureka +PREHOOK: query: CREATE TABLE languages(shortname string, fullname string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@languages +POSTHOOK: query: CREATE TABLE languages(shortname string, fullname string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@languages +PREHOOK: query: INSERT INTO languages values +("en", "english"), +("ru", "russian") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@languages +POSTHOOK: query: INSERT INTO languages values +("en", "english"), +("ru", "russian") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@languages +POSTHOOK: Lineage: languages.fullname SCRIPT [] +POSTHOOK: Lineage: languages.shortname SCRIPT [] +PREHOOK: query: EXPLAIN EXTENDED +SELECT a.fullname, b.`user` +FROM +( +(SELECT fullname, shortname +FROM languages) a +JOIN +(SELECT language, `user` +FROM druid_kafka_test) b + ON a.shortname = b.language +) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN EXTENDED +SELECT a.fullname, b.`user` +FROM +( +(SELECT fullname, shortname +FROM languages) a +JOIN +(SELECT language, `user` +FROM druid_kafka_test) b + ON a.shortname = b.language +) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: languages + Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE + GatherStats: false + Filter Operator + isSamplingPred: false + predicate: shortname is not null (type: boolean) + Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: fullname (type: string), shortname (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col1 (type: string) + null sort order: a + sort order: + + Map-reduce partition columns: _col1 (type: string) + Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE + tag: 0 + value expressions: _col0 (type: string) + auto parallelism: true + Path -> Alias: + hdfs://### HDFS PATH ### [languages] + Path -> Partition: + hdfs://### HDFS PATH ### + Partition + base file name: languages + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"fullname":"true","shortname":"true"}} + bucket_count -1 + column.name.delimiter , + columns shortname,fullname + columns.comments + columns.types string:string +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.languages + numFiles 1 + numRows 2 + rawDataSize 20 + serialization.ddl struct languages { string shortname, string fullname} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 22 +#### A masked pattern was here #### + serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"fullname":"true","shortname":"true"}} + bucket_count -1 + column.name.delimiter , + columns shortname,fullname + columns.comments + columns.types string:string +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.languages + numFiles 1 + numRows 2 + rawDataSize 20 + serialization.ddl struct languages { string shortname, string fullname} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 22 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.languages + name: default.languages + Truncated Path -> Alias: + /languages [languages] + Map 3 + Map Operator Tree: + TableScan + alias: druid_kafka_test + properties: + druid.fieldNames language,user + druid.fieldTypes string,string + druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"} + druid.query.type scan + Statistics: Num rows: 1 Data size: 368 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Reduce Output Operator + key expressions: language (type: string) + null sort order: a + sort order: + + Map-reduce partition columns: language (type: string) + Statistics: Num rows: 1 Data size: 368 Basic stats: COMPLETE Column stats: NONE + tag: 1 + value expressions: user (type: string) + auto parallelism: true + Path -> Alias: + hdfs://### HDFS PATH ### [druid_kafka_test] + Path -> Partition: + hdfs://### HDFS PATH ### + Partition + base file name: druid_kafka_test + input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat + output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat + properties: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"__time":"true","added":"true","deleted":"true","language":"true","page":"true","user":"true"}} + bucket_count -1 + column.name.delimiter , + columns __time,page,user,language,added,deleted + columns.comments + columns.types timestamp:string:string:string:int:int + druid.datasource default.druid_kafka_test + druid.fieldNames language,user + druid.fieldTypes string,string + druid.kafka.ingestion.maxRowsInMemory 5 + druid.kafka.ingestion.period PT1S + druid.kafka.ingestion.startDelay PT1S + druid.kafka.ingestion.taskDuration PT30S + druid.kafka.ingestion.useEarliestOffset true + druid.query.granularity MINUTE + druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"} + druid.query.type scan + druid.segment.granularity MONTH +#### A masked pattern was here #### + kafka.bootstrap.servers localhost:9092 + kafka.topic test-topic +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.druid_kafka_test + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct druid_kafka_test { timestamp __time, string page, string user, string language, i32 added, i32 
deleted} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.druid.serde.DruidSerDe + storage_handler org.apache.hadoop.hive.druid.DruidStorageHandler + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.druid.serde.DruidSerDe + + input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat + output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat + properties: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"__time":"true","added":"true","deleted":"true","language":"true","page":"true","user":"true"}} + bucket_count -1 + column.name.delimiter , + columns __time,page,user,language,added,deleted + columns.comments + columns.types timestamp:string:string:string:int:int + druid.datasource default.druid_kafka_test + druid.fieldNames language,user + druid.fieldTypes string,string + druid.kafka.ingestion.maxRowsInMemory 5 + druid.kafka.ingestion.period PT1S + druid.kafka.ingestion.startDelay PT1S + druid.kafka.ingestion.taskDuration PT30S + druid.kafka.ingestion.useEarliestOffset true + druid.query.granularity MINUTE + druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"} + druid.query.type scan + druid.segment.granularity MONTH +#### A masked pattern was here #### + kafka.bootstrap.servers localhost:9092 + kafka.topic test-topic +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.druid_kafka_test + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct druid_kafka_test { timestamp __time, string page, string user, string language, i32 added, i32 deleted} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.druid.serde.DruidSerDe + storage_handler org.apache.hadoop.hive.druid.DruidStorageHandler + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.druid.serde.DruidSerDe + name: default.druid_kafka_test + name: default.druid_kafka_test + Truncated Path -> Alias: + /druid_kafka_test [druid_kafka_test] + Reducer 2 + Needs Tagging: false + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col1 (type: string) + 1 language (type: string) + outputColumnNames: _col0, _col3 + Position of Big Table: 0 + Statistics: Num rows: 2 Data size: 389 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col3 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 2 Data size: 389 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 + directory: hdfs://### HDFS PATH ### + NumFilesPerFileSink: 1 + Statistics: Num rows: 2 Data size: 389 Basic stats: COMPLETE Column stats: NONE + Stats Publishing Key Prefix: hdfs://### HDFS PATH ### + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types string:string + escape.delim \ + hive.serialization.extend.additional.nesting.levels true + serialization.escape.crlf true + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false 
+ + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT a.fullname, b.`user` +FROM +( +(SELECT fullname, shortname +FROM languages) a +JOIN +(SELECT language, `user` +FROM druid_kafka_test) b + ON a.shortname = b.language +) order by b.`user` +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Input: default@languages +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT a.fullname, b.`user` +FROM +( +(SELECT fullname, shortname +FROM languages) a +JOIN +(SELECT language, `user` +FROM druid_kafka_test) b + ON a.shortname = b.language +) order by b.`user` +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Input: default@languages +POSTHOOK: Output: hdfs://### HDFS PATH ### +russian masterYi +russian masterYi +english nuclear +english nuclear +english speed +english speed +PREHOOK: query: DROP TABLE druid_kafka_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: default@druid_kafka_test +POSTHOOK: query: DROP TABLE druid_kafka_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: default@druid_kafka_test +PREHOOK: query: DROP TABLE druid_table_1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE druid_table_1 +POSTHOOK: type: DROPTABLE
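The supervisor listing ["default.druid_kafka_test"] captured earlier in this output is the result of the !curl call in the .q file. For deeper debugging, the standard Druid Overlord API also exposes a per-supervisor status endpoint; the supervisor id is the datasource name, following Hive's <database>.<table> convention seen in the plan above, so a test could additionally run:

!curl -ss http://localhost:8081/druid/indexer/v1/supervisor/default.druid_kafka_test/status;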