Repository: hive
Updated Branches:
  refs/heads/master d2838990f -> d3fed078c


http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
----------------------------------------------------------------------
diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
index 71259dc..8e79d4c 100644
--- a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
+++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
@@ -21,7 +21,6 @@ package org.apache.hive.druid;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,12 +50,13 @@ public class MiniDruidCluster extends AbstractService {
           );
 
   private static final Map<String, String> COMMON_DRUID_CONF = ImmutableMap.of(
-          "druid.metadata.storage.type", "derby"
+          "druid.metadata.storage.type", "derby",
+          "druid.storage.type", "hdfs",
+          "druid.processing.buffer.sizeBytes", "213870912",
+          "druid.processing.numThreads", "2"
   );
 
   private static final Map<String, String> COMMON_DRUID_HISTORICAL = ImmutableMap.of(
-          "druid.processing.buffer.sizeBytes", "213870912",
-          "druid.processing.numThreads", "2",
           "druid.server.maxSize", "130000000000"
   );
 
@@ -87,26 +87,13 @@ public class MiniDruidCluster extends AbstractService {
   }
 
 
-  public MiniDruidCluster(String name, String logDir, String dataDir, Integer zookeeperPort, String classpath) {
+  public MiniDruidCluster(String name, String logDir, String tmpDir, Integer zookeeperPort, String classpath) {
     super(name);
-    this.dataDirectory = new File(dataDir, "druid-data");
+    this.dataDirectory = new File(tmpDir, "druid-data");
     this.logDirectory = new File(logDir);
-    try {
 
-      if (dataDirectory.exists()) {
-      // need to clean data directory to ensure that there is no interference from old runs
-        // Cleaning is happening here to allow debugging in case of tests fail
-        // we don;t have to clean logs since it is an append mode
-        log.info("Cleaning the druid-data directory [{}]", dataDirectory.getAbsolutePath());
-        FileUtils.deleteDirectory(dataDirectory);
-      } else {
-        log.info("Creating the druid-data directory [{}]", dataDirectory.getAbsolutePath());
-        dataDirectory.mkdirs();
-      }
-    } catch (IOException e) {
-      log.error("Failed to clean data directory");
-      Throwables.propagate(e);
-    }
+    ensureCleanDirectory(dataDirectory);
+
     String derbyURI = String
             .format("jdbc:derby://localhost:1527/%s/druid_derby/metadata.db;create=true",
                     dataDirectory.getAbsolutePath()
@@ -126,13 +113,14 @@ public class MiniDruidCluster extends AbstractService {
             .put("druid.indexer.logs.directory", indexingLogDir)
             .put("druid.zk.service.host", "localhost:" + zookeeperPort)
             .put("druid.coordinator.startDelay", "PT1S")
+            .put("druid.indexer.runner", "local")
+            .put("druid.storage.storageDirectory", getDeepStorageDir())
             .build();
    Map<String, String> historicalProperties = historicalMapBuilder.putAll(COMMON_DRUID_CONF)
             .putAll(COMMON_DRUID_HISTORICAL)
             .put("druid.zk.service.host", "localhost:" + zookeeperPort)
             .put("druid.segmentCache.locations", segmentsCache)
             .put("druid.storage.storageDirectory", getDeepStorageDir())
-            .put("druid.storage.type", "hdfs")
             .build();
    coordinator = new ForkingDruidNode("coordinator", classpath, coordinatorProperties,
             COORDINATOR_JVM_CONF,
@@ -148,6 +136,24 @@ public class MiniDruidCluster extends AbstractService {
 
   }
 
+  private static void ensureCleanDirectory(File dir) {
+    try {
+      if (dir.exists()) {
+        // Clean the directory to ensure there is no interference from old runs.
+        // Cleaning happens here, rather than at teardown, to allow debugging when tests fail.
+        // We don't have to clean logs since they are written in append mode.
+        log.info("Cleaning the druid directory [{}]", dir.getAbsolutePath());
+        FileUtils.deleteDirectory(dir);
+      } else {
+        log.info("Creating the druid directory [{}]", dir.getAbsolutePath());
+        dir.mkdirs();
+      }
+    } catch (IOException e) {
+      log.error("Failed to clean druid directory");
+      Throwables.propagate(e);
+    }
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     druidNodes.stream().forEach(node -> {
@@ -191,4 +197,13 @@ public class MiniDruidCluster extends AbstractService {
   public String getDeepStorageDir() {
     return dataDirectory.getAbsolutePath() + File.separator + "deep-storage";
   }
+
+  public String getCoordinatorURI(){
+    return "localhost:8081";
+  }
+
+  public String getOverlordURI(){
+    // Overlord and coordinator both run in same JVM.
+    return getCoordinatorURI();
+  }
 }
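
For reviewers, a minimal usage sketch of the new MiniDruidCluster constructor and URI getters (not part of the patch). It assumes the Hadoop AbstractService lifecycle that QTestUtil uses further down in this commit; the paths, ZooKeeper port, and classpath values are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hive.druid.MiniDruidCluster;

public class MiniDruidClusterSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Placeholder log/tmp dirs, ZK port, and classpath; "druid-data" is created
    // under the tmp dir and cleaned on each run by ensureCleanDirectory().
    MiniDruidCluster druid = new MiniDruidCluster("mini-druid",
        "/tmp/druid-logs",
        "/tmp/qtest-tmp",
        2181,
        System.getProperty("java.class.path"));
    druid.init(conf);   // AbstractService lifecycle: init, start, stop
    druid.start();
    // Point Hive at the mini cluster, as QTestUtil does later in this commit.
    conf.set("hive.druid.metadata.uri", druid.getMetadataURI());
    conf.set("hive.druid.coordinator.address.default", druid.getCoordinatorURI());
    conf.set("hive.druid.overlord.address.default", druid.getOverlordURI());
    // ... run tests against the cluster ...
    druid.stop();
  }
}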

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
----------------------------------------------------------------------
diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
new file mode 100644
index 0000000..d839fd2
--- /dev/null
+++ b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
@@ -0,0 +1,122 @@
+package org.apache.hive.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.Files;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * This class has the hooks to start and stop a single-node Kafka cluster.
+ * The Kafka broker is started on port 9092.
+ */
+public class SingleNodeKafkaCluster extends AbstractService {
+  private static final Logger log = LoggerFactory.getLogger(SingleNodeKafkaCluster.class);
+
+  private final KafkaServerStartable serverStartable;
+  private final String zkString;
+
+  public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort){
+    super(name);
+    Properties properties = new Properties();
+    this.zkString = String.format("localhost:%d", zkPort);
+    properties.setProperty("zookeeper.connect", zkString);
+    properties.setProperty("broker.id", String.valueOf(1));
+    properties.setProperty("host.name", "localhost");
+    properties.setProperty("port", Integer.toString(9092));
+    properties.setProperty("log.dir", logDir);
+    properties.setProperty("log.flush.interval.messages", String.valueOf(1));
+    properties.setProperty("offsets.topic.replication.factor", 
String.valueOf(1));
+    properties.setProperty("offsets.topic.num.partitions", String.valueOf(1));
+    properties.setProperty("transaction.state.log.replication.factor", 
String.valueOf(1));
+    properties.setProperty("transaction.state.log.min.isr", String.valueOf(1));
+    properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577");
+
+    this.serverStartable = new KafkaServerStartable(KafkaConfig.fromProps(properties));
+  }
+
+
+  @Override
+  protected void serviceStart() throws Exception {
+    serverStartable.startup();
+    log.info("Kafka Server Started");
+
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    log.info("Stopping Kafka Server");
+    serverStartable.shutdown();
+    log.info("Kafka Server Stopped");
+  }
+
+  /**
+   * Creates a topic and inserts data from the specified datafile.
+   * Each line in the datafile is sent to kafka as a single message.
+   * @param topicName
+   * @param datafile
+   */
+  public void createTopicWithData(String topicName, File datafile){
+    createTopic(topicName);
+    // set up kafka producer
+    Properties properties = new Properties();
+    properties.put("bootstrap.servers", "localhost:9092");
+    properties.put("acks", "1");
+    properties.put("retries", "3");
+
+    try(KafkaProducer<String, String> producer = new KafkaProducer<>(
+        properties,
+        new StringSerializer(),
+        new StringSerializer()
+    )){
+      List<String> events = Files.readLines(datafile, Charset.forName("UTF-8"));
+      for(String event : events){
+        producer.send(new ProducerRecord<String, String>(topicName, event));
+      }
+    } catch (IOException e) {
+      Throwables.propagate(e);
+    }
+
+  }
+
+  public void createTopic(String topic) {
+    int sessionTimeoutMs = 1000;
+    ZkClient zkClient = new ZkClient(
+        this.zkString, sessionTimeoutMs, sessionTimeoutMs,
+        ZKStringSerializer$.MODULE$
+    );
+    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkString, sessionTimeoutMs), false);
+    int numPartitions = 1;
+    int replicationFactor = 1;
+    Properties topicConfig = new Properties();
+    AdminUtils.createTopic(
+        zkUtils,
+        topic,
+        numPartitions,
+        replicationFactor,
+        topicConfig,
+        RackAwareMode.Disabled$.MODULE$
+    );
+  }
+
+}
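
Similarly, a minimal usage sketch of SingleNodeKafkaCluster (not part of the patch). The broker address localhost:9092 is hard-coded in the class above; the log directory, ZooKeeper port, and data file below are placeholders, and the lifecycle calls assume Hadoop's AbstractService as used by QTestUtil in this commit.

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.kafka.SingleNodeKafkaCluster;

public class SingleNodeKafkaClusterSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Placeholder log dir and ZooKeeper port (QTestUtil passes setup.zkPort).
    SingleNodeKafkaCluster kafka =
        new SingleNodeKafkaCluster("kafka", "/tmp/kafka-logs", 2181);
    kafka.init(conf);   // AbstractService lifecycle
    kafka.start();      // broker comes up on localhost:9092
    // Create a topic and publish one message per line of the file,
    // mirroring QTestUtil's use of data/scripts/kafka_init_data.json.
    kafka.createTopicWithData("test-topic", new File("/tmp/events.json"));
    // ... run tests against the broker ...
    kafka.stop();
  }
}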

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
----------------------------------------------------------------------
diff --git a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
new file mode 100644
index 0000000..4768975
--- /dev/null
+++ b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.cli;
+
+import org.apache.hadoop.hive.cli.control.CliAdapter;
+import org.apache.hadoop.hive.cli.control.CliConfigs;
+
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TestMiniDruidKafkaCliDriver {
+
+  static CliAdapter adapter = new CliConfigs.MiniDruidKafkaCliConfig().getCliAdapter();
+
+  @Parameters(name = "{0}")
+  public static List<Object[]> getParameters() throws Exception {
+    return adapter.getParameters();
+  }
+
+  @ClassRule
+  public static TestRule cliClassRule = adapter.buildClassRule();
+
+  @Rule
+  public TestRule cliTestRule = adapter.buildTestRule();
+
+  private String name;
+  private File qfile;
+
+  public TestMiniDruidKafkaCliDriver(String name, File qfile) {
+    this.name = name;
+    this.qfile = qfile;
+  }
+
+  @Test
+  public void testCliDriver() throws Exception {
+    adapter.runTest(name, qfile);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index ec25713..d2e077b 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1685,5 +1685,5 @@ druid.query.files=druidmini_test1.q,\
   druidmini_expressions.q,\
   druidmini_extractTime.q,\
   druidmini_test_alter.q,\
+  druidkafkamini_basic.q,\
   druidmini_floorTime.q
-

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index 7034c38..1e65569 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -187,6 +187,29 @@ public class CliConfigs {
     }
   }
 
+  public static class MiniDruidKafkaCliConfig extends AbstractCliConfig {
+    public MiniDruidKafkaCliConfig() {
+      super(CoreCliDriver.class);
+      try {
+        setQueryDir("ql/src/test/queries/clientpositive");
+
+        includesFrom(testConfigProps, "druid.kafka.query.files");
+
+        setResultsDir("ql/src/test/results/clientpositive/druid");
+        setLogDir("itests/qtest/target/tmp/log");
+
+        setInitScript("q_test_druid_init.sql");
+        setCleanupScript("q_test_cleanup_druid.sql");
+        setHiveConfDir("data/conf/llap");
+        setClusterType(MiniClusterType.druidKafka);
+        setMetastoreType(MetastoreType.sql);
+        setFsType(QTestUtil.FsType.hdfs);
+      } catch (Exception e) {
+        throw new RuntimeException("can't construct cliconfig", e);
+      }
+    }
+  }
+
   public static class MiniLlapLocalCliConfig extends AbstractCliConfig {
 
     public MiniLlapLocalCliConfig() {

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 050f9d5..3cdad28 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -67,6 +67,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
+import junit.framework.TestSuite;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.io.IOUtils;
@@ -87,12 +88,11 @@ import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream;
 import org.apache.hadoop.hive.common.io.SortPrintStream;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hive.druid.MiniDruidCluster;
 import org.apache.hadoop.hive.llap.LlapItUtils;
 import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -123,20 +123,23 @@ import org.apache.hadoop.hive.ql.dataset.Dataset;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.common.util.StreamPrinter;
+import org.apache.hive.druid.MiniDruidCluster;
+import org.apache.hive.kafka.SingleNodeKafkaCluster;
 import org.apache.logging.log4j.util.Strings;
 import org.apache.tools.ant.BuildException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 
-import junit.framework.TestSuite;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * QTestUtil.
@@ -203,6 +206,7 @@ public class QTestUtil {
   private final String cleanupScript;
 
   private MiniDruidCluster druidCluster;
+  private SingleNodeKafkaCluster kafkaCluster;
 
   public interface SuiteAddTestFunctor {
     public void addTestToSuite(TestSuite suite, Object setup, String tName);
@@ -402,6 +406,8 @@ public class QTestUtil {
       conf.set("hive.druid.storage.storageDirectory", 
druidDeepStorage.toUri().getPath());
       conf.set("hive.druid.metadata.db.type", "derby");
       conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI());
+      conf.set("hive.druid.coordinator.address.default", 
druidCluster.getCoordinatorURI());
+      conf.set("hive.druid.overlord.address.default", 
druidCluster.getOverlordURI());
       final Path scratchDir = fs
              .makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir"));
       fs.mkdirs(scratchDir);
@@ -501,7 +507,9 @@ public class QTestUtil {
     llap(CoreClusterType.TEZ, FsType.hdfs),
     llap_local(CoreClusterType.TEZ, FsType.local),
     none(CoreClusterType.MR, FsType.local),
-    druid(CoreClusterType.TEZ, FsType.hdfs);
+    druid(CoreClusterType.TEZ, FsType.hdfs),
+    druidKafka(CoreClusterType.TEZ, FsType.hdfs),
+    kafka(CoreClusterType.TEZ, FsType.hdfs);
 
 
     private final CoreClusterType coreClusterType;
@@ -537,8 +545,11 @@ public class QTestUtil {
       } else if (type.equals("llap_local")) {
         return llap_local;
       } else if (type.equals("druid")) {
-      return druid;
-      } else {
+        return druid;
+      } else if (type.equals("druid-kafka")) {
+        return druidKafka;
+      }
+      else {
         return none;
       }
     }
@@ -630,11 +641,7 @@ public class QTestUtil {
       ? new File(new File(dataDir).getAbsolutePath() + "/datasets")
       : new File(conf.get("test.data.set.files"));
 
-    // Use the current directory if it is not specified
-    String scriptsDir = conf.get("test.data.scripts");
-    if (scriptsDir == null) {
-      scriptsDir = new File(".").getAbsolutePath() + "/data/scripts";
-    }
+    String scriptsDir = getScriptsDir();
 
     this.initScript = scriptsDir + File.separator + initScript;
     this.cleanupScript = scriptsDir + File.separator + cleanupScript;
@@ -643,6 +650,14 @@ public class QTestUtil {
 
     init();
   }
+  private String getScriptsDir() {
+    // Use the current directory if it is not specified
+    String scriptsDir = conf.get("test.data.scripts");
+    if (scriptsDir == null) {
+      scriptsDir = new File(".").getAbsolutePath() + "/data/scripts";
+    }
+    return scriptsDir;
+  }
 
   private void setupFileSystem(HadoopShims shims) throws IOException {
 
@@ -678,7 +693,7 @@ public class QTestUtil {
 
     String uriString = fs.getUri().toString();
 
-    if (clusterType == MiniClusterType.druid) {
+    if (clusterType == MiniClusterType.druid || clusterType == MiniClusterType.druidKafka) {
       final String tempDir = System.getProperty("test.tmp.dir");
       druidCluster = new MiniDruidCluster("mini-druid",
           getLogDirectory(),
@@ -699,6 +714,19 @@ public class QTestUtil {
       druidCluster.start();
     }
 
+    if(clusterType == MiniClusterType.kafka || clusterType == MiniClusterType.druidKafka) {
+      kafkaCluster = new SingleNodeKafkaCluster("kafka",
+          getLogDirectory() + "/kafka-cluster",
+          setup.zkPort
+      );
+      kafkaCluster.init(conf);
+      kafkaCluster.start();
+      kafkaCluster.createTopicWithData(
+          "test-topic",
+          new File(getScriptsDir(), "kafka_init_data.json")
+      );
+    }
+
     if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) {
       if (confDir != null && !confDir.isEmpty()) {
        conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
@@ -735,6 +763,11 @@ public class QTestUtil {
       druidCluster.stop();
       druidCluster = null;
     }
+
+    if (kafkaCluster != null) {
+      kafkaCluster.stop();
+      kafkaCluster = null;
+    }
     setup.tearDown();
     if (sparkSession != null) {
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 194a7d0..8fc9a75 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,7 +127,7 @@
     <datanucleus-rdbms.version>4.1.19</datanucleus-rdbms.version>
     <datanucleus-jdo.version>3.2.0-m3</datanucleus-jdo.version>
     <commons-cli.version>1.2</commons-cli.version>
-    <commons-codec.version>1.4</commons-codec.version>
+    <commons-codec.version>1.7</commons-codec.version>
     <commons-collections.version>3.2.2</commons-collections.version>
     <commons-compress.version>1.9</commons-compress.version>
     <commons-exec.version>1.1</commons-exec.version>

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
new file mode 100644
index 0000000..38662e3
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
@@ -0,0 +1,74 @@
+CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, language string, added int, deleted int)
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "MONTH",
+        "druid.query.granularity" = "MINUTE",
+        "kafka.bootstrap.servers" = "localhost:9092",
+        "kafka.topic" = "test-topic",
+        "druid.kafka.ingestion.useEarliestOffset" = "true",
+        "druid.kafka.ingestion.maxRowsInMemory" = "5",
+        "druid.kafka.ingestion.startDelay" = "PT1S",
+        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.period" = "PT1S"
+        );
+
+ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
+
+!curl -ss http://localhost:8081/druid/indexer/v1/supervisor;
+
+-- Sleep for some time for ingestion tasks to ingest events
+!sleep 50;
+
+DESCRIBE druid_kafka_test;
+DESCRIBE EXTENDED druid_kafka_test;
+
+Select count(*) FROM druid_kafka_test;
+
+Select page FROM druid_kafka_test order by page;
+
+-- Reset kafka Ingestion, this would reset the offsets and since we are using useEarliestOffset,
+-- We will see records duplicated after successful reset.
+ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET');
+
+-- Sleep for some time for ingestion tasks to ingest events
+!sleep 50;
+
+DESCRIBE druid_kafka_test;
+DESCRIBE EXTENDED druid_kafka_test;
+
+Select count(*) FROM druid_kafka_test;
+
+Select page FROM druid_kafka_test order by page;
+
+-- Join against other normal tables
+CREATE TABLE languages(shortname string, fullname string);
+
+INSERT INTO languages values
+("en", "english"),
+("ru", "russian");
+
+EXPLAIN EXTENDED
+SELECT a.fullname, b.`user`
+FROM
+(
+(SELECT fullname, shortname
+FROM languages) a
+JOIN
+(SELECT language, `user`
+FROM druid_kafka_test) b
+  ON a.shortname = b.language
+);
+
+SELECT a.fullname, b.`user`
+FROM
+(
+(SELECT fullname, shortname
+FROM languages) a
+JOIN
+(SELECT language, `user`
+FROM druid_kafka_test) b
+  ON a.shortname = b.language
+) order by b.`user`;
+
+DROP TABLE druid_kafka_test;
+DROP TABLE druid_table_1;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
new file mode 100644
index 0000000..73eab7b
--- /dev/null
+++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
@@ -0,0 +1,485 @@
+PREHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, language string, added int, deleted int)
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "MONTH",
+        "druid.query.granularity" = "MINUTE",
+        "kafka.bootstrap.servers" = "localhost:9092",
+        "kafka.topic" = "test-topic",
+        "druid.kafka.ingestion.useEarliestOffset" = "true",
+        "druid.kafka.ingestion.maxRowsInMemory" = "5",
+        "druid.kafka.ingestion.startDelay" = "PT1S",
+        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.period" = "PT1S"
+        )
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@druid_kafka_test
+POSTHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, language string, added int, deleted int)
+        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+        TBLPROPERTIES (
+        "druid.segment.granularity" = "MONTH",
+        "druid.query.granularity" = "MINUTE",
+        "kafka.bootstrap.servers" = "localhost:9092",
+        "kafka.topic" = "test-topic",
+        "druid.kafka.ingestion.useEarliestOffset" = "true",
+        "druid.kafka.ingestion.maxRowsInMemory" = "5",
+        "druid.kafka.ingestion.startDelay" = "PT1S",
+        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.period" = "PT1S"
+        )
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@druid_kafka_test
+PREHOOK: query: ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START')
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@druid_kafka_test
+PREHOOK: Output: default@druid_kafka_test
+POSTHOOK: query: ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START')
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@druid_kafka_test
+POSTHOOK: Output: default@druid_kafka_test
+["default.druid_kafka_test"]
+PREHOOK: query: DESCRIBE druid_kafka_test
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@druid_kafka_test
+POSTHOOK: query: DESCRIBE druid_kafka_test
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@druid_kafka_test
+__time                 timestamp               from deserializer   
+page                   string                  from deserializer   
+user                   string                  from deserializer   
+language               string                  from deserializer   
+added                  int                     from deserializer   
+deleted                int                     from deserializer   
+PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@druid_kafka_test
+POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@druid_kafka_test
+__time                 timestamp               from deserializer   
+page                   string                  from deserializer   
+user                   string                  from deserializer   
+language               string                  from deserializer   
+added                  int                     from deserializer   
+deleted                int                     from deserializer   
+                
+#### A masked pattern was here ####
+PREHOOK: query: Select count(*) FROM druid_kafka_test
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: Select count(*) FROM druid_kafka_test
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+10
+PREHOOK: query: Select page FROM druid_kafka_test order by page
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: Select page FROM druid_kafka_test order by page
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+Cherno Alpha
+Cherno Alpha
+Coyote Tango
+Coyote Tango
+Crimson Typhoon
+Crimson Typhoon
+Gypsy Danger
+Gypsy Danger
+Striker Eureka
+Striker Eureka
+PREHOOK: query: ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET')
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@druid_kafka_test
+PREHOOK: Output: default@druid_kafka_test
+POSTHOOK: query: ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET')
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@druid_kafka_test
+POSTHOOK: Output: default@druid_kafka_test
+PREHOOK: query: DESCRIBE druid_kafka_test
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@druid_kafka_test
+POSTHOOK: query: DESCRIBE druid_kafka_test
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@druid_kafka_test
+__time                 timestamp               from deserializer   
+page                   string                  from deserializer   
+user                   string                  from deserializer   
+language               string                  from deserializer   
+added                  int                     from deserializer   
+deleted                int                     from deserializer   
+PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@druid_kafka_test
+POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@druid_kafka_test
+__time                 timestamp               from deserializer   
+page                   string                  from deserializer   
+user                   string                  from deserializer   
+language               string                  from deserializer   
+added                  int                     from deserializer   
+deleted                int                     from deserializer   
+                
+#### A masked pattern was here ####
+PREHOOK: query: Select count(*) FROM druid_kafka_test
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: Select count(*) FROM druid_kafka_test
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+10
+PREHOOK: query: Select page FROM druid_kafka_test order by page
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: Select page FROM druid_kafka_test order by page
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+Cherno Alpha
+Cherno Alpha
+Coyote Tango
+Coyote Tango
+Crimson Typhoon
+Crimson Typhoon
+Gypsy Danger
+Gypsy Danger
+Striker Eureka
+Striker Eureka
+PREHOOK: query: CREATE TABLE languages(shortname string, fullname string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@languages
+POSTHOOK: query: CREATE TABLE languages(shortname string, fullname string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@languages
+PREHOOK: query: INSERT INTO languages values
+("en", "english"),
+("ru", "russian")
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@languages
+POSTHOOK: query: INSERT INTO languages values
+("en", "english"),
+("ru", "russian")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@languages
+POSTHOOK: Lineage: languages.fullname SCRIPT []
+POSTHOOK: Lineage: languages.shortname SCRIPT []
+PREHOOK: query: EXPLAIN EXTENDED
+SELECT a.fullname, b.`user`
+FROM
+(
+(SELECT fullname, shortname
+FROM languages) a
+JOIN
+(SELECT language, `user`
+FROM druid_kafka_test) b
+  ON a.shortname = b.language
+)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN EXTENDED
+SELECT a.fullname, b.`user`
+FROM
+(
+(SELECT fullname, shortname
+FROM languages) a
+JOIN
+(SELECT language, `user`
+FROM druid_kafka_test) b
+  ON a.shortname = b.language
+)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: languages
+                  Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE
+                  GatherStats: false
+                  Filter Operator
+                    isSamplingPred: false
+                    predicate: shortname is not null (type: boolean)
+                    Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: fullname (type: string), shortname (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col1 (type: string)
+                        null sort order: a
+                        sort order: +
+                        Map-reduce partition columns: _col1 (type: string)
+                        Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE
+                        tag: 0
+                        value expressions: _col0 (type: string)
+                        auto parallelism: true
+            Path -> Alias:
+              hdfs://### HDFS PATH ### [languages]
+            Path -> Partition:
+              hdfs://### HDFS PATH ### 
+                Partition
+                  base file name: languages
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  properties:
+                    COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"fullname":"true","shortname":"true"}}
+                    bucket_count -1
+                    column.name.delimiter ,
+                    columns shortname,fullname
+                    columns.comments 
+                    columns.types string:string
+#### A masked pattern was here ####
+                    location hdfs://### HDFS PATH ###
+                    name default.languages
+                    numFiles 1
+                    numRows 2
+                    rawDataSize 20
+                    serialization.ddl struct languages { string shortname, string fullname}
+                    serialization.format 1
+                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    totalSize 22
+#### A masked pattern was here ####
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    properties:
+                      COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"fullname":"true","shortname":"true"}}
+                      bucket_count -1
+                      column.name.delimiter ,
+                      columns shortname,fullname
+                      columns.comments 
+                      columns.types string:string
+#### A masked pattern was here ####
+                      location hdfs://### HDFS PATH ###
+                      name default.languages
+                      numFiles 1
+                      numRows 2
+                      rawDataSize 20
+                      serialization.ddl struct languages { string shortname, string fullname}
+                      serialization.format 1
+                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      totalSize 22
+#### A masked pattern was here ####
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    name: default.languages
+                  name: default.languages
+            Truncated Path -> Alias:
+              /languages [languages]
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: druid_kafka_test
+                  properties:
+                    druid.fieldNames language,user
+                    druid.fieldTypes string,string
+                    druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"}
+                    druid.query.type scan
+                  Statistics: Num rows: 1 Data size: 368 Basic stats: COMPLETE Column stats: NONE
+                  GatherStats: false
+                  Reduce Output Operator
+                    key expressions: language (type: string)
+                    null sort order: a
+                    sort order: +
+                    Map-reduce partition columns: language (type: string)
+                    Statistics: Num rows: 1 Data size: 368 Basic stats: COMPLETE Column stats: NONE
+                    tag: 1
+                    value expressions: user (type: string)
+                    auto parallelism: true
+            Path -> Alias:
+              hdfs://### HDFS PATH ### [druid_kafka_test]
+            Path -> Partition:
+              hdfs://### HDFS PATH ### 
+                Partition
+                  base file name: druid_kafka_test
+                  input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+                  output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat
+                  properties:
+                    COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"__time":"true","added":"true","deleted":"true","language":"true","page":"true","user":"true"}}
+                    bucket_count -1
+                    column.name.delimiter ,
+                    columns __time,page,user,language,added,deleted
+                    columns.comments 
+                    columns.types timestamp:string:string:string:int:int
+                    druid.datasource default.druid_kafka_test
+                    druid.fieldNames language,user
+                    druid.fieldTypes string,string
+                    druid.kafka.ingestion.maxRowsInMemory 5
+                    druid.kafka.ingestion.period PT1S
+                    druid.kafka.ingestion.startDelay PT1S
+                    druid.kafka.ingestion.taskDuration PT30S
+                    druid.kafka.ingestion.useEarliestOffset true
+                    druid.query.granularity MINUTE
+                    druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"}
+                    druid.query.type scan
+                    druid.segment.granularity MONTH
+#### A masked pattern was here ####
+                    kafka.bootstrap.servers localhost:9092
+                    kafka.topic test-topic
+#### A masked pattern was here ####
+                    location hdfs://### HDFS PATH ###
+                    name default.druid_kafka_test
+                    numFiles 0
+                    numRows 0
+                    rawDataSize 0
+                    serialization.ddl struct druid_kafka_test { timestamp __time, string page, string user, string language, i32 added, i32 deleted}
+                    serialization.format 1
+                    serialization.lib org.apache.hadoop.hive.druid.serde.DruidSerDe
+                    storage_handler org.apache.hadoop.hive.druid.DruidStorageHandler
+                    totalSize 0
+#### A masked pattern was here ####
+                  serde: org.apache.hadoop.hive.druid.serde.DruidSerDe
+                
+                    input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat
+                    output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat
+                    properties:
+                      COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"__time":"true","added":"true","deleted":"true","language":"true","page":"true","user":"true"}}
+                      bucket_count -1
+                      column.name.delimiter ,
+                      columns __time,page,user,language,added,deleted
+                      columns.comments 
+                      columns.types timestamp:string:string:string:int:int
+                      druid.datasource default.druid_kafka_test
+                      druid.fieldNames language,user
+                      druid.fieldTypes string,string
+                      druid.kafka.ingestion.maxRowsInMemory 5
+                      druid.kafka.ingestion.period PT1S
+                      druid.kafka.ingestion.startDelay PT1S
+                      druid.kafka.ingestion.taskDuration PT30S
+                      druid.kafka.ingestion.useEarliestOffset true
+                      druid.query.granularity MINUTE
+                      druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"}
+                      druid.query.type scan
+                      druid.segment.granularity MONTH
+#### A masked pattern was here ####
+                      kafka.bootstrap.servers localhost:9092
+                      kafka.topic test-topic
+#### A masked pattern was here ####
+                      location hdfs://### HDFS PATH ###
+                      name default.druid_kafka_test
+                      numFiles 0
+                      numRows 0
+                      rawDataSize 0
+                      serialization.ddl struct druid_kafka_test { timestamp __time, string page, string user, string language, i32 added, i32 deleted}
+                      serialization.format 1
+                      serialization.lib org.apache.hadoop.hive.druid.serde.DruidSerDe
+                      storage_handler org.apache.hadoop.hive.druid.DruidStorageHandler
+                      totalSize 0
+#### A masked pattern was here ####
+                    serde: org.apache.hadoop.hive.druid.serde.DruidSerDe
+                    name: default.druid_kafka_test
+                  name: default.druid_kafka_test
+            Truncated Path -> Alias:
+              /druid_kafka_test [druid_kafka_test]
+        Reducer 2 
+            Needs Tagging: false
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col1 (type: string)
+                  1 language (type: string)
+                outputColumnNames: _col0, _col3
+                Position of Big Table: 0
+                Statistics: Num rows: 2 Data size: 389 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: string), _col3 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 2 Data size: 389 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    directory: hdfs://### HDFS PATH ###
+                    NumFilesPerFileSink: 1
+                    Statistics: Num rows: 2 Data size: 389 Basic stats: COMPLETE Column stats: NONE
+                    Stats Publishing Key Prefix: hdfs://### HDFS PATH ###
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        properties:
+                          columns _col0,_col1
+                          columns.types string:string
+                          escape.delim \
+                          hive.serialization.extend.additional.nesting.levels true
+                          serialization.escape.crlf true
+                          serialization.format 1
+                          serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    TotalFiles: 1
+                    GatherStats: false
+                    MultiFileSpray: false
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT a.fullname, b.`user`
+FROM
+(
+(SELECT fullname, shortname
+FROM languages) a
+JOIN
+(SELECT language, `user`
+FROM druid_kafka_test) b
+  ON a.shortname = b.language
+) order by b.`user`
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test
+PREHOOK: Input: default@languages
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT a.fullname, b.`user`
+FROM
+(
+(SELECT fullname, shortname
+FROM languages) a
+JOIN
+(SELECT language, `user`
+FROM druid_kafka_test) b
+  ON a.shortname = b.language
+) order by b.`user`
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test
+POSTHOOK: Input: default@languages
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+russian        masterYi
+russian        masterYi
+english        nuclear
+english        nuclear
+english        speed
+english        speed
+PREHOOK: query: DROP TABLE druid_kafka_test
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@druid_kafka_test
+PREHOOK: Output: default@druid_kafka_test
+POSTHOOK: query: DROP TABLE druid_kafka_test
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@druid_kafka_test
+POSTHOOK: Output: default@druid_kafka_test
+PREHOOK: query: DROP TABLE druid_table_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE druid_table_1
+POSTHOOK: type: DROPTABLE
