Repository: hive
Updated Branches:
  refs/heads/master d22fc5b24 -> dca389b06


http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index cb8fa39..111f047 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,13 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.InputRow;
-import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
 import io.druid.data.input.impl.MapInputRowParser;
@@ -69,141 +68,144 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
-public class TestDruidRecordWriter {
-  private ObjectMapper objectMapper = DruidStorageHandlerUtils.JSON_MAPPER;
+/**
+ * Test Class for Druid Record Writer.
+ */
+@SuppressWarnings("ConstantConditions") public class TestDruidRecordWriter {
+  private final ObjectMapper objectMapper = DruidStorageHandlerUtils.JSON_MAPPER;
 
   private static final Interval INTERVAL_FULL = new Interval("2014-10-22T00:00:00Z/P1D");
 
-  @Rule
-  public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-  private DruidRecordWriter druidRecordWriter;
-
-  final List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
-          ImmutableMap.<String, Object>of(
-                  DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
-                  DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(),
-                  "host", ImmutableList.of("a.example.com"),
-                  "visited_sum", 190L,
-                  "unique_hosts", 1.0d
-          ),
-          ImmutableMap.<String, Object>of(
-                  DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
-                  DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(),
-                  "host", ImmutableList.of("b.example.com"),
-                  "visited_sum", 175L,
-                  "unique_hosts", 1.0d
-          ),
-          ImmutableMap.<String, Object>of(
-                  DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
-                  DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(),
-                  "host", ImmutableList.of("c.example.com"),
-                  "visited_sum", 270L,
-                  "unique_hosts", 1.0d
-          )
-  );
-
-
-  @Test
-  public void testTimeStampColumnName() {
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  final List<ImmutableMap<String, Object>>
+      expectedRows =
+      ImmutableList.of(ImmutableMap.of(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+          DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(),
+          "host",
+          ImmutableList.of("a.example.com"),
+          "visited_sum",
+          190L,
+          "unique_hosts",
+          1.0d),
+          ImmutableMap.of(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+              DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(),
+              "host",
+              ImmutableList.of("b.example.com"),
+              "visited_sum",
+              175L,
+              "unique_hosts",
+              1.0d),
+          ImmutableMap.of(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+              DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(),
+              "host",
+              ImmutableList.of("c.example.com"),
+              "visited_sum",
+              270L,
+              "unique_hosts",
+              1.0d));
+
+  @Test public void testTimeStampColumnName() {
     Assert.assertEquals("Time column name need to match to ensure serdeser compatibility",
-            DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, DruidTable.DEFAULT_TIMESTAMP_COLUMN
-    );
+        DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+        DruidTable.DEFAULT_TIMESTAMP_COLUMN);
   }
+
   //Test is failing due to Guava dependency, Druid 0.13.0 should have less dependency on Guava
-  @Ignore
-  @Test
-  public void testWrite() throws IOException, SegmentLoadingException {
+  @Ignore @Test public void testWrite() throws IOException, SegmentLoadingException {
 
     final String dataSourceName = "testDataSource";
     final File segmentOutputDir = temporaryFolder.newFolder();
     final File workingDir = temporaryFolder.newFolder();
     Configuration config = new Configuration();
 
-    final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
-            new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
-            new DimensionsSpec(ImmutableList.<DimensionSchema>of(new StringDimensionSchema("host")),
-                    null, null
-            )
-    ));
-    final Map<String, Object> parserMap = objectMapper.convertValue(inputRowParser, Map.class);
-
-    DataSchema dataSchema = new DataSchema(
-            dataSourceName,
+    final InputRowParser
+        inputRowParser =
+        new MapInputRowParser(new TimeAndDimsParseSpec(new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+            "auto",
+            null), new DimensionsSpec(ImmutableList.of(new StringDimensionSchema("host")), null, null)));
+    final Map<String, Object>
+        parserMap =
+        objectMapper.convertValue(inputRowParser, new TypeReference<Map<String, Object>>() {
+        });
+
+    DataSchema
+        dataSchema =
+        new DataSchema(dataSourceName,
             parserMap,
-            new AggregatorFactory[] {
-                    new LongSumAggregatorFactory("visited_sum", "visited_sum"),
-                    new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts")
-            },
-            new UniformGranularitySpec(
-                    Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL)
-            ),
-        null,
-            objectMapper
-    );
+            new AggregatorFactory[] { new LongSumAggregatorFactory("visited_sum", "visited_sum"),
+                new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts") },
+            new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL)),
+            null,
+            objectMapper);
 
     IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null);
-    RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(null, null, null,
-            temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, 0, 0, null, null,
-            0L, null
-    );
+    RealtimeTuningConfig
+        tuningConfig =
+        new RealtimeTuningConfig(null,
+            null,
+            null,
+            temporaryFolder.newFolder(),
+            null,
+            null,
+            null,
+            null,
+            indexSpec,
+            null,
+            0,
+            0,
+            null,
+            null,
+            0L,
+            null);
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
-    DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
-            new LocalDataSegmentPusherConfig() {
-              @Override
-              public File getStorageDirectory() {return segmentOutputDir;}
-            }, objectMapper);
-
-    Path segmentDescriptroPath = new Path(workingDir.getAbsolutePath(),
-            DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME
-    );
-    druidRecordWriter = new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20,
-            segmentDescriptroPath, localFileSystem
-    );
-
-    List<DruidWritable> druidWritables = Lists.transform(expectedRows,
-            new Function<ImmutableMap<String, Object>, DruidWritable>() {
-              @Nullable
-              @Override
-              public DruidWritable apply(@Nullable ImmutableMap<String, Object> input
-              ) {
-                return new DruidWritable(ImmutableMap.<String, Object>builder().putAll(input)
-                        .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
-                                Granularities.DAY.bucketStart(
-                                        new DateTime((long) input
-                                                .get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)))
-                                        .getMillis()
-                        ).build());
-              }
-            }
-    );
+    DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig() {
+      @Override public File getStorageDirectory() {
+        return segmentOutputDir;
+      }
+    }, objectMapper);
+
+    Path
+        segmentDescriptroPath =
+        new Path(workingDir.getAbsolutePath(), DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME);
+    DruidRecordWriter
+        druidRecordWriter =
+        new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20, segmentDescriptroPath, localFileSystem);
+
+    List<DruidWritable>
+        druidWritables =
+        expectedRows.stream()
+            .map(input -> new DruidWritable(ImmutableMap.<String, Object>builder().putAll(input)
+                .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
+                    Granularities.DAY.bucketStart(new DateTime((long) input.get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)))
+                        .getMillis())
+                .build()))
+            .collect(Collectors.toList());
     for (DruidWritable druidWritable : druidWritables) {
       druidRecordWriter.write(druidWritable);
     }
     druidRecordWriter.close(false);
-    List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
-            .getCreatedSegments(segmentDescriptroPath, config);
+    List<DataSegment> dataSegmentList = DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptroPath, config);
     Assert.assertEquals(1, dataSegmentList.size());
     File tmpUnzippedSegmentDir = temporaryFolder.newFolder();
     new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir);
-    final QueryableIndex queryableIndex = DruidStorageHandlerUtils.INDEX_IO
-            .loadIndex(tmpUnzippedSegmentDir);
+    final QueryableIndex queryableIndex = DruidStorageHandlerUtils.INDEX_IO.loadIndex(tmpUnzippedSegmentDir);
 
     QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(queryableIndex);
 
-    Firehose firehose = new IngestSegmentFirehose(
-            ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
+    Firehose
+        firehose =
+        new IngestSegmentFirehose(ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
             null,
             ImmutableList.of("host"),
             ImmutableList.of("visited_sum", "unique_hosts"),
-            null
-    );
+            null);
 
     List<InputRow> rows = Lists.newArrayList();
     while (firehose.hasMore()) {
@@ -214,9 +216,7 @@ public class TestDruidRecordWriter {
 
   }
 
-  private void verifyRows(List<ImmutableMap<String, Object>> expectedRows,
-          List<InputRow> actualRows
-  ) {
+  private void verifyRows(List<ImmutableMap<String, Object>> expectedRows, List<InputRow> actualRows) {
     System.out.println("actualRows = " + actualRows);
     Assert.assertEquals(expectedRows.size(), actualRows.size());
 
@@ -227,25 +227,21 @@ public class TestDruidRecordWriter {
       Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());

       Assert.assertEquals(expected.get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN),
-              actual.getTimestamp().getMillis()
-      );
+          actual.getTimestamp().getMillis());
       Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
       Assert.assertEquals(expected.get("visited_sum"), actual.getMetric("visited_sum"));
-      Assert.assertEquals(
-              (Double) expected.get("unique_hosts"),
-              (Double) HyperUniquesAggregatorFactory
-                      .estimateCardinality(actual.getRaw("unique_hosts"), false),
-              0.001
-      );
+      Assert.assertEquals((Double) expected.get("unique_hosts"),
+          (Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts"), false),
+          0.001);
     }
   }
 
-  @Test
-  public void testSerDesr() throws IOException {
-    String segment = "{\"dataSource\":\"datasource2015\",\"interval\":\"2015-06-01T00:00:00.000-04:00/2015-06-02T00:00:00.000-04:00\",\"version\":\"2016-11-04T19:24:01.732-04:00\",\"loadSpec\":{\"type\":\"hdfs\",\"path\":\"hdfs://cn105-10.l42scl.hortonworks.com:8020/apps/hive/warehouse/druid.db/.hive-staging_hive_2016-11-04_19-23-50_168_1550339856804207572-1/_task_tmp.-ext-10002/_tmp.000000_0/datasource2015/20150601T000000.000-0400_20150602T000000.000-0400/2016-11-04T19_24_01.732-04_00/0/index.zip\"},\"dimensions\":\"dimension1\",\"metrics\":\"bigint\",\"shardSpec\":{\"type\":\"linear\",\"partitionNum\":0},\"binaryVersion\":9,\"size\":1765,\"identifier\":\"datasource2015_2015-06-01T00:00:00.000-04:00_2015-06-02T00:00:00.000-04:00_2016-11-04T19:24:01.732-04:00\"}";
-    DataSegment dataSegment = objectMapper.reader(DataSegment.class)
-            .readValue(segment);
-    Assert.assertTrue(dataSegment.getDataSource().equals("datasource2015"));
+  @Test public void testSerDesr() throws IOException {
+    String
+        segment =
+        "{\"dataSource\":\"datasource2015\",\"interval\":\"2015-06-01T00:00:00.000-04:00/2015-06-02T00:00:00.000-04:00\",\"version\":\"2016-11-04T19:24:01.732-04:00\",\"loadSpec\":{\"type\":\"hdfs\",\"path\":\"hdfs://cn105-10.l42scl.hortonworks.com:8020/apps/hive/warehouse/druid.db/.hive-staging_hive_2016-11-04_19-23-50_168_1550339856804207572-1/_task_tmp.-ext-10002/_tmp.000000_0/datasource2015/20150601T000000.000-0400_20150602T000000.000-0400/2016-11-04T19_24_01.732-04_00/0/index.zip\"},\"dimensions\":\"dimension1\",\"metrics\":\"bigint\",\"shardSpec\":{\"type\":\"linear\",\"partitionNum\":0},\"binaryVersion\":9,\"size\":1765,\"identifier\":\"datasource2015_2015-06-01T00:00:00.000-04:00_2015-06-02T00:00:00.000-04:00_2016-11-04T19:24:01.732-04:00\"}";
+    DataSegment dataSegment = objectMapper.readerFor(DataSegment.class).readValue(segment);
+    Assert.assertEquals("datasource2015", dataSegment.getDataSource());
   }
 
 }
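Note on the Java changes above: the patch replaces the Guava Lists.transform/anonymous Function idiom with a java.util.stream pipeline and swaps the untyped Map.class conversion for a Jackson TypeReference. A minimal, self-contained sketch of the transform pattern follows; it is illustrative only and not part of the patch, and the class name, element type, and mapping are placeholders.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamTransformSketch {
  public static void main(String[] args) {
    List<Long> timestamps = Arrays.asList(0L, 3_600_000L, 7_200_000L);

    // Before: Lists.transform(timestamps, new Function<Long, String>() { ... })
    // returned a lazy Guava view. After: an eager JDK 8+ stream pipeline.
    List<String> rows = timestamps.stream()
        .map(ts -> "row@" + ts)            // analogous to building one DruidWritable per input row
        .collect(Collectors.toList());     // materializes the result list

    System.out.println(rows);
  }
}

Unlike the lazy Guava view, the collected list is built once up front, which matches the test's single pass over druidWritables in the write loop.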

http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/ql/src/test/queries/clientpositive/kafka_storage_handler.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/kafka_storage_handler.q b/ql/src/test/queries/clientpositive/kafka_storage_handler.q
index 782b4e0..bab13cb 100644
--- a/ql/src/test/queries/clientpositive/kafka_storage_handler.q
+++ b/ql/src/test/queries/clientpositive/kafka_storage_handler.q
@@ -39,8 +39,8 @@ Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_tab
 
 -- Timestamp filter
 
-Select `__partition`, `__offset`, `user`  from kafka_table where
-`__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS) ;
+Select `__partition`, `__offset`, `user` from kafka_table where
+`__timestamp` >  to_epoch_milli(CURRENT_TIMESTAMP - interval '1' HOURS) ;
 
 -- non existing partition
 Select  count(*) from kafka_table where `__partition` = 1;
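The query change above drops the hand-rolled conversion, 1000 * to_unix_timestamp(...), in favor of to_epoch_milli(...), so the predicate stays in the millisecond unit that the original expression was emulating for `__timestamp`. A small Java sketch of the equivalence being relied on (illustrative only, not part of the patch; the class name is a placeholder):

import java.time.Instant;

public class EpochMilliSketch {
  public static void main(String[] args) {
    Instant oneHourAgo = Instant.now().minusSeconds(3600);

    long viaSeconds = 1000L * oneHourAgo.getEpochSecond(); // old predicate: 1000 * to_unix_timestamp(...)
    long viaMillis = oneHourAgo.toEpochMilli();            // new predicate: to_epoch_milli(...)

    // The two differ only by the sub-second remainder that getEpochSecond() truncates.
    System.out.println(viaMillis - viaSeconds);            // prints a value in [0, 999]
  }
}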

http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out b/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
index 98fae6c..883b447 100644
--- a/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
+++ b/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
@@ -176,13 +176,13 @@ key       0       6       NULL    Striker Eureka  speed
 key    0       7       NULL    Cherno Alpha    masterYi
 key    0       8       NULL    Crimson Typhoon triplets
 key    0       9       NULL    Coyote Tango    stringer
-PREHOOK: query: Select `__partition`, `__offset`, `user`  from kafka_table where
-`__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS)
+PREHOOK: query: Select `__partition`, `__offset`, `user` from kafka_table where
+`__timestamp` >  to_epoch_milli(CURRENT_TIMESTAMP - interval '1' HOURS)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__partition`, `__offset`, `user`  from kafka_table where
-`__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS)
+POSTHOOK: query: Select `__partition`, `__offset`, `user` from kafka_table where
+`__timestamp` >  to_epoch_milli(CURRENT_TIMESTAMP - interval '1' HOURS)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
