Repository: hive
Updated Branches:
  refs/heads/master d22fc5b24 -> dca389b06
http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index cb8fa39..111f047 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,13 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.InputRow;
-import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
 import io.druid.data.input.impl.MapInputRowParser;
@@ -69,141 +68,144 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
-public class TestDruidRecordWriter {
-  private ObjectMapper objectMapper = DruidStorageHandlerUtils.JSON_MAPPER;
+/**
+ * Test Class for Druid Record Writer.
+ */
+@SuppressWarnings("ConstantConditions") public class TestDruidRecordWriter {
+  private final ObjectMapper objectMapper = DruidStorageHandlerUtils.JSON_MAPPER;
   private static final Interval INTERVAL_FULL = new Interval("2014-10-22T00:00:00Z/P1D");
 
-  @Rule
-  public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-  private DruidRecordWriter druidRecordWriter;
-
-  final List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
-      ImmutableMap.<String, Object>of(
-          DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
-          DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(),
-          "host", ImmutableList.of("a.example.com"),
-          "visited_sum", 190L,
-          "unique_hosts", 1.0d
-      ),
-      ImmutableMap.<String, Object>of(
-          DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
-          DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(),
-          "host", ImmutableList.of("b.example.com"),
-          "visited_sum", 175L,
-          "unique_hosts", 1.0d
-      ),
-      ImmutableMap.<String, Object>of(
-          DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
-          DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(),
-          "host", ImmutableList.of("c.example.com"),
-          "visited_sum", 270L,
-          "unique_hosts", 1.0d
-      )
-  );
-
-
-  @Test
-  public void testTimeStampColumnName() {
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  final List<ImmutableMap<String, Object>>
+      expectedRows =
+      ImmutableList.of(ImmutableMap.of(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+          DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(),
+          "host",
+          ImmutableList.of("a.example.com"),
+          "visited_sum",
+          190L,
+          "unique_hosts",
+          1.0d),
+          ImmutableMap.of(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+              DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(),
+              "host",
+              ImmutableList.of("b.example.com"),
+              "visited_sum",
+              175L,
+              "unique_hosts",
+              1.0d),
+          ImmutableMap.of(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+              DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(),
+              "host",
+              ImmutableList.of("c.example.com"),
+              "visited_sum",
+              270L,
+              "unique_hosts",
+              1.0d));
+
+  @Test public void testTimeStampColumnName() {
     Assert.assertEquals("Time column name need to match to ensure serdeser compatibility",
-        DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, DruidTable.DEFAULT_TIMESTAMP_COLUMN
-    );
+        DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+        DruidTable.DEFAULT_TIMESTAMP_COLUMN);
   }
+
   //Test is failing due to Guava dependency, Druid 0.13.0 should have less dependency on Guava
-  @Ignore
-  @Test
-  public void testWrite() throws IOException, SegmentLoadingException {
+  @Ignore @Test public void testWrite() throws IOException, SegmentLoadingException {
     final String dataSourceName = "testDataSource";
     final File segmentOutputDir = temporaryFolder.newFolder();
     final File workingDir = temporaryFolder.newFolder();
     Configuration config = new Configuration();
-    final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
-        new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
-        new DimensionsSpec(ImmutableList.<DimensionSchema>of(new StringDimensionSchema("host")),
-            null, null
-        )
-    ));
-    final Map<String, Object> parserMap = objectMapper.convertValue(inputRowParser, Map.class);
-
-    DataSchema dataSchema = new DataSchema(
-        dataSourceName,
+    final InputRowParser
+        inputRowParser =
+        new MapInputRowParser(new TimeAndDimsParseSpec(new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN,
+            "auto",
+            null), new DimensionsSpec(ImmutableList.of(new StringDimensionSchema("host")), null, null)));
+    final Map<String, Object>
+        parserMap =
+        objectMapper.convertValue(inputRowParser, new TypeReference<Map<String, Object>>() {
+        });
+
+    DataSchema
+        dataSchema =
+        new DataSchema(dataSourceName,
         parserMap,
-        new AggregatorFactory[] {
-            new LongSumAggregatorFactory("visited_sum", "visited_sum"),
-            new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts")
-        },
-        new UniformGranularitySpec(
-            Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL)
-        ),
-        null,
-        objectMapper
-    );
+            new AggregatorFactory[] { new LongSumAggregatorFactory("visited_sum", "visited_sum"),
+                new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts") },
+            new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL)),
+            null,
+            objectMapper);
 
     IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null);
-    RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(null, null, null,
-        temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, 0, 0, null, null,
-        0L, null
-    );
+    RealtimeTuningConfig
+        tuningConfig =
+        new RealtimeTuningConfig(null,
+            null,
+            null,
+            temporaryFolder.newFolder(),
+            null,
+            null,
+            null,
+            null,
+            indexSpec,
+            null,
+            0,
+            0,
+            null,
+            null,
+            0L,
+            null);
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
-    DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
-        new LocalDataSegmentPusherConfig() {
-          @Override
-          public File getStorageDirectory() {return segmentOutputDir;}
-        }, objectMapper);
-
-    Path segmentDescriptroPath = new Path(workingDir.getAbsolutePath(),
-        DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME
-    );
-    druidRecordWriter = new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20,
-        segmentDescriptroPath, localFileSystem
-    );
-
-    List<DruidWritable> druidWritables = Lists.transform(expectedRows,
-        new Function<ImmutableMap<String, Object>, DruidWritable>() {
-          @Nullable
-          @Override
-          public DruidWritable apply(@Nullable ImmutableMap<String, Object> input
-          ) {
-            return new DruidWritable(ImmutableMap.<String, Object>builder().putAll(input)
-                .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
-                    Granularities.DAY.bucketStart(
-                        new DateTime((long) input
-                            .get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)))
-                        .getMillis()
-                ).build());
-          }
-        }
-    );
+    DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig() {
+      @Override public File getStorageDirectory() {
+        return segmentOutputDir;
+      }
+    }, objectMapper);
+
+    Path
+        segmentDescriptroPath =
+        new Path(workingDir.getAbsolutePath(), DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME);
+    DruidRecordWriter
+        druidRecordWriter =
+        new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20, segmentDescriptroPath, localFileSystem);
+
+    List<DruidWritable>
+        druidWritables =
+        expectedRows.stream()
+            .map(input -> new DruidWritable(ImmutableMap.<String, Object>builder().putAll(input)
+                .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
+                    Granularities.DAY.bucketStart(new DateTime((long) input.get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)))
+                        .getMillis())
+                .build()))
+            .collect(Collectors.toList());
     for (DruidWritable druidWritable : druidWritables) {
       druidRecordWriter.write(druidWritable);
    }
     druidRecordWriter.close(false);
-    List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
-        .getCreatedSegments(segmentDescriptroPath, config);
+    List<DataSegment> dataSegmentList = DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptroPath, config);
     Assert.assertEquals(1, dataSegmentList.size());
     File tmpUnzippedSegmentDir = temporaryFolder.newFolder();
     new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir);
-    final QueryableIndex queryableIndex = DruidStorageHandlerUtils.INDEX_IO
-        .loadIndex(tmpUnzippedSegmentDir);
+    final QueryableIndex queryableIndex = DruidStorageHandlerUtils.INDEX_IO.loadIndex(tmpUnzippedSegmentDir);
 
     QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(queryableIndex);
-    Firehose firehose = new IngestSegmentFirehose(
-        ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
+    Firehose
+        firehose =
+        new IngestSegmentFirehose(ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
         null,
         ImmutableList.of("host"),
         ImmutableList.of("visited_sum", "unique_hosts"),
-        null
-    );
+            null);
 
     List<InputRow> rows = Lists.newArrayList();
     while (firehose.hasMore()) {
@@ -214,9 +216,7 @@
 
   }
 
-  private void verifyRows(List<ImmutableMap<String, Object>> expectedRows,
-      List<InputRow> actualRows
-  ) {
+  private void verifyRows(List<ImmutableMap<String, Object>> expectedRows, List<InputRow> actualRows) {
     System.out.println("actualRows = " + actualRows);
 
     Assert.assertEquals(expectedRows.size(), actualRows.size());
@@ -227,25 +227,21 @@
       Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());
 
       Assert.assertEquals(expected.get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN),
-          actual.getTimestamp().getMillis()
-      );
+          actual.getTimestamp().getMillis());
       Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
       Assert.assertEquals(expected.get("visited_sum"), actual.getMetric("visited_sum"));
-      Assert.assertEquals(
-          (Double) expected.get("unique_hosts"),
-          (Double) HyperUniquesAggregatorFactory
-              .estimateCardinality(actual.getRaw("unique_hosts"), false),
-          0.001
-      );
+      Assert.assertEquals((Double) expected.get("unique_hosts"),
+          (Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts"), false),
+          0.001);
     }
   }
 
-  @Test
-  public void testSerDesr() throws IOException {
-    String segment = "{\"dataSource\":\"datasource2015\",\"interval\":\"2015-06-01T00:00:00.000-04:00/2015-06-02T00:00:00.000-04:00\",\"version\":\"2016-11-04T19:24:01.732-04:00\",\"loadSpec\":{\"type\":\"hdfs\",\"path\":\"hdfs://cn105-10.l42scl.hortonworks.com:8020/apps/hive/warehouse/druid.db/.hive-staging_hive_2016-11-04_19-23-50_168_1550339856804207572-1/_task_tmp.-ext-10002/_tmp.000000_0/datasource2015/20150601T000000.000-0400_20150602T000000.000-0400/2016-11-04T19_24_01.732-04_00/0/index.zip\"},\"dimensions\":\"dimension1\",\"metrics\":\"bigint\",\"shardSpec\":{\"type\":\"linear\",\"partitionNum\":0},\"binaryVersion\":9,\"size\":1765,\"identifier\":\"datasource2015_2015-06-01T00:00:00.000-04:00_2015-06-02T00:00:00.000-04:00_2016-11-04T19:24:01.732-04:00\"}";
-    DataSegment dataSegment = objectMapper.reader(DataSegment.class)
-        .readValue(segment);
-    Assert.assertTrue(dataSegment.getDataSource().equals("datasource2015"));
+  @Test public void testSerDesr() throws IOException {
+    String
+        segment =
+        "{\"dataSource\":\"datasource2015\",\"interval\":\"2015-06-01T00:00:00.000-04:00/2015-06-02T00:00:00.000-04:00\",\"version\":\"2016-11-04T19:24:01.732-04:00\",\"loadSpec\":{\"type\":\"hdfs\",\"path\":\"hdfs://cn105-10.l42scl.hortonworks.com:8020/apps/hive/warehouse/druid.db/.hive-staging_hive_2016-11-04_19-23-50_168_1550339856804207572-1/_task_tmp.-ext-10002/_tmp.000000_0/datasource2015/20150601T000000.000-0400_20150602T000000.000-0400/2016-11-04T19_24_01.732-04_00/0/index.zip\"},\"dimensions\":\"dimension1\",\"metrics\":\"bigint\",\"shardSpec\":{\"type\":\"linear\",\"partitionNum\":0},\"binaryVersion\":9,\"size\":1765,\"identifier\":\"datasource2015_2015-06-01T00:00:00.000-04:00_2015-06-02T00:00:00.000-04:00_2016-11-04T19:24:01.732-04:00\"}";
+    DataSegment dataSegment = objectMapper.readerFor(DataSegment.class).readValue(segment);
+    Assert.assertEquals("datasource2015", dataSegment.getDataSource());
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/ql/src/test/queries/clientpositive/kafka_storage_handler.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/kafka_storage_handler.q b/ql/src/test/queries/clientpositive/kafka_storage_handler.q
index 782b4e0..bab13cb 100644
--- a/ql/src/test/queries/clientpositive/kafka_storage_handler.q
+++ b/ql/src/test/queries/clientpositive/kafka_storage_handler.q
@@ -39,8 +39,8 @@ Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_tab
 
 -- Timestamp filter
 
-Select `__partition`, `__offset`, `user` from kafka_table where
-`__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS) ;
+Select `__partition`, `__offset`, `user` from kafka_table where
+`__timestamp` > to_epoch_milli(CURRENT_TIMESTAMP - interval '1' HOURS) ;
 
 -- non existing partition
 Select count(*) from kafka_table where `__partition` = 1;

http://git-wip-us.apache.org/repos/asf/hive/blob/dca389b0/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out b/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
index 98fae6c..883b447 100644
--- a/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
+++ b/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
@@ -176,13 +176,13 @@ key	0	6	NULL	Striker Eureka	speed
 key	0	7	NULL	Cherno Alpha	masterYi
 key	0	8	NULL	Crimson Typhoon	triplets
 key	0	9	NULL	Coyote Tango	stringer
-PREHOOK: query: Select `__partition`, `__offset`, `user` from kafka_table where
-`__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS)
+PREHOOK: query: Select `__partition`, `__offset`, `user` from kafka_table where
+`__timestamp` > to_epoch_milli(CURRENT_TIMESTAMP - interval '1' HOURS)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__partition`, `__offset`, `user` from kafka_table where
-`__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS)
+POSTHOOK: query: Select `__partition`, `__offset`, `user` from kafka_table where
+`__timestamp` > to_epoch_milli(CURRENT_TIMESTAMP - interval '1' HOURS)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###