Repository: hive
Updated Branches:
  refs/heads/branch-3 3c90b9910 -> c6565b2c2
HIVE-19187 : Update Druid Storage Handler to Druid 0.12.0 (Slim Bouguerra via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c6565b2c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c6565b2c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c6565b2c

Branch: refs/heads/branch-3
Commit: c6565b2c2d05d79b3c2e99a3196f88ae143bef81
Parents: 3c90b99
Author: Slim Bouguerra <slim.bougue...@gmail.com>
Authored: Wed Apr 11 17:56:00 2018 -0700
Committer: Vineet Garg <vg...@apache.org>
Committed: Fri Apr 13 10:00:17 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/druid/DruidStorageHandler.java | 4 ++-
 .../hive/druid/DruidStorageHandlerUtils.java | 30 +++++++++++++++++---
 .../hadoop/hive/druid/io/DruidOutputFormat.java | 4 ++-
 .../druid/io/DruidQueryBasedInputFormat.java | 20 +------------
 .../hive/druid/json/KafkaTuningConfig.java | 8 ++++--
 .../hive/ql/io/TestDruidRecordWriter.java | 6 ++--
 pom.xml | 2 +-
 7 files changed, 44 insertions(+), 30 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c6565b2c/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 76540b7..c0feb8d 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -49,6 +49,7 @@ import io.druid.metadata.SQLMetadataConnector; import io.druid.metadata.storage.derby.DerbyConnector; import io.druid.metadata.storage.derby.DerbyMetadataStorage; import
io.druid.metadata.storage.mysql.MySQLConnector; +import io.druid.metadata.storage.mysql.MySQLConnectorConfig; import io.druid.metadata.storage.postgresql.PostgreSQLConnector; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.IndexSpec; @@ -335,6 +336,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor inputParser, dimensionsAndAggregates.rhs, granularitySpec, + null, DruidStorageHandlerUtils.JSON_MAPPER ); @@ -880,7 +882,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor if (dbType.equals("mysql")) { connector = new MySQLConnector(storageConnectorConfigSupplier, Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()) - ); + , new MySQLConnectorConfig()); } else if (dbType.equals("postgresql")) { connector = new PostgreSQLConnector(storageConnectorConfigSupplier, Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()) http://git-wip-us.apache.org/repos/asf/hive/blob/c6565b2c/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 1424237..1aef565 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -26,6 +26,7 @@ import io.druid.math.expr.ExprMacroTable; import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.SQLMetadataConnector; import io.druid.metadata.storage.mysql.MySQLConnector; +import io.druid.query.Druids; import io.druid.query.expression.LikeExprMacro; import io.druid.query.expression.RegexpExtractExprMacro; import io.druid.query.expression.TimestampCeilExprMacro; @@ -35,7 +36,9 @@ import 
io.druid.query.expression.TimestampFormatExprMacro; import io.druid.query.expression.TimestampParseExprMacro; import io.druid.query.expression.TimestampShiftExprMacro; import io.druid.query.expression.TrimExprMacro; +import io.druid.query.select.PagingSpec; import io.druid.query.select.SelectQueryConfig; +import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.segment.IndexIO; import io.druid.segment.IndexMergerV9; import io.druid.query.aggregation.AggregatorFactory; @@ -48,6 +51,7 @@ import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import io.druid.storage.hdfs.HdfsDataSegmentPusher; import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig; import io.druid.timeline.DataSegment; @@ -124,8 +128,10 @@ import java.net.URL; import java.net.UnknownHostException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -168,6 +174,7 @@ public final class DruidStorageHandlerUtils { * Mapper to use to serialize/deserialize Druid objects (SMILE) */ public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory()); + private static final int DEFAULT_MAX_TRIES = 10; static { // This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig @@ -187,7 +194,8 @@ public final class DruidStorageHandlerUtils { new TrimExprMacro.LeftTrimExprMacro(), new TrimExprMacro.RightTrimExprMacro() ))) - .addValue(ObjectMapper.class, JSON_MAPPER); + .addValue(ObjectMapper.class, JSON_MAPPER) + .addValue(DataSegment.PruneLoadSpecHolder.class, 
DataSegment.PruneLoadSpecHolder.DEFAULT); JSON_MAPPER.setInjectableValues(injectableValues); SMILE_MAPPER.setInjectableValues(injectableValues); @@ -214,13 +222,14 @@ public final class DruidStorageHandlerUtils { /** * Used by druid to perform IO on indexes */ - public static final IndexIO INDEX_IO = new IndexIO(JSON_MAPPER, () -> 0); + public static final IndexIO INDEX_IO = + new IndexIO(JSON_MAPPER, TmpFileSegmentWriteOutMediumFactory.instance(), () -> 0); /** * Used by druid to merge indexes */ public static final IndexMergerV9 INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, - DruidStorageHandlerUtils.INDEX_IO + DruidStorageHandlerUtils.INDEX_IO,TmpFileSegmentWriteOutMediumFactory.instance() ); /** @@ -606,7 +615,7 @@ public final class DruidStorageHandlerUtils { } } ) - , 3, SQLMetadataConnector.DEFAULT_MAX_TRIES); + , 3, DEFAULT_MAX_TRIES); return segmentList; } @@ -637,6 +646,19 @@ public final class DruidStorageHandlerUtils { ); } + public static String createSelectStarQuery(String dataSource) throws IOException { + // Create Select query + Druids.SelectQueryBuilder builder = new Druids.SelectQueryBuilder(); + builder.dataSource(dataSource); + final List<Interval> intervals = Arrays.asList(DEFAULT_INTERVAL); + builder.intervals(new MultipleIntervalSegmentSpec(intervals)); + builder.pagingSpec(PagingSpec.newSpec(1)); + Map<String, Object> context = new HashMap<>(); + context.put(Constants.DRUID_QUERY_FETCH, false); + builder.context(context); + return JSON_MAPPER.writeValueAsString(builder.build()); + } + /** * Simple interface for retry operations */ http://git-wip-us.apache.org/repos/asf/hive/blob/c6565b2c/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java index 15a08eb..ecb4360 100644 --- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java @@ -129,6 +129,7 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl inputParser, dimensionsAndAggregates.rhs, granularitySpec, + null, DruidStorageHandlerUtils.JSON_MAPPER ); @@ -156,7 +157,8 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl 0, true, null, - 0L + 0L, + null ); LOG.debug(String.format("running with Data schema [%s] ", dataSchema)); http://git-wip-us.apache.org/repos/asf/hive/blob/c6565b2c/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java index c097a13..c2d3fe5 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java @@ -22,9 +22,7 @@ import java.io.InputStream; import java.net.URL; import java.net.URLEncoder; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.StringUtils; @@ -53,7 +51,6 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.jboss.netty.handler.codec.http.HttpMethod; -import org.joda.time.Interval; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,8 +61,6 @@ import com.google.common.collect.Lists; import com.metamx.http.client.Request; import io.druid.query.BaseQuery; -import io.druid.query.Druids; -import 
io.druid.query.Druids.SelectQueryBuilder; import io.druid.query.LocatedSegmentDescriptor; import io.druid.query.Query; import io.druid.query.SegmentDescriptor; @@ -133,7 +128,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW throw new IOException("Druid data source cannot be empty or null"); } //@FIXME https://issues.apache.org/jira/browse/HIVE-19023 use scan instead of Select - druidQuery = createSelectStarQuery(dataSource); + druidQuery = DruidStorageHandlerUtils.createSelectStarQuery(dataSource); druidQueryType = Query.SELECT; } else { druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE); @@ -169,19 +164,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW } } - private static String createSelectStarQuery(String dataSource) throws IOException { - // Create Select query - SelectQueryBuilder builder = new Druids.SelectQueryBuilder(); - builder.dataSource(dataSource); - final List<Interval> intervals = Arrays.asList(DruidStorageHandlerUtils.DEFAULT_INTERVAL); - builder.intervals(intervals); - builder.pagingSpec(PagingSpec.newSpec(1)); - Map<String, Object> context = new HashMap<>(); - context.put(Constants.DRUID_QUERY_FETCH, false); - builder.context(context); - return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build()); - } - /* New method that distributes the Select query by creating splits containing * information about different Druid nodes that have the data for the given * query. 
*/ http://git-wip-us.apache.org/repos/asf/hive/blob/c6565b2c/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java index ea23ddd..1ec8b5c 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java @@ -24,11 +24,11 @@ import io.druid.segment.realtime.appenderator.AppenderatorConfig; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Period; +import javax.annotation.Nullable; import java.io.File; /** @@ -131,6 +131,10 @@ public class KafkaTuningConfig implements AppenderatorConfig return basePersistDirectory; } + @Nullable @Override public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() { + return null; + } + @Override @JsonProperty public int getMaxPendingPersists() http://git-wip-us.apache.org/repos/asf/hive/blob/c6565b2c/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java index c1bd332..cb8fa39 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java @@ -144,13 +144,14 @@ public class TestDruidRecordWriter { new UniformGranularitySpec( 
Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL) ), + null, objectMapper ); IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(null, null, null, temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, 0, 0, null, null, - 0L + 0L, null ); LocalFileSystem localFileSystem = FileSystem.getLocal(config); DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher( @@ -198,6 +199,7 @@ public class TestDruidRecordWriter { Firehose firehose = new IngestSegmentFirehose( ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())), + null, ImmutableList.of("host"), ImmutableList.of("visited_sum", "unique_hosts"), null @@ -228,7 +230,7 @@ public class TestDruidRecordWriter { actual.getTimestamp().getMillis() ); Assert.assertEquals(expected.get("host"), actual.getDimension("host")); - Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum")); + Assert.assertEquals(expected.get("visited_sum"), actual.getMetric("visited_sum")); Assert.assertEquals( (Double) expected.get("unique_hosts"), (Double) HyperUniquesAggregatorFactory http://git-wip-us.apache.org/repos/asf/hive/blob/c6565b2c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8fc9a75..76b11ef 100644 --- a/pom.xml +++ b/pom.xml @@ -140,7 +140,7 @@ <derby.version>10.14.1.0</derby.version> <dropwizard.version>3.1.0</dropwizard.version> <dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version> - <druid.version>0.11.0</druid.version> + <druid.version>0.12.0</druid.version> <guava.version>19.0</guava.version> <groovy.version>2.4.11</groovy.version> <h2database.version>1.3.166</h2database.version>