Repository: hive
Updated Branches:
  refs/heads/master 116d2393f -> c7d5606b5


HIVE-20481: Add the Kafka Key record as part of the row (Slim Bouguerra, 
reviewed by Vineet Garg)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c7d5606b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c7d5606b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c7d5606b

Branch: refs/heads/master
Commit: c7d5606b53f0570b1101c08930da627331ac89fe
Parents: 116d239
Author: Slim Bouguerra <[email protected]>
Authored: Mon Sep 10 11:20:59 2018 -0700
Committer: Vineet Garg <[email protected]>
Committed: Mon Sep 10 11:22:30 2018 -0700

----------------------------------------------------------------------
 .../hive/kafka/SingleNodeKafkaCluster.java      |   7 +-
 .../hadoop/hive/kafka/GenericKafkaSerDe.java    |  44 +-----
 .../hadoop/hive/kafka/KafkaJsonSerDe.java       |  19 +--
 .../hive/kafka/KafkaPullerRecordReader.java     |   8 +-
 .../hadoop/hive/kafka/KafkaRecordWritable.java  |  53 ++++++--
 .../hadoop/hive/kafka/KafkaScanTrimmer.java     |  13 +-
 .../hadoop/hive/kafka/KafkaStorageHandler.java  |  73 +++++++---
 .../hive/kafka/KafkaStorageHandlerInfo.java     |  71 ++++++++++
 .../hadoop/hive/kafka/KafkaStreamingUtils.java  | 136 +++++++++++++++----
 .../hive/kafka/KafkaPullerInputSplitTest.java   |   4 +-
 .../hive/kafka/KafkaRecordIteratorTest.java     |  38 +++---
 .../hive/kafka/KafkaRecordWritableTest.java     |  34 ++++-
 .../hadoop/hive/kafka/KafkaScanTrimmerTest.java |  22 +--
 .../hive/kafka/KafkaStreamingUtilsTest.java     |  55 +++++++-
 .../hive/ql/metadata/StorageHandlerInfo.java    |   5 -
 .../clientpositive/kafka_storage_handler.q      |  20 ++-
 .../druid/kafka_storage_handler.q.out           | 124 ++++++++++++-----
 .../ptest2/conf/deployed/master-mr2.properties  |   2 +-
 18 files changed, 534 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
----------------------------------------------------------------------
diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
index c9339b5..3f2c9a7 100644
--- a/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
+++ b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
@@ -10,6 +10,7 @@ import kafka.utils.ZkUtils;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
@@ -26,6 +27,10 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.IntStream;
 
 /**
@@ -100,7 +105,7 @@ public class SingleNodeKafkaCluster extends AbstractService {
     )){
       List<String> events = Files.readLines(datafile, 
Charset.forName("UTF-8"));
       for(String event : events){
-        producer.send(new ProducerRecord<>(topicName, event));
+        producer.send(new ProducerRecord<>(topicName, "key", event));
       }
     } catch (IOException e) {
       Throwables.propagate(e);

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java
index e7ea53f..a0c79b3 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.kafka;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -40,11 +39,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
@@ -56,6 +51,7 @@ import java.rmi.server.UID;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -64,21 +60,6 @@ import java.util.stream.Collectors;
  */
 public class GenericKafkaSerDe extends AbstractSerDe {
   private static final Logger LOG = 
LoggerFactory.getLogger(GenericKafkaSerDe.class);
-  // ORDER of fields and types matters here
-  private static final ImmutableList<String>
-      METADATA_COLUMN_NAMES =
-      ImmutableList.of(KafkaStreamingUtils.PARTITION_COLUMN,
-          KafkaStreamingUtils.OFFSET_COLUMN,
-          KafkaStreamingUtils.TIMESTAMP_COLUMN,
-          KafkaStreamingUtils.START_OFFSET_COLUMN,
-          KafkaStreamingUtils.END_OFFSET_COLUMN);
-  private static final ImmutableList<PrimitiveTypeInfo>
-      METADATA_PRIMITIVE_TYPE_INFO =
-      ImmutableList.of(TypeInfoFactory.intTypeInfo,
-          TypeInfoFactory.longTypeInfo,
-          TypeInfoFactory.longTypeInfo,
-          TypeInfoFactory.longTypeInfo,
-          TypeInfoFactory.longTypeInfo);
 
   private AbstractSerDe delegateSerDe;
   private ObjectInspector objectInspector;
@@ -106,16 +87,14 @@ public class GenericKafkaSerDe extends AbstractSerDe {
         .stream()
         .map(StructField::getFieldName)
         .collect(Collectors.toList()));
-    columnNames.addAll(METADATA_COLUMN_NAMES);
+    columnNames.addAll(KafkaStreamingUtils.KAFKA_METADATA_COLUMN_NAMES);
 
     final List<ObjectInspector> inspectors = new 
ArrayList<>(columnNames.size());
     inspectors.addAll(delegateObjectInspector.getAllStructFieldRefs()
         .stream()
         .map(StructField::getFieldObjectInspector)
         .collect(Collectors.toList()));
-    inspectors.addAll(METADATA_PRIMITIVE_TYPE_INFO.stream()
-        .map(KafkaJsonSerDe.typeInfoToObjectInspector)
-        .collect(Collectors.toList()));
+    inspectors.addAll(KafkaStreamingUtils.KAFKA_METADATA_INSPECTORS);
     objectInspector = 
ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, 
inspectors);
 
     // lazy supplier to read Avro Records if needed
@@ -159,20 +138,11 @@ public class GenericKafkaSerDe extends AbstractSerDe {
     }
 
     return columnNames.stream().map(name -> {
-      switch (name) {
-      case KafkaStreamingUtils.PARTITION_COLUMN:
-        return new IntWritable(record.getPartition());
-      case KafkaStreamingUtils.OFFSET_COLUMN:
-        return new LongWritable(record.getOffset());
-      case KafkaStreamingUtils.TIMESTAMP_COLUMN:
-        return new LongWritable(record.getTimestamp());
-      case KafkaStreamingUtils.START_OFFSET_COLUMN:
-        return new LongWritable(record.getStartOffset());
-      case KafkaStreamingUtils.END_OFFSET_COLUMN:
-        return new LongWritable(record.getEndOffset());
-      default:
-        return delegateObjectInspector.getStructFieldData(row, 
delegateObjectInspector.getStructFieldRef(name));
+      Function<KafkaRecordWritable, Writable> metaColumnMapper = 
KafkaStreamingUtils.recordWritableFnMap.get(name);
+      if (metaColumnMapper != null) {
+        return metaColumnMapper.apply(record);
       }
+      return delegateObjectInspector.getStructFieldData(row, 
delegateObjectInspector.getStructFieldRef(name));
     }).collect(Collectors.toList());
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
index f383190..5f0143d 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
@@ -80,11 +80,13 @@ import java.util.stream.Collectors;
  */
 public class KafkaJsonSerDe extends AbstractSerDe {
   private static final Logger LOG = 
LoggerFactory.getLogger(KafkaJsonSerDe.class);
-  private static final DateTimeFormatter TS_PARSER = createAutoParser();
-  static Function<TypeInfo, ObjectInspector>
-      typeInfoToObjectInspector = typeInfo ->
-      PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
-          TypeInfoFactory.getPrimitiveTypeInfo(typeInfo.getTypeName()));
+  private static final ThreadLocal<DateTimeFormatter>
+      TS_PARSER =
+      ThreadLocal.withInitial(KafkaJsonSerDe::createAutoParser);
+  private static final Function<TypeInfo, ObjectInspector>
+      typeInfoToObjectInspector =
+      typeInfo -> 
PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory.getPrimitiveTypeInfo(
+          typeInfo.getTypeName()));
   private List<String> columnNames;
   private List<TypeInfo> columnTypes;
   private ObjectInspector inspector;
@@ -176,11 +178,11 @@ public class KafkaJsonSerDe extends AbstractSerDe {
     switch 
(TypeInfoFactory.getPrimitiveTypeInfo(typeInfo.getTypeName()).getPrimitiveCategory())
 {
     case TIMESTAMP:
       TimestampWritable timestampWritable = new TimestampWritable();
-      timestampWritable.setTime(TS_PARSER.parseMillis(value.textValue()));
+      
timestampWritable.setTime(TS_PARSER.get().parseMillis(value.textValue()));
       return timestampWritable;
 
     case TIMESTAMPLOCALTZ:
-      final long numberOfMillis = TS_PARSER.parseMillis(value.textValue());
+      final long numberOfMillis = 
TS_PARSER.get().parseMillis(value.textValue());
       return new TimestampLocalTZWritable(new 
TimestampTZ(ZonedDateTime.ofInstant(Instant.ofEpochMilli(numberOfMillis),
           ((TimestampLocalTZTypeInfo) typeInfo).timeZone())));
 
@@ -234,8 +236,7 @@ public class KafkaJsonSerDe extends AbstractSerDe {
     DateTimeParser
         timeOrOffset =
         new DateTimeFormatterBuilder().append(null,
-            new DateTimeParser[] {
-                new DateTimeFormatterBuilder().appendLiteral('T').toParser(),
+            new DateTimeParser[] { new 
DateTimeFormatterBuilder().appendLiteral('T').toParser(),
                 new DateTimeFormatterBuilder().appendLiteral(' ').toParser() })
             .appendOptional(ISODateTimeFormat.timeElementParser().getParser())
             .appendOptional(offsetElement.getParser())

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
index 908ee5e..4f0ee94 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.kafka;
 
 import com.google.common.base.Preconditions;
-import com.google.common.io.Closer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -31,7 +30,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.Properties;
 
@@ -43,7 +41,6 @@ import java.util.Properties;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(KafkaPullerRecordReader.class);
 
-  private final Closer closer = Closer.create();
   private KafkaConsumer<byte[], byte[]> consumer = null;
   private Configuration config = null;
   private KafkaRecordWritable currentWritableValue;
@@ -67,7 +64,6 @@ import java.util.Properties;
       Preconditions.checkNotNull(brokerString, "broker end point can not be 
null");
       LOG.info("Starting Consumer with Kafka broker string [{}]", 
brokerString);
       consumer = new KafkaConsumer<>(properties);
-      closer.register(consumer);
     }
   }
 
@@ -154,11 +150,11 @@ import java.util.Properties;
     return consumedRecords * 1.0f / totalNumberRecords;
   }
 
-  @Override public void close() throws IOException {
+  @Override public void close() {
     LOG.trace("total read bytes [{}]", readBytes);
     if (consumer != null) {
       consumer.wakeup();
+      consumer.close();
     }
-    closer.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
index c6924ea..1b00f85 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.kafka;
 import org.apache.hadoop.io.Writable;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+import javax.annotation.Nullable;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -31,7 +32,8 @@ import java.util.Objects;
  * Writable implementation of Kafka ConsumerRecord.
  * Serialized in the form
  * {@code timestamp} long| {@code partition} (int) | {@code offset} (long) |
- * {@code startOffset} (long) | {@code endOffset} (long) | {@code 
value.size()} (int) | {@code value} (byte [])
+ * {@code startOffset} (long) | {@code endOffset} (long) | {@code 
value.size()} (int) |
+ * {@code value} (byte []) | {@code recordKey.size()}| {@code recordKey (byte 
[])}
  */
 public class KafkaRecordWritable implements Writable {
 
@@ -60,25 +62,34 @@ public class KafkaRecordWritable implements Writable {
    */
   private byte[] value;
 
+  /**
+   * Record key content or null
+   */
+  private byte[] recordKey;
+
+
   void set(ConsumerRecord<byte[], byte[]> consumerRecord, long startOffset, 
long endOffset) {
     this.partition = consumerRecord.partition();
     this.timestamp = consumerRecord.timestamp();
     this.offset = consumerRecord.offset();
     this.value = consumerRecord.value();
+    this.recordKey = consumerRecord.key();
     this.startOffset = startOffset;
     this.endOffset = endOffset;
   }
 
    KafkaRecordWritable(int partition,
-      long offset,
-      long timestamp,
-      byte[] value,
-      long startOffset,
-      long endOffset) {
+       long offset,
+       long timestamp,
+       byte[] value,
+       long startOffset,
+       long endOffset,
+       @Nullable byte[] recordKey) {
     this.partition = partition;
     this.offset = offset;
     this.timestamp = timestamp;
     this.value = value;
+    this.recordKey = recordKey;
     this.startOffset = startOffset;
     this.endOffset = endOffset;
   }
@@ -94,6 +105,12 @@ public class KafkaRecordWritable implements Writable {
     dataOutput.writeLong(endOffset);
     dataOutput.writeInt(value.length);
     dataOutput.write(value);
+    if (recordKey != null) {
+      dataOutput.writeInt(recordKey.length);
+      dataOutput.write(recordKey);
+    } else {
+      dataOutput.writeInt(-1);
+    }
   }
 
   @Override public void readFields(DataInput dataInput) throws IOException {
@@ -102,13 +119,20 @@ public class KafkaRecordWritable implements Writable {
     offset = dataInput.readLong();
     startOffset = dataInput.readLong();
     endOffset = dataInput.readLong();
-    int size = dataInput.readInt();
-    if (size > 0) {
-      value = new byte[size];
+    int dataSize = dataInput.readInt();
+    if (dataSize > 0) {
+      value = new byte[dataSize];
       dataInput.readFully(value);
     } else {
       value = new byte[0];
     }
+    int keyArraySize = dataInput.readInt();
+    if (keyArraySize > -1) {
+      recordKey = new byte[keyArraySize];
+      dataInput.readFully(recordKey);
+    } else {
+      recordKey = null;
+    }
   }
 
   int getPartition() {
@@ -135,6 +159,11 @@ public class KafkaRecordWritable implements Writable {
     return endOffset;
   }
 
+  @Nullable
+  byte[] getRecordKey() {
+    return recordKey;
+  }
+
   @Override public boolean equals(Object o) {
     if (this == o) {
       return true;
@@ -148,12 +177,14 @@ public class KafkaRecordWritable implements Writable {
         && startOffset == writable.startOffset
         && endOffset == writable.endOffset
         && timestamp == writable.timestamp
-        && Arrays.equals(value, writable.value);
+        && Arrays.equals(value, writable.value)
+        && Arrays.equals(recordKey, writable.recordKey);
   }
 
   @Override public int hashCode() {
     int result = Objects.hash(partition, offset, startOffset, endOffset, 
timestamp);
     result = 31 * result + Arrays.hashCode(value);
+    result = 31 * result + Arrays.hashCode(recordKey);
     return result;
   }
 
@@ -171,6 +202,8 @@ public class KafkaRecordWritable implements Writable {
         + timestamp
         + ", value="
         + Arrays.toString(value)
+        + ", recordKey="
+        + Arrays.toString(recordKey)
         + '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
index 7641515..8fbdfda 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
@@ -89,7 +89,7 @@ class KafkaScanTrimmer {
     if (LOG.isDebugEnabled()) {
       if (optimizedScan != null) {
         LOG.debug("Optimized scan:");
-        optimizedScan.forEach((tp, input) -> LOG.info(
+        optimizedScan.forEach((tp, input) -> LOG.debug(
             "Topic-[{}] Partition-[{}] - Split startOffset [{}] :-> endOffset 
[{}]",
             tp.topic(),
             tp.partition(),
@@ -97,7 +97,7 @@ class KafkaScanTrimmer {
             input.getEndOffset()));
       } else {
         LOG.debug("No optimization thus using full scan ");
-        fullHouse.forEach((tp, input) -> LOG.info(
+        fullHouse.forEach((tp, input) -> LOG.debug(
             "Topic-[{}] Partition-[{}] - Split startOffset [{}] :-> endOffset 
[{}]",
             tp.topic(),
             tp.partition(),
@@ -193,7 +193,7 @@ class KafkaScanTrimmer {
     }
 
 
-    if (columnDesc.getColumn().equals(KafkaStreamingUtils.PARTITION_COLUMN)) {
+    if 
(columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.PARTITION.getName()))
 {
       return buildScanFromPartitionPredicate(fullHouse,
           operator,
           ((Number) constantDesc.getValue()).intValue(),
@@ -201,7 +201,7 @@ class KafkaScanTrimmer {
           negation);
 
     }
-    if (columnDesc.getColumn().equals(KafkaStreamingUtils.OFFSET_COLUMN)) {
+    if 
(columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.OFFSET.getName()))
 {
       return buildScanFromOffsetPredicate(fullHouse,
           operator,
           ((Number) constantDesc.getValue()).longValue(),
@@ -209,7 +209,7 @@ class KafkaScanTrimmer {
           negation);
     }
 
-    if (columnDesc.getColumn().equals(KafkaStreamingUtils.TIMESTAMP_COLUMN)) {
+    if 
(columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.TIMESTAMP.getName()))
 {
       long timestamp = ((Number) constantDesc.getValue()).longValue();
       //noinspection unchecked
       return buildScanForTimesPredicate(fullHouse, operator, timestamp, flip, 
negation, kafkaConsumer);
@@ -280,7 +280,8 @@ class KafkaScanTrimmer {
    *
    * @return optimized kafka scan
    */
-  @VisibleForTesting static Map<TopicPartition, KafkaPullerInputSplit> 
buildScanFromOffsetPredicate(Map<TopicPartition, KafkaPullerInputSplit> 
fullScan,
+  @VisibleForTesting static Map<TopicPartition, KafkaPullerInputSplit> 
buildScanFromOffsetPredicate(Map<TopicPartition,
+      KafkaPullerInputSplit> fullScan,
       PredicateLeaf.Operator operator,
       long offsetConst,
       boolean flip,

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
index 5847df5..96222c9 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
@@ -18,11 +18,12 @@
 
 package org.apache.hadoop.hive.kafka;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import 
org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
 import 
org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
@@ -31,12 +32,16 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * Hive Kafka storage handler to allow user querying Stream of tuples from a 
Kafka queue.
@@ -44,8 +49,9 @@ import java.util.Map;
 public class KafkaStorageHandler implements HiveStorageHandler {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(KafkaStorageHandler.class);
+  private static final String KAFKA_STORAGE_HANDLER = 
"org.apache.hadoop.hive.kafka.KafkaStorageHandler";
 
-  Configuration configuration;
+  private Configuration configuration;
 
   @Override public Class<? extends InputFormat> getInputFormatClass() {
     return KafkaPullerInputFormat.class;
@@ -63,22 +69,26 @@ public class KafkaStorageHandler implements HiveStorageHandler {
     return null;
   }
 
-  @Override public HiveAuthorizationProvider getAuthorizationProvider() throws 
HiveException {
+  @Override public HiveAuthorizationProvider getAuthorizationProvider() {
     return new DefaultHiveAuthorizationProvider();
   }
 
   @Override public void configureInputJobProperties(TableDesc tableDesc, 
Map<String, String> jobProperties) {
-    jobProperties.put(KafkaStreamingUtils.HIVE_KAFKA_TOPIC,
-        
Preconditions.checkNotNull(tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_TOPIC),
-            "kafka topic missing set table property->" + 
KafkaStreamingUtils.HIVE_KAFKA_TOPIC));
-    LOG.debug("Table properties: Kafka Topic {}", 
tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_TOPIC));
-    jobProperties.put(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS,
-        
Preconditions.checkNotNull(tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS),
-            "Broker address missing set table property->" + 
KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS));
-    LOG.debug("Table properties: Kafka broker {}", 
tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS));
+    String topic = 
tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_TOPIC, "");
+    if (topic.isEmpty()) {
+      throw new IllegalArgumentException("Kafka topic missing set table 
property->"
+          + KafkaStreamingUtils.HIVE_KAFKA_TOPIC);
+    }
+    jobProperties.put(KafkaStreamingUtils.HIVE_KAFKA_TOPIC, topic);
+    String brokerString = 
tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS,
 "");
+    if (brokerString.isEmpty()) {
+      throw new IllegalArgumentException("Broker address missing set table 
property->"
+          + KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS);
+    }
+    jobProperties.put(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS, 
brokerString);
+
     jobProperties.put(KafkaStreamingUtils.SERDE_CLASS_NAME,
         
tableDesc.getProperties().getProperty(KafkaStreamingUtils.SERDE_CLASS_NAME, 
KafkaJsonSerDe.class.getName()));
-
     LOG.debug("Table properties: SerDe class name {}", 
jobProperties.get(KafkaStreamingUtils.SERDE_CLASS_NAME));
 
     //set extra properties
@@ -90,7 +100,9 @@ public class KafkaStorageHandler implements HiveStorageHandler {
             .toLowerCase()
             .startsWith(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX))
         .forEach(entry -> {
-          String key = 
entry.getKey().toString().substring(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX.length()
 + 1);
+          String
+              key =
+              
entry.getKey().toString().substring(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX.length()
 + 1);
           if (KafkaStreamingUtils.FORBIDDEN_PROPERTIES.contains(key)) {
             throw new IllegalArgumentException("Not suppose to set Kafka 
Property " + key);
           }
@@ -116,7 +128,7 @@ public class KafkaStorageHandler implements HiveStorageHandler {
   @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) 
{
     Map<String, String> properties = new HashMap<>();
     configureInputJobProperties(tableDesc, properties);
-    properties.forEach((key, value) -> jobConf.set(key, value));
+    properties.forEach(jobConf::set);
     try {
       KafkaStreamingUtils.copyDependencyJars(jobConf, 
KafkaStorageHandler.class);
     } catch (IOException e) {
@@ -133,6 +145,35 @@ public class KafkaStorageHandler implements HiveStorageHandler {
   }
 
   @Override public String toString() {
-    return "org.apache.hadoop.hive.kafka.KafkaStorageHandler";
+    return KAFKA_STORAGE_HANDLER;
+  }
+
+  @Override public StorageHandlerInfo getStorageHandlerInfo(Table table) 
throws MetaException {
+    String topic = 
table.getParameters().get(KafkaStreamingUtils.HIVE_KAFKA_TOPIC);
+    if (topic == null || topic.isEmpty()) {
+      throw new MetaException("topic is null or empty");
+    }
+    String brokers = 
table.getParameters().get(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS);
+    if (brokers == null || brokers.isEmpty()) {
+      throw new MetaException("kafka brokers string is null or empty");
+    }
+    final Properties properties = new Properties();
+    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
brokers);
+    table.getParameters()
+        .entrySet()
+        .stream()
+        .filter(objectObjectEntry -> objectObjectEntry.getKey()
+            .toLowerCase()
+            .startsWith(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX))
+        .forEach(entry -> {
+          String key = 
entry.getKey().substring(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX.length()
 + 1);
+          if (KafkaStreamingUtils.FORBIDDEN_PROPERTIES.contains(key)) {
+            throw new IllegalArgumentException("Not suppose to set Kafka 
Property " + key);
+          }
+          properties.put(key, entry.getValue());
+        });
+    return new KafkaStorageHandlerInfo(topic, properties);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandlerInfo.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandlerInfo.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandlerInfo.java
new file mode 100644
index 0000000..2c7a086
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandlerInfo.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Kafka Storage Handler info.
+ */
+class KafkaStorageHandlerInfo implements StorageHandlerInfo {
+  private final String topic;
+  private final Properties consumerProperties;
+
+  KafkaStorageHandlerInfo(String topic, Properties consumerProperties) {
+    this.topic = topic;
+    this.consumerProperties = consumerProperties;
+  }
+
+  @Override public String formatAsText() {
+
+    try (KafkaConsumer consumer = new KafkaConsumer(consumerProperties) {
+    }) {
+      //noinspection unchecked
+      List<PartitionInfo> partitionsInfo = consumer.partitionsFor(topic);
+      List<TopicPartition>
+          topicPartitions =
+          partitionsInfo.stream()
+              .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), 
partitionInfo.partition()))
+              .collect(Collectors.toList());
+      Map endOffsets = consumer.endOffsets(topicPartitions);
+      Map startOffsets = consumer.beginningOffsets(topicPartitions);
+
+      return partitionsInfo.stream()
+          .map(partitionInfo -> String.format("%s [start offset = [%s], end 
offset = [%s]]",
+              partitionInfo.toString(),
+              startOffsets.get(new TopicPartition(partitionInfo.topic(), 
partitionInfo.partition())),
+              endOffsets.get(new TopicPartition(partitionInfo.topic(), 
partitionInfo.partition()))))
+          .collect(Collectors.joining("\n"));
+    } catch (Exception e) {
+      return String.format("ERROR fetching metadata for Topic [%s], Connection 
String [%s], Error [%s]",
+          topic,
+          
consumerProperties.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+          e.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
index d2d0ebc..4802c4e 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
@@ -19,11 +19,21 @@
 package org.apache.hadoop.hive.kafka;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.ReflectionUtil;
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -33,10 +43,12 @@ import 
org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -45,7 +57,7 @@ import java.util.stream.Collectors;
 final class KafkaStreamingUtils {
 
   /**
-   * MANDATORY Table property indicating kafka topic backing the table
+   * MANDATORY Table property indicating kafka topic backing the table.
    */
   static final String HIVE_KAFKA_TOPIC = "kafka.topic";
   /**
@@ -58,47 +70,26 @@ final class KafkaStreamingUtils {
   static final String SERDE_CLASS_NAME = "kafka.serde.class";
   /**
    * Table property indicating poll/fetch timeout period in millis.
-   * FYI this is independent from internal Kafka consumer timeouts, defaults 
to {@DEFAULT_CONSUMER_POLL_TIMEOUT_MS}
+   * FYI this is independent from internal Kafka consumer timeouts, defaults to {@link #DEFAULT_CONSUMER_POLL_TIMEOUT_MS}.
    */
   static final String HIVE_KAFKA_POLL_TIMEOUT = "hive.kafka.poll.timeout.ms";
   /**
-   * default poll timeout for fetching metadata and record batch
+   * Default poll timeout for fetching metadata and record batch.
    */
   static final long DEFAULT_CONSUMER_POLL_TIMEOUT_MS = 5000L; // 5 seconds
   /**
-   * Record Timestamp column name, added as extra meta column of type long
-   */
-  static final String TIMESTAMP_COLUMN = "__timestamp";
-  /**
-   * Record Kafka Partition column name added as extra meta column of type int
-   */
-  static final String PARTITION_COLUMN = "__partition";
-  /**
-   * Record offset column name added as extra metadata column to row as long
-   */
-  static final String OFFSET_COLUMN = "__offset";
-
-  /**
-   * Start offset given by the input split, this will reflect the actual start 
of TP or start given by split pruner
-   */
-  static final String START_OFFSET_COLUMN = "__start_offset";
-
-  /**
-   * End offset given by input split at run time
-   */
-  static final String END_OFFSET_COLUMN = "__end_offset";
-  /**
    * Table property prefix used to inject kafka consumer properties, e.g 
"kafka.consumer.max.poll.records" = "5000"
    * this will lead to inject max.poll.records=5000 to the Kafka Consumer. NOT 
MANDATORY defaults to nothing
    */
   static final String CONSUMER_CONFIGURATION_PREFIX = "kafka.consumer";
-
   /**
-   * Set of Kafka properties that the user can not set via DDLs
+   * Set of Kafka properties that the user can not set via DDLs.
    */
   static final HashSet<String> FORBIDDEN_PROPERTIES =
       new HashSet<>(ImmutableList.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
 
   private KafkaStreamingUtils() {
   }
@@ -172,4 +163,93 @@ final class KafkaStreamingUtils {
     // we are not setting conf thus null is okay
     return ReflectionUtil.newInstance(clazz, null);
   }
+
+  /**
+   * Basic Enum class for all the metadata columns appended to the Kafka row 
by the deserializer.
+   */
+  enum MetadataColumn {
+    /**
+     * Record offset column name added as extra metadata column to row as long.
+     */
+    OFFSET("__offset", TypeInfoFactory.longTypeInfo),
+    /**
+     * Record Kafka Partition column name added as extra meta column of type 
int.
+     */
+    PARTITION("__partition", TypeInfoFactory.intTypeInfo),
+    /**
+     * Record Kafka key column name added as extra meta column of type binary 
blob.
+     */
+    KEY("__key", TypeInfoFactory.binaryTypeInfo),
+    /**
+     * Record Timestamp column name, added as extra meta column of type long.
+     */
+    TIMESTAMP("__timestamp", TypeInfoFactory.longTypeInfo),
+    /**
+     * Start offset given by the input split, this will reflect the actual 
start of TP or start given by split pruner.
+     */
+    START_OFFSET("__start_offset", TypeInfoFactory.longTypeInfo),
+    /**
+     * End offset given by input split at run time.
+     */
+    END_OFFSET("__end_offset", TypeInfoFactory.longTypeInfo);
+
+    private final String name;
+    private final TypeInfo typeInfo;
+
+    MetadataColumn(String name, TypeInfo typeInfo) {
+      this.name = name;
+      this.typeInfo = typeInfo;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public AbstractPrimitiveWritableObjectInspector getObjectInspector() {
+      return 
PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory.getPrimitiveTypeInfo(
+          typeInfo.getTypeName()));
+    }
+  }
+
+  /**
+   * Order in which the metadata columns (and their types) are appended to the original row.
+   */
+  private static final List<MetadataColumn> KAFKA_METADATA_COLUMNS =
+      ImmutableList.of(MetadataColumn.KEY,
+          MetadataColumn.PARTITION,
+          MetadataColumn.OFFSET,
+          MetadataColumn.TIMESTAMP,
+          MetadataColumn.START_OFFSET,
+          MetadataColumn.END_OFFSET);
+
+  /**
+   * Kafka metadata column names, in {@code KAFKA_METADATA_COLUMNS} order.
+   * Immutable so that no caller can alter this JVM-wide shared list.
+   */
+  static final List<String> KAFKA_METADATA_COLUMN_NAMES = ImmutableList.copyOf(KAFKA_METADATA_COLUMNS
+      .stream()
+      .map(MetadataColumn::getName)
+      .collect(Collectors.toList()));
+
+  /**
+   * Kafka metadata column object inspectors, in {@code KAFKA_METADATA_COLUMNS} order.
+   * Immutable so that no caller can alter this JVM-wide shared list.
+   */
+  static final List<ObjectInspector> KAFKA_METADATA_INSPECTORS = ImmutableList.<ObjectInspector>copyOf(
+      KAFKA_METADATA_COLUMNS
+          .stream()
+          .map(MetadataColumn::getObjectInspector)
+          .collect(Collectors.toList()));
+
+  /**
+   * Maps each metadata column name to the function extracting that column's value from a
+   * {@link KafkaRecordWritable} as a Hadoop {@link Writable}; the {@code __key} extractor returns {@code null}
+   * when the record has no key.
+   */
+  static final Map<String, Function<KafkaRecordWritable, Writable>>
+      recordWritableFnMap = ImmutableMap.<String, Function<KafkaRecordWritable, Writable>>builder()
+      .put(MetadataColumn.END_OFFSET.getName(), (record) -> new LongWritable(record.getEndOffset()))
+      .put(MetadataColumn.KEY.getName(),
+          record -> record.getRecordKey() == null ? null : new BytesWritable(record.getRecordKey()))
+      .put(MetadataColumn.OFFSET.getName(), record -> new LongWritable(record.getOffset()))
+      .put(MetadataColumn.PARTITION.getName(), record -> new IntWritable(record.getPartition()))
+      .put(MetadataColumn.START_OFFSET.getName(), record -> new LongWritable(record.getStartOffset()))
+      .put(MetadataColumn.TIMESTAMP.getName(), record -> new LongWritable(record.getTimestamp()))
+      .build();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
----------------------------------------------------------------------
diff --git 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
index be26986..00f95ca 100644
--- 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
+++ 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
@@ -33,11 +33,11 @@ import java.util.List;
  * Kafka Hadoop InputSplit Test.
  */
 public class KafkaPullerInputSplitTest {
-  private String topic = "my_topic";
   private KafkaPullerInputSplit expectedInputSplit;
 
   public KafkaPullerInputSplitTest() {
-    this.expectedInputSplit = new KafkaPullerInputSplit(this.topic, 1, 50L, 
56L, new Path("/tmp"));
+    String topic = "my_topic";
+    this.expectedInputSplit = new KafkaPullerInputSplit(topic, 1, 50L, 56L, 
new Path("/tmp"));
   }
 
   @Test public void testWriteRead() throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
index 5de51cd..98a5568 100644
--- 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
+++ 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
@@ -30,8 +30,6 @@ import kafka.utils.ZkUtils;
 import kafka.zk.EmbeddedZookeeper;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@@ -69,22 +67,21 @@ public class KafkaRecordIteratorTest {
   private static final int RECORD_NUMBER = 100;
   private static final String TOPIC = "my_test_topic";
   private static final TopicPartition TOPIC_PARTITION = new 
TopicPartition(TOPIC, 0);
-  public static final byte[] KEY_BYTES = 
"KEY".getBytes(Charset.forName("UTF-8"));
+  private static final byte[] KEY_BYTES = 
"KEY".getBytes(Charset.forName("UTF-8"));
   private static final List<ConsumerRecord<byte[], byte[]>>
       RECORDS =
       IntStream.range(0, RECORD_NUMBER).mapToObj(number -> {
         final byte[] value = ("VALUE-" + 
Integer.toString(number)).getBytes(Charset.forName("UTF-8"));
         return new ConsumerRecord<>(TOPIC, 0, (long) number, 0L, null, 0L, 0, 
0, KEY_BYTES, value);
       }).collect(Collectors.toList());
-  public static final long POLL_TIMEOUT_MS = 900L;
+  private static final long POLL_TIMEOUT_MS = 900L;
   private static ZkUtils zkUtils;
   private static ZkClient zkClient;
   private static KafkaProducer<byte[], byte[]> producer;
   private static KafkaServer kafkaServer;
-  private static String zkConnect;
   private KafkaConsumer<byte[], byte[]> consumer = null;
   private KafkaRecordIterator kafkaRecordIterator = null;
-  private Configuration conf = new Configuration();
+  private final Configuration conf = new Configuration();
   private static EmbeddedZookeeper zkServer;
 
   public KafkaRecordIteratorTest() {
@@ -93,7 +90,7 @@ public class KafkaRecordIteratorTest {
   @BeforeClass public static void setupCluster() throws IOException {
     LOG.info("init embedded Zookeeper");
     zkServer = new EmbeddedZookeeper();
-    zkConnect = "127.0.0.1:" + zkServer.port();
+    String zkConnect = "127.0.0.1:" + zkServer.port();
     zkClient = new ZkClient(zkConnect, 3000, 3000, 
ZKStringSerializer$.MODULE$);
     zkUtils = ZkUtils.apply(zkClient, false);
     LOG.info("init kafka broker");
@@ -174,12 +171,13 @@ public class KafkaRecordIteratorTest {
     List<KafkaRecordWritable>
         serRecords =
         RECORDS.stream()
-            .map((aRecord) -> new KafkaRecordWritable(aRecord.partition(),
-                aRecord.offset(),
-                aRecord.timestamp(),
-                aRecord.value(),
+            .map((consumerRecord) -> new 
KafkaRecordWritable(consumerRecord.partition(),
+                consumerRecord.offset(),
+                consumerRecord.timestamp(),
+                consumerRecord.value(),
                 50L,
-                100L))
+                100L,
+                consumerRecord.key()))
             .collect(Collectors.toList());
     KafkaPullerRecordReader recordReader = new KafkaPullerRecordReader();
     TaskAttemptContext context = new TaskAttemptContextImpl(this.conf, new 
TaskAttemptID());
@@ -225,8 +223,8 @@ public class KafkaRecordIteratorTest {
     this.kafkaRecordIterator =
         new KafkaRecordIterator(this.consumer,
             TOPIC_PARTITION,
-            new Long(RECORD_NUMBER),
-            new Long(RECORD_NUMBER),
+            (long) RECORD_NUMBER,
+            (long) RECORD_NUMBER,
             POLL_TIMEOUT_MS);
     this.compareIterator(ImmutableList.of(), this.kafkaRecordIterator);
   }
@@ -238,11 +236,11 @@ public class KafkaRecordIteratorTest {
 
   private void compareIterator(List<ConsumerRecord<byte[], byte[]>> expected,
       Iterator<ConsumerRecord<byte[], byte[]>> kafkaRecordIterator) {
-    expected.stream().forEachOrdered((expectedRecord) -> {
+    expected.forEach((expectedRecord) -> {
       Assert.assertTrue("record with offset " + expectedRecord.offset(), 
kafkaRecordIterator.hasNext());
       ConsumerRecord record = kafkaRecordIterator.next();
-      Assert.assertTrue(record.topic().equals(TOPIC));
-      Assert.assertTrue(record.partition() == 0);
+      Assert.assertEquals(record.topic(), TOPIC);
+      Assert.assertEquals(0, record.partition());
       Assert.assertEquals("Offsets not matching", expectedRecord.offset(), 
record.offset());
       byte[] binaryExceptedValue = expectedRecord.value();
       byte[] binaryExceptedKey = expectedRecord.key();
@@ -261,7 +259,7 @@ public class KafkaRecordIteratorTest {
     producerProps.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
     producerProps.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
     producerProps.setProperty("max.block.ms", "10000");
-    producer = new KafkaProducer(producerProps);
+    producer = new KafkaProducer<>(producerProps);
     LOG.info("kafka producer started");
   }
 
@@ -277,13 +275,13 @@ public class KafkaRecordIteratorTest {
     consumerProps.setProperty("fetch.max.wait.ms", "3001");
     consumerProps.setProperty("session.timeout.ms", "3001");
     consumerProps.setProperty("metadata.max.age.ms", "100");
-    this.consumer = new KafkaConsumer(consumerProps);
+    this.consumer = new KafkaConsumer<>(consumerProps);
   }
 
   private static void sendData() {
     LOG.info("Sending {} records", RECORD_NUMBER);
     RECORDS.stream()
-        .map(consumerRecord -> new ProducerRecord(consumerRecord.topic(),
+        .map(consumerRecord -> new ProducerRecord<>(consumerRecord.topic(),
             consumerRecord.partition(),
             consumerRecord.timestamp(),
             consumerRecord.key(),

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java
----------------------------------------------------------------------
diff --git 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java
 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java
index 8f9df54..4fb9664 100644
--- 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java
+++ 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java
@@ -37,7 +37,39 @@ public class KafkaRecordWritableTest {
 
   @Test public void testWriteReadFields() throws IOException {
     ConsumerRecord<byte[], byte[]> record = new ConsumerRecord("topic", 0, 3L, 
"key".getBytes(), "value".getBytes());
-    KafkaRecordWritable kafkaRecordWritable = new 
KafkaRecordWritable(record.partition(), record.offset(), record.timestamp(), 
record.value(), 0L, 100L);
+    KafkaRecordWritable
+        kafkaRecordWritable =
+        new KafkaRecordWritable(record.partition(),
+            record.offset(),
+            record.timestamp(),
+            record.value(),
+            0L,
+            100L,
+            null);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream w = new DataOutputStream(baos);
+    kafkaRecordWritable.write(w);
+    w.flush();
+
+    ByteArrayInputStream input = new ByteArrayInputStream(baos.toByteArray());
+    DataInputStream inputStream = new DataInputStream(input);
+    KafkaRecordWritable actualKafkaRecordWritable = new KafkaRecordWritable();
+    actualKafkaRecordWritable.readFields(inputStream);
+    Assert.assertEquals(kafkaRecordWritable, actualKafkaRecordWritable);
+  }
+
+
+  @Test public void testWriteReadFields2() throws IOException {
+    ConsumerRecord<byte[], byte[]> record = new ConsumerRecord("topic", 0, 3L, 
"key".getBytes(), "value".getBytes());
+    KafkaRecordWritable
+        kafkaRecordWritable =
+        new KafkaRecordWritable(record.partition(),
+            record.offset(),
+            record.timestamp(),
+            record.value(),
+            0L,
+            100L,
+            "thiskey".getBytes());
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream w = new DataOutputStream(baos);
     kafkaRecordWritable.write(w);

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java
----------------------------------------------------------------------
diff --git 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java
index 289dafd..2a40bff 100644
--- 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java
+++ 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java
@@ -52,19 +52,25 @@ import static org.junit.Assert.assertNotNull;
 public class KafkaScanTrimmerTest {
   private static final Path PATH = new Path("/tmp");
 
-  private ExprNodeDesc zeroInt = ConstantExprBuilder.build(0);
-  private ExprNodeDesc threeInt = ConstantExprBuilder.build(3);
-  private ExprNodeDesc thirtyLong = ConstantExprBuilder.build(30L);
-  private ExprNodeDesc thirtyFiveLong = ConstantExprBuilder.build(35L);
-  private ExprNodeDesc seventyFiveLong = ConstantExprBuilder.build(75L);
-  private ExprNodeDesc fortyLong = ConstantExprBuilder.build(40L);
+  private final ExprNodeDesc zeroInt = ConstantExprBuilder.build(0);
+  private final ExprNodeDesc threeInt = ConstantExprBuilder.build(3);
+  private final ExprNodeDesc thirtyLong = ConstantExprBuilder.build(30L);
+  private final ExprNodeDesc thirtyFiveLong = ConstantExprBuilder.build(35L);
+  private final ExprNodeDesc seventyFiveLong = ConstantExprBuilder.build(75L);
+  private final ExprNodeDesc fortyLong = ConstantExprBuilder.build(40L);
 
   private ExprNodeDesc
       partitionColumn =
-      new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, 
KafkaStreamingUtils.PARTITION_COLUMN, null, false);
+      new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo,
+          KafkaStreamingUtils.MetadataColumn.PARTITION.getName(),
+          null,
+          false);
   private ExprNodeDesc
       offsetColumn =
-      new ExprNodeColumnDesc(TypeInfoFactory.longTypeInfo, 
KafkaStreamingUtils.OFFSET_COLUMN, null, false);
+      new ExprNodeColumnDesc(TypeInfoFactory.longTypeInfo,
+          KafkaStreamingUtils.MetadataColumn.OFFSET.getName(),
+          null,
+          false);
 
   private String topic = "my_topic";
   private Map<TopicPartition, KafkaPullerInputSplit>

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java
 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java
index 8d68ec2..071df3f 100644
--- 
a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java
+++ 
b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java
@@ -19,11 +19,21 @@
 package org.apache.hadoop.hive.kafka;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hadoop.hive.kafka.KafkaStreamingUtils.*;
 
 /**
  * Test for Utility class.
@@ -37,7 +47,7 @@ public class KafkaStreamingUtilsTest {
     configuration.set("kafka.bootstrap.servers", "localhost:9090");
     configuration.set("kafka.consumer.fetch.max.wait.ms", "40");
     configuration.set("kafka.consumer.my.new.wait.ms", "400");
-    Properties properties = 
KafkaStreamingUtils.consumerProperties(configuration);
+    Properties properties = consumerProperties(configuration);
     Assert.assertEquals("localhost:9090", 
properties.getProperty("bootstrap.servers"));
     Assert.assertEquals("40", properties.getProperty("fetch.max.wait.ms"));
     Assert.assertEquals("400", properties.getProperty("my.new.wait.ms"));
@@ -47,13 +57,52 @@ public class KafkaStreamingUtilsTest {
     Configuration configuration = new Configuration();
     configuration.set("kafka.bootstrap.servers", "localhost:9090");
     configuration.set("kafka.consumer." + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-    KafkaStreamingUtils.consumerProperties(configuration);
+    consumerProperties(configuration);
   }
 
   @Test(expected = IllegalArgumentException.class) public void 
canNotSetForbiddenProp2() {
     Configuration configuration = new Configuration();
     configuration.set("kafka.bootstrap.servers", "localhost:9090");
     configuration.set("kafka.consumer." + 
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "value");
-    KafkaStreamingUtils.consumerProperties(configuration);
+    consumerProperties(configuration);
+  }
+
+  @Test public void testMetadataEnumLookupMapper() {
+    final int partition = 1;
+    final long offset = 5L;
+    final long ts = System.currentTimeMillis();
+    final long startOffset = 0L;
+    final long endOffset = 200L;
+    final byte[] value = "value".getBytes();
+    final byte[] key = "key".getBytes();
+    final KafkaRecordWritable kafkaRecordWritable =
+        new KafkaRecordWritable(partition, offset, ts, value, startOffset, endOffset, key);
+
+    // Expected writables must be listed in the same order as KAFKA_METADATA_COLUMN_NAMES.
+    final List<Writable> expectedWritables = Arrays.asList(new BytesWritable(key),
+        new IntWritable(partition),
+        new LongWritable(offset),
+        new LongWritable(ts),
+        new LongWritable(startOffset),
+        new LongWritable(endOffset));
+
+    final List<Writable> actual = KAFKA_METADATA_COLUMN_NAMES.stream()
+        .map(columnName -> recordWritableFnMap.get(columnName).apply(kafkaRecordWritable))
+        .collect(Collectors.toList());
+
+    Assert.assertEquals(expectedWritables, actual);
+  }
+
+  @Test
+  public void testEnsureThatAllTheColumnAreListed() {
+    final int enumSize = MetadataColumn.values().length;
+    Assert.assertEquals(enumSize, KafkaStreamingUtils.KAFKA_METADATA_COLUMN_NAMES.size());
+    Assert.assertEquals(enumSize, KafkaStreamingUtils.KAFKA_METADATA_INSPECTORS.size());
+    // Every enum constant must appear in the ordered column-name list...
+    for (MetadataColumn column : MetadataColumn.values()) {
+      Assert.assertTrue(KAFKA_METADATA_COLUMN_NAMES.contains(column.getName()));
+    }
+    // ...and must have a writable extractor registered under its name.
+    for (MetadataColumn column : MetadataColumn.values()) {
+      Assert.assertNotNull(KafkaStreamingUtils.recordWritableFnMap.get(column.getName()));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java
index dbc44a6..e68b8ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java
@@ -18,13 +18,8 @@
 
 package org.apache.hadoop.hive.ql.metadata;
 
-import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 
 /**
  * StorageHandlerInfo is a marker interface used to provide runtime 
information associated with a storage handler.

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/ql/src/test/queries/clientpositive/kafka_storage_handler.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/kafka_storage_handler.q 
b/ql/src/test/queries/clientpositive/kafka_storage_handler.q
index 8daa3e3..f10bb90 100644
--- a/ql/src/test/queries/clientpositive/kafka_storage_handler.q
+++ b/ql/src/test/queries/clientpositive/kafka_storage_handler.q
@@ -14,7 +14,7 @@ TBLPROPERTIES
 
 DESCRIBE EXTENDED kafka_table;
 
-Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__time`, 
`page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__key`, 
`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, 
`newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM 
kafka_table;
 
 Select count(*) FROM kafka_table;
@@ -31,11 +31,11 @@ Select `__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page
 from kafka_table where (`__offset` > 7 and `__partition` = 0 and `__offset` <9 
) OR
 `__offset` = 4 and `__partition` = 0 OR (`__offset` <= 1 and `__partition` = 0 
and `__offset` > 0);
 
-Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, 
`page`, `user` from kafka_table where `__offset` = 5;
+Select `__key`,`__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5;
 
-Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, 
`page`, `user` from kafka_table where `__offset` < 5;
+Select `__key`,`__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5;
 
-Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, 
`page`, `user` from kafka_table where `__offset` > 5;
+Select `__key`,`__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5;
 
 -- Timestamp filter
 
@@ -150,6 +150,15 @@ FROM kafka_table_2;
 
 Select count(*) FROM kafka_table_2;
 
+CREATE EXTERNAL TABLE wiki_kafka_avro_table_1
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES
+("kafka.topic" = "wiki_kafka_avro_table",
+"kafka.bootstrap.servers"="localhost:9092",
+"kafka.serde.class"="org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe");
+
+SELECT * FROM wiki_kafka_avro_table_1;
+SELECT  COUNT (*) from wiki_kafka_avro_table_1;
 
 CREATE EXTERNAL TABLE wiki_kafka_avro_table
 STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
@@ -222,6 +231,7 @@ TBLPROPERTIES
 
 describe extended wiki_kafka_avro_table;
 
+
 select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, 
`__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, 
`deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table;
 
 select count(*) from wiki_kafka_avro_table;
@@ -231,6 +241,6 @@ select count(distinct `user`) from  wiki_kafka_avro_table;
 select sum(deltabucket), min(commentlength) from wiki_kafka_avro_table;
 
 select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, 
`__timestamp` as kafka_record_ts_long,
-`__partition`, `__start_offset`,`__end_offset`,`__offset`, `timestamp`, 
`user`, `page`, `deleted`, `deltabucket`,
+`__partition`, `__start_offset`,`__end_offset`, `__key`, `__offset`, 
`timestamp`, `user`, `page`, `deleted`, `deltabucket`,
 `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` 
> 1534750625090;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
----------------------------------------------------------------------
diff --git 
a/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out 
b/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
index 3dec33d..593cd07 100644
--- a/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
+++ b/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
@@ -44,6 +44,7 @@ robot                 boolean                 from 
deserializer
 added                  int                     from deserializer   
 deleted                int                     from deserializer   
 delta                  bigint                  from deserializer   
+__key                  binary                  from deserializer   
 __partition            int                     from deserializer   
 __offset               bigint                  from deserializer   
 __timestamp            bigint                  from deserializer   
@@ -51,26 +52,28 @@ __start_offset              bigint                  from 
deserializer
 __end_offset           bigint                  from deserializer   
                 
 #### A masked pattern was here ####
-PREHOOK: query: Select `__partition` ,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, 
`namespace`, `newPage` ,
+StorageHandlerInfo              
+Partition(topic = test-topic, partition = 0, leader = 1, replicas = [1], isr = 
[1], offlineReplicas = []) [start offset = [0], end offset = [10]]              
 
+PREHOOK: query: Select `__partition` ,`__start_offset`,`__end_offset`, 
`__offset`,`__key`, `__time`, `page`, `user`, `language`, 
`country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM 
kafka_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__partition` ,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, 
`namespace`, `newPage` ,
+POSTHOOK: query: Select `__partition` ,`__start_offset`,`__end_offset`, 
`__offset`,`__key`, `__time`, `page`, `user`, `language`, 
`country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM 
kafka_table
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0      0       10      0       NULL    Gypsy Danger    nuclear en      United 
States   North America   article true    true    false   false   57      200    
 -143
-0      0       10      1       NULL    Striker Eureka  speed   en      
Australia       Australia       wikipedia       true    false   false   true    
459     129     330
-0      0       10      2       NULL    Cherno Alpha    masterYi        ru      
Russia  Asia    article true    false   false   true    123     12      111
-0      0       10      3       NULL    Crimson Typhoon triplets        zh      
China   Asia    wikipedia       false   true    false   true    905     5       
900
-0      0       10      4       NULL    Coyote Tango    stringer        ja      
Japan   Asia    wikipedia       false   true    false   true    1       10      
-9
-0      0       10      5       NULL    Gypsy Danger    nuclear en      United 
States   North America   article true    true    false   false   57      200    
 -143
-0      0       10      6       NULL    Striker Eureka  speed   en      
Australia       Australia       wikipedia       true    false   false   true    
459     129     330
-0      0       10      7       NULL    Cherno Alpha    masterYi        ru      
Russia  Asia    article true    false   false   true    123     12      111
-0      0       10      8       NULL    Crimson Typhoon triplets        zh      
China   Asia    wikipedia       false   true    false   true    905     5       
900
-0      0       10      9       NULL    Coyote Tango    stringer        ja      
Japan   Asia    wikipedia       false   true    false   true    1       10      
-9
+0      0       10      0       key     NULL    Gypsy Danger    nuclear en      
United States   North America   article true    true    false   false   57      
200     -143
+0      0       10      1       key     NULL    Striker Eureka  speed   en      
Australia       Australia       wikipedia       true    false   false   true    
459     129     330
+0      0       10      2       key     NULL    Cherno Alpha    masterYi        
ru      Russia  Asia    article true    false   false   true    123     12      
111
+0      0       10      3       key     NULL    Crimson Typhoon triplets        
zh      China   Asia    wikipedia       false   true    false   true    905     
5       900
+0      0       10      4       key     NULL    Coyote Tango    stringer        
ja      Japan   Asia    wikipedia       false   true    false   true    1       
10      -9
+0      0       10      5       key     NULL    Gypsy Danger    nuclear en      
United States   North America   article true    true    false   false   57      
200     -143
+0      0       10      6       key     NULL    Striker Eureka  speed   en      
Australia       Australia       wikipedia       true    false   false   true    
459     129     330
+0      0       10      7       key     NULL    Cherno Alpha    masterYi        
ru      Russia  Asia    article true    false   false   true    123     12      
111
+0      0       10      8       key     NULL    Crimson Typhoon triplets        
zh      China   Asia    wikipedia       false   true    false   true    905     
5       900
+0      0       10      9       key     NULL    Coyote Tango    stringer        
ja      Japan   Asia    wikipedia       false   true    false   true    1       
10      -9
 PREHOOK: query: Select count(*) FROM kafka_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
@@ -141,40 +144,40 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 0      1       9       1       NULL    Striker Eureka  speed   en      
Australia       Australia       wikipedia       true    false   false   true    
459     129     330
 0      1       9       4       NULL    Coyote Tango    stringer        ja      
Japan   Asia    wikipedia       false   true    false   true    1       10      
-9
 0      1       9       8       NULL    Crimson Typhoon triplets        zh      
China   Asia    wikipedia       false   true    false   true    905     5       
900
-PREHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5
+PREHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5
+POSTHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0      5       6       5       NULL    Gypsy Danger    nuclear
-PREHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5
+key    0       5       6       5       NULL    Gypsy Danger    nuclear
+PREHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5
+POSTHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0      0       5       0       NULL    Gypsy Danger    nuclear
-0      0       5       1       NULL    Striker Eureka  speed
-0      0       5       2       NULL    Cherno Alpha    masterYi
-0      0       5       3       NULL    Crimson Typhoon triplets
-0      0       5       4       NULL    Coyote Tango    stringer
-PREHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5
+key    0       0       5       0       NULL    Gypsy Danger    nuclear
+key    0       0       5       1       NULL    Striker Eureka  speed
+key    0       0       5       2       NULL    Cherno Alpha    masterYi
+key    0       0       5       3       NULL    Crimson Typhoon triplets
+key    0       0       5       4       NULL    Coyote Tango    stringer
+PREHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5
+POSTHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, 
`__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0      6       10      6       NULL    Striker Eureka  speed
-0      6       10      7       NULL    Cherno Alpha    masterYi
-0      6       10      8       NULL    Crimson Typhoon triplets
-0      6       10      9       NULL    Coyote Tango    stringer
+key    0       6       10      6       NULL    Striker Eureka  speed
+key    0       6       10      7       NULL    Cherno Alpha    masterYi
+key    0       6       10      8       NULL    Crimson Typhoon triplets
+key    0       6       10      9       NULL    Coyote Tango    stringer
 PREHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, 
`__offset`, `user`  from kafka_table where
 `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' 
HOURS)
 PREHOOK: type: QUERY
@@ -606,6 +609,52 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table_2
 POSTHOOK: Output: hdfs://### HDFS PATH ###
 10
+PREHOOK: query: CREATE EXTERNAL TABLE wiki_kafka_avro_table_1
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES
+("kafka.topic" = "wiki_kafka_avro_table",
+"kafka.bootstrap.servers"="localhost:9092",
+"kafka.serde.class"="org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@wiki_kafka_avro_table_1
+POSTHOOK: query: CREATE EXTERNAL TABLE wiki_kafka_avro_table_1
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES
+("kafka.topic" = "wiki_kafka_avro_table",
+"kafka.bootstrap.servers"="localhost:9092",
+"kafka.serde.class"="org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@wiki_kafka_avro_table_1
+PREHOOK: query: SELECT * FROM wiki_kafka_avro_table_1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@wiki_kafka_avro_table_1
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM wiki_kafka_avro_table_1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@wiki_kafka_avro_table_1
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+key-0  0       0       1534736225090   0       11
+key-1  0       1       1534739825090   0       11
+key-2  0       2       1534743425090   0       11
+key-3  0       3       1534747025090   0       11
+key-4  0       4       1534750625090   0       11
+key-5  0       5       1534754225090   0       11
+key-6  0       6       1534757825090   0       11
+key-7  0       7       1534761425090   0       11
+key-8  0       8       1534765025090   0       11
+key-9  0       9       1534768625090   0       11
+key-10 0       10      1534772225090   0       11
+PREHOOK: query: SELECT  COUNT (*) from wiki_kafka_avro_table_1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@wiki_kafka_avro_table_1
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT  COUNT (*) from wiki_kafka_avro_table_1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@wiki_kafka_avro_table_1
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+11
 PREHOOK: query: CREATE EXTERNAL TABLE wiki_kafka_avro_table
 STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
 TBLPROPERTIES
@@ -772,6 +821,7 @@ user                        string                  from 
deserializer
 deltabucket            double                  from deserializer   
 deleted                bigint                  from deserializer   
 namespace              string                  from deserializer   
+__key                  binary                  from deserializer   
 __partition            int                     from deserializer   
 __offset               bigint                  from deserializer   
 __timestamp            bigint                  from deserializer   
@@ -779,6 +829,8 @@ __start_offset              bigint                  from 
deserializer
 __end_offset           bigint                  from deserializer   
                 
 #### A masked pattern was here ####
+StorageHandlerInfo              
+Partition(topic = wiki_kafka_avro_table, partition = 0, leader = 1, replicas = 
[1], isr = [1], offlineReplicas = []) [start offset = [0], end offset = [11]]   
         
 PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as 
kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, 
`deleted`, `deltabucket`, `isanonymous`, `commentlength` from 
wiki_kafka_avro_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@wiki_kafka_avro_table
@@ -826,20 +878,20 @@ POSTHOOK: Input: default@wiki_kafka_avro_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
 5522.000000000001      0
 PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as 
kafka_record_ts, `__timestamp` as kafka_record_ts_long,
-`__partition`, `__start_offset`,`__end_offset`,`__offset`, `timestamp`, 
`user`, `page`, `deleted`, `deltabucket`,
+`__partition`, `__start_offset`,`__end_offset`, `__key`, `__offset`, 
`timestamp`, `user`, `page`, `deleted`, `deltabucket`,
 `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` 
> 1534750625090
 PREHOOK: type: QUERY
 PREHOOK: Input: default@wiki_kafka_avro_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
 POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as 
kafka_record_ts, `__timestamp` as kafka_record_ts_long,
-`__partition`, `__start_offset`,`__end_offset`,`__offset`, `timestamp`, 
`user`, `page`, `deleted`, `deltabucket`,
+`__partition`, `__start_offset`,`__end_offset`, `__key`, `__offset`, 
`timestamp`, `user`, `page`, `deleted`, `deltabucket`,
 `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` 
> 1534750625090
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@wiki_kafka_avro_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-2018-08-20 08:37:05.09 1534754225090   0       5       11      5       
08/20/2018 01:37:05     test-user-5     page is 500     -5      502.0   true    
5
-2018-08-20 09:37:05.09 1534757825090   0       5       11      6       
08/20/2018 02:37:05     test-user-6     page is 600     -6      
602.4000000000001       false   6
-2018-08-20 10:37:05.09 1534761425090   0       5       11      7       
08/20/2018 03:37:05     test-user-7     page is 700     -7      
702.8000000000001       true    7
-2018-08-20 11:37:05.09 1534765025090   0       5       11      8       
08/20/2018 04:37:05     test-user-8     page is 800     -8      803.2   true    
8
-2018-08-20 12:37:05.09 1534768625090   0       5       11      9       
08/20/2018 05:37:05     test-user-9     page is 900     -9      903.6   false   
9
-2018-08-20 13:37:05.09 1534772225090   0       5       11      10      
08/20/2018 06:37:05     test-user-10    page is 1000    -10     1004.0  true    
10
+2018-08-20 08:37:05.09 1534754225090   0       5       11      key-5   5       
08/20/2018 01:37:05     test-user-5     page is 500     -5      502.0   true    
5
+2018-08-20 09:37:05.09 1534757825090   0       5       11      key-6   6       
08/20/2018 02:37:05     test-user-6     page is 600     -6      
602.4000000000001       false   6
+2018-08-20 10:37:05.09 1534761425090   0       5       11      key-7   7       
08/20/2018 03:37:05     test-user-7     page is 700     -7      
702.8000000000001       true    7
+2018-08-20 11:37:05.09 1534765025090   0       5       11      key-8   8       
08/20/2018 04:37:05     test-user-8     page is 800     -8      803.2   true    
8
+2018-08-20 12:37:05.09 1534768625090   0       5       11      key-9   9       
08/20/2018 05:37:05     test-user-9     page is 900     -9      903.6   false   
9
+2018-08-20 13:37:05.09 1534772225090   0       5       11      key-10  10      
08/20/2018 06:37:05     test-user-10    page is 1000    -10     1004.0  true    
10

http://git-wip-us.apache.org/repos/asf/hive/blob/c7d5606b/testutils/ptest2/conf/deployed/master-mr2.properties
----------------------------------------------------------------------
diff --git a/testutils/ptest2/conf/deployed/master-mr2.properties 
b/testutils/ptest2/conf/deployed/master-mr2.properties
index 90a654c..333c3de 100644
--- a/testutils/ptest2/conf/deployed/master-mr2.properties
+++ b/testutils/ptest2/conf/deployed/master-mr2.properties
@@ -182,7 +182,7 @@ qFileTest.erasurecodingCli.groups.normal = 
mainProperties.${erasurecoding.only.q
 
 qFileTest.miniDruid.driver = TestMiniDruidCliDriver
 qFileTest.miniDruid.directory = ql/src/test/queries/clientpositive
-qFileTest.miniDruid.batchSize = 5
+qFileTest.miniDruid.batchSize = 50
 qFileTest.miniDruid.queryFilesProperty = qfile
 qFileTest.miniDruid.include = normal
 qFileTest.miniDruid.groups.normal = mainProperties.${druid.query.files}

Reply via email to