JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r749914212

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReader.scala
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import org.apache.druid.java.util.common.{Intervals, JodaUtils}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.clients.{DruidClient, DruidMetadataClient}
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.utils.{FilterUtils, SchemaUtils}
+import org.apache.druid.timeline.DataSegment
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition,
+  SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.joda.time.Interval
+
+import java.util.{List => JList}
+import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
+
+/**
+ * A DruidDataSourceReader handles the actual work of reading data from Druid. It does this by querying to determine
+ * where Druid segments live in deep storage and then reading those segments into memory in order to avoid straining
+ * the Druid cluster. In general, users should not directly instantiate instances of this class but instead use
+ * sparkSession.read.format("druid").options(Map(...)).load(). If the schema of the data in Druid is known, overhead
+ * can be further reduced by providing it directly (e.g. sparkSession.read.format("druid").schema(schema).options...)
+ *
+ * To aid comprehensibility, some idiomatic Scala has been somewhat java-fied.
+ */
+class DruidDataSourceReader(
+    var schema: Option[StructType] = None,
+    conf: Configuration
+  ) extends DataSourceReader
+  with SupportsPushDownRequiredColumns with SupportsPushDownFilters with SupportsScanColumnarBatch with Logging {
+  private lazy val metadataClient =
+    DruidDataSourceReader.createDruidMetaDataClient(conf)
+  private lazy val druidClient = DruidDataSourceReader.createDruidClient(conf)
+
+  private var filters: Array[Filter] = Array.empty
+  private var druidColumnTypes: Option[Set[String]] = Option.empty
+
+  override def readSchema(): StructType = {
+    if (schema.isDefined) {
+      schema.get
+    } else {
+      require(conf.isPresent(DruidConfigurationKeys.tableKey),
+        s"Must set ${DruidConfigurationKeys.tableKey}!")
+      // TODO: Optionally accept a granularity so that if lowerBound to upperBound spans more than
+      //  twice the granularity duration, we can send a list with two disjoint intervals and
+      //  minimize the load on the broker from having to merge large numbers of segments
+      val (lowerBound, upperBound) = FilterUtils.getTimeFilterBounds(filters)
+      val columnMap = druidClient.getSchema(
+        conf.getString(DruidConfigurationKeys.tableKey),
+        Some(List[Interval](Intervals.utc(
+          lowerBound.getOrElse(JodaUtils.MIN_INSTANT),
+          upperBound.getOrElse(JodaUtils.MAX_INSTANT)
+        )))
+      )
+      schema = Option(SchemaUtils.convertDruidSchemaToSparkSchema(columnMap))
+      druidColumnTypes = Option(columnMap.map(_._2._1).toSet)
+      schema.get
+    }
+  }
+
+  override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
+    // For now, one partition for each Druid segment partition
+    // Future improvements can use information from SegmentAnalyzer results to do smart things
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get)
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+
+    // Allow passing hard-coded list of segments to load
+    if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+      val segments: JList[DataSegment] = MAPPER.readValue(
+        readerConf.getString(DruidConfigurationKeys.segmentsKey),
+        new TypeReference[JList[DataSegment]]() {}
+      )
+      segments.asScala
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    } else {
+      getSegments
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    }
+  }
+
+  override def pruneColumns(structType: StructType): Unit = {
+    schema = Option(structType)
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    readSchema()
+    filters.partition(FilterUtils.isSupportedFilter(_, schema.get)) match {
+      case (supported, unsupported) =>
+        this.filters = supported
+        unsupported
+    }
+  }
+
+  override def pushedFilters(): Array[Filter] = filters
+
+  private[v2] def getSegments: Seq[DataSegment] = {
+    require(conf.isPresent(DruidConfigurationKeys.tableKey),
+      s"Must set ${DruidConfigurationKeys.tableKey}!")
+
+    // Check filters for any bounds on __time
+    // Otherwise, we'd need to full scan the segments table
+    val (lowerTimeBound, upperTimeBound) = FilterUtils.getTimeFilterBounds(filters)
+
+    metadataClient.getSegmentPayloads(
+      conf.getString(DruidConfigurationKeys.tableKey),
+      lowerTimeBound,
+      upperTimeBound,
+      conf.getBoolean(DruidConfigurationKeys.allowIncompletePartitionsDefaultKey)
+    )
+  }
+
+  override def planBatchInputPartitions(): JList[InputPartition[ColumnarBatch]] = {
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get)
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+    val batchSize = readerConf.getInt(DruidConfigurationKeys.batchSizeDefaultKey)
+
+    // Allow passing hard-coded list of segments to load
+    if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+      val segments: JList[DataSegment] = MAPPER.readValue(
+        readerConf.getString(DruidConfigurationKeys.segmentsKey),
+        new TypeReference[JList[DataSegment]]() {}
+      )
+      segments.asScala
+        .map(segment =>
+          new DruidColumnarInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling,
+            batchSize
+          ): InputPartition[ColumnarBatch]
+        ).asJava
+    } else {
+      getSegments
+        .map(segment =>
+          new DruidColumnarInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling,
+            batchSize
+          ): InputPartition[ColumnarBatch]
+        ).asJava
+    }
+  }
+
+  override def enableBatchRead(): Boolean = {
+    // Fail fast
+    if (!conf.dive(DruidConfigurationKeys.readerPrefix).getBoolean(DruidConfigurationKeys.vectorizeDefaultKey)) {
+      false
+    } else {
+      if (schema.isEmpty) {
+        readSchema()
+      }
+      val filterOpt = FilterUtils.mapFilters(filters, schema.get)
+      filterOpt.fold(true) { filter =>
+        val rowSignature = SchemaUtils.generateRowSignatureFromSparkSchema(schema.get)
+        val canVectorize = filter.toOptimizedFilter.canVectorizeMatcher(rowSignature)

Review comment:
       We are looking at the schema to decide the value of `canVectorize` (we generate the Druid `RowSignature` and pass it to the `canVectorizeMatcher` call). Are you saying we should be more conservative and disable vectorization in additional cases?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
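[Editor's note] For readers following the thread, here is a minimal usage sketch of the call shape described in the `DruidDataSourceReader` scaladoc above. The `read.format(...).schema(...).options(...).load()` calls are standard Spark `DataFrameReader` API; the option keys (`"table"`, `"broker.host"`, `"broker.port"`), the datasource name, and the column names are placeholders assumed for illustration only — the connector's real keys live in `DruidConfigurationKeys` and are not shown in this hunk. It also assumes the connector jar is on the classpath and registers the `"druid"` short name, as the scaladoc implies.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType, TimestampType}

// Local master only so the sketch runs standalone.
val spark = SparkSession.builder().master("local[*]").appName("druid-read-sketch").getOrCreate()

// Hypothetical option keys and datasource name, used only to illustrate the call shape.
val druidOptions = Map(
  "table" -> "wikipedia",       // assumed key naming the Druid datasource to read
  "broker.host" -> "localhost", // assumed key for the broker used for schema discovery
  "broker.port" -> "8082"
)

// Letting the reader discover the schema (exercises readSchema() above).
val inferred = spark.read.format("druid").options(druidOptions).load()

// Supplying the schema up front skips the broker round trip, as the scaladoc suggests.
val knownSchema = StructType(Seq(
  StructField("__time", TimestampType),
  StructField("channel", StringType),
  StructField("added", LongType)
))
val explicit = spark.read.format("druid").schema(knownSchema).options(druidOptions).load()
```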

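[Editor's note] As a small aside on the DataSource V2 pushdown contract exercised by `pushFilters` in the diff: the reader keeps the filters it can translate and returns the rest, so Spark re-applies those after the scan. The sketch below shows that partition-and-return pattern using only Spark's public `Filter` classes; `isSupportedFilter` is a toy stand-in for `FilterUtils.isSupportedFilter`, and the column names are made up for illustration.

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, StringContains}

// Toy stand-in for FilterUtils.isSupportedFilter: pretend only equality and
// greater-than filters can be translated into Druid filters.
def isSupportedFilter(filter: Filter): Boolean = filter match {
  case _: EqualTo | _: GreaterThan => true
  case _ => false
}

val candidates: Array[Filter] = Array(
  EqualTo("channel", "#en.wikipedia"),
  GreaterThan("added", 100),
  StringContains("comment", "bot")
)

// Mirrors pushFilters: stash the supported filters on the reader and hand the
// unsupported ones back to Spark for post-scan evaluation.
val (supported, unsupported) = candidates.partition(isSupportedFilter)
// `supported` plays the role of this.filters; `unsupported` is what pushFilters() returns.
```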