JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r738208127



##########
File path: docs/operations/spark.md
##########
@@ -0,0 +1,279 @@
+---
+id: spark
+title: "Apache Spark Reader and Writer"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+# Apache Spark Reader and Writer for Druid
+
+## Reader
+The reader reads Druid segments from deep storage into Spark. It locates the segments to read and, if no schema is
+provided, determines their schema by querying the brokers for the relevant metadata. Otherwise, it does not interact
+with a running Druid cluster.
+
+Sample Code:
+```scala
+import org.apache.druid.spark.DruidDataFrameReader
+
+val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
+
+sparkSession
+  .read
+  .brokerHost("localhost")
+  .brokerPort(8082)
+  .metadataDbType("mysql")
+  .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
+  .metadataUser("druid")
+  .metadataPassword("diurd")
+  .dataSource("dataSource")
+  .deepStorage(deepStorageConfig)
+  .druid()
+```
+
+Alternatively, the reader can be configured via a properties map with no additional import needed:
+```scala
+val properties = Map[String, String](
+  "metadata.dbType" -> "mysql",
+  "metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
+  "metadata.user" -> "druid",
+  "metadata.password" -> "diurd",
+  "broker.host" -> "localhost",
+  "broker.port" -> "8082",
+  "table" -> "dataSource",
+  "reader.deepStorageType" -> "local",
+  "local.storageDirectory" -> "/mnt/druid/druid-segments/"
+)
+
+sparkSession
+  .read
+  .format("druid")
+  .options(properties)
+  .load()
+```
+
+If you know the schema of the Druid data source you're reading from, you can avoid the broker calls otherwise needed
+to determine the schema by providing it directly:
+```scala
+sparkSession
+  .read
+  .format("druid")
+  .schema(schema)
+  .options(properties)
+  .load()
+```
+
+Filters should be applied to the read-in data frame before any [Spark actions](http://spark.apache.org/docs/2.4.7/api/scala/index.html#org.apache.spark.sql.Dataset)
+are triggered, to allow predicates to be pushed down to the reader and avoid full scans of the underlying Druid data.
+
+## Plugin Registries and Druid Extension Support
+One of Druid's strengths is its extensibility. Since these Spark readers and writers will not execute on a Druid cluster
+and won't have the ability to dynamically load classes or integrate with Druid's Guice injectors, Druid extensions can't
+be used directly. Instead, these connectors use a plugin registry architecture, including default plugins that support
+most functionality in `extensions-core`. Custom plugins consisting of a string name and one or more serializable
+generator functions must be registered before the first Spark action which would depend on them is called.
+
+### ComplexMetricRegistry
+The `ComplexMetricRegistry` provides support for serializing and deserializing complex metric types between Spark and
+Druid. Support for complex metric types in Druid core extensions is provided out of the box.
+
+Users wishing to override the default behavior or who need to add support for additional complex metric types can
+use the `ComplexMetricRegistry.register` functions to associate serde functions with a given complex metric type. The
+name used to register custom behavior must match the complex metric type name reported by Druid.
+**Note that custom plugins must be registered with both the executors and the Spark driver.**
+
+### SegmentReaderRegistry
+The `SegmentReaderRegistry` provides support for reading segments from deep storage. Local, HDFS, GCS, S3, and Azure
+Storage deep storage implementations are supported by default.
+
+Users wishing to override the default behavior or who need to add support for additional deep storage implementations
+can use either `SegmentReaderRegistry.registerInitializer` (to provide any necessary Jackson configuration for
+deserializing a `LoadSpec` object from a segment load spec) or `SegmentReaderRegistry.registerLoadFunction` (to register
+a function for creating a URI from a segment load spec). These two functions correspond to the first and second approach
+[outlined below](#deep-storage). **Note that custom plugins must be registered on the executors, not the Spark driver.**
+
+### SQLConnectorRegistry
+The `SQLConnectorRegistry` provides support for configuring connectors to Druid metadata databases. Support for MySQL,
+PostgreSQL, and Derby databases is provided out of the box.
+
+Users wishing to override the default behavior or who need to add support for additional metadata database
+implementations can use the `SQLConnectorRegistry.register` function. Custom connectors should be registered on the
+driver.
+
+## Deploying to a Spark cluster
+This extension can be run on a Spark cluster in one of two ways: bundled as part of an application jar or uploaded as
+a library jar to a Spark cluster and included in the classpath provided to Spark applications by the application
+manager. If the second approach is used, this extension should be built with
+`mvn clean package -pl spark` and the resulting jar `druid-spark-<VERSION>.jar`
+uploaded to the Spark cluster. Application jars should then be built with a compile-time dependency on
+`org.apache.druid:druid-spark` (e.g. marked as `provided` in Maven or with `compileOnly` in Gradle).
+
+## Configuration Reference
+
+### Metadata Client Configs
+The properties used to configure the client that interacts with the Druid metadata server directly. Used by both the reader
+and the writer. The `metadataPassword` property can either be provided as a string that will be used as-is or can be
+provided as a serialized DynamicConfigProvider that will be resolved when the metadata client is first instantiated. If
+a custom DynamicConfigProvider is used, be sure to register the provider with the DynamicConfigProviderRegistry before use.
+
+|Key|Description|Required|Default|
+|---|-----------|--------|-------|
+|`metadata.dbType`|The metadata server's database type (e.g. `mysql`)|Yes||
+|`metadata.host`|The metadata server's host name|If using derby|`localhost`|
+|`metadata.port`|The metadata server's port|If using derby|1527|
+|`metadata.connectUri`|The URI to use to connect to the metadata server|If not using derby||
+|`metadata.user`|The user to use when connecting to the metadata server|If required by the metadata database||
+|`metadata.password`|The password to use when connecting to the metadata server. This can optionally be a serialized instance of a Druid DynamicConfigProvider or a plain string|If required by the metadata database||
+|`metadata.dbcpProperties`|The connection pooling properties to use when connecting to the metadata server|No||
+|`metadata.baseName`|The base name used when creating Druid metadata tables|No|`druid`|
+
+### Druid Client Configs
+The configuration properties used to query the Druid cluster for segment metadata. Only used in the reader.
+
+|Key|Description|Required|Default|
+|---|-----------|--------|-------|
+|`broker.host`|The hostname of a broker in the Druid cluster to read from|No|`localhost`|
+|`broker.port`|The port of the broker in the Druid cluster to read from|No|8082|
+|`broker.numRetries`|The number of times to retry a timed-out segment metadata request|No|5|
+|`broker.retryWaitSeconds`|How long (in seconds) to wait before retrying a timed-out segment metadata request|No|5|
+|`broker.timeoutMilliseconds`|How long (in milliseconds) to wait before timing out a segment metadata request|No|300000|
+
+### Reader Configs
+The properties used to configure the DataSourceReader when reading data from Druid in Spark.
+
+|Key|Description|Required|Default|
+|---|-----------|--------|-------|
+|`table`|The Druid data source to read from|Yes||
+|`reader.deepStorageType`|The type of deep storage used to back the target Druid cluster|No|`local`|
+|`reader.segments`|A hard-coded list of Druid segments to read. If set, the table and druid client configurations are ignored and the specified segments are read directly. Must be deserializable into Druid DataSegment instances|No||
+|`reader.useCompactSketches`|Controls whether or not compact representations of complex metrics are used (only for metrics that support compact forms)|No|False|

Review comment:
       Are there complex metrics that have compact forms and _aren't_ sketches? Either way, at the moment this property is only applied to sketches (see `ComplexMetricRegistry`).
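
For anyone following along, a minimal sketch of how this flag would be passed through the properties map, using only keys from the configuration reference quoted above. The data source name is a placeholder, and this only shows how the option is supplied, not how the reader interprets its value.

```scala
// Reader options built from keys in the configuration reference above.
// "dataSource" is a placeholder name, not a real Druid data source.
val properties: Map[String, String] = Map(
  "table" -> "dataSource",
  "reader.deepStorageType" -> "local",
  // Spark data source options are strings; per the comment above, this
  // flag currently only affects sketch-typed complex metrics.
  "reader.useCompactSketches" -> "true"
)
```

The map would then be handed to `sparkSession.read.format("druid").options(properties).load()` as in the documented examples.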

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReader.scala
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import org.apache.druid.java.util.common.{DateTimes, Intervals, JodaUtils}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.clients.{DruidClient, DruidMetadataClient}
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.utils.{FilterUtils, SchemaUtils}
+import org.apache.druid.timeline.DataSegment
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition,
+  SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.joda.time.Interval
+
+import java.util.{List => JList}
+import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
+
+/**
+  * A DruidDataSourceReader handles the actual work of reading data from Druid. It does this by querying to determine
+  * where Druid segments live in deep storage and then reading those segments into memory in order to avoid straining
+  * the Druid cluster. In general, users should not directly instantiate instances of this class but instead use
+  * sparkSession.read.format("druid").options(Map(...)).load(). If the schema of the data in Druid is known, overhead
+  * can be further reduced by providing it directly (e.g. sparkSession.read.format("druid").schema(schema).options...)
+  *
+  * To aid comprehensibility, some idiomatic Scala has been somewhat java-fied.
+  *
+  * @param schema
+  * @param conf
+  */
+class DruidDataSourceReader(
+                             var schema: Option[StructType] = None,
+                             conf: Configuration
+                           ) extends DataSourceReader
+  with SupportsPushDownRequiredColumns with SupportsPushDownFilters with SupportsScanColumnarBatch with Logging {
+  private lazy val metadataClient =
+    DruidDataSourceReader.createDruidMetaDataClient(conf)
+  private lazy val druidClient = DruidDataSourceReader.createDruidClient(conf)
+
+  private var filters: Array[Filter] = Array.empty
+  private var druidColumnTypes: Option[Set[String]] = Option.empty
+
+  override def readSchema(): StructType = {
+    if (schema.isDefined) {
+      schema.get
+    } else {
+      require(conf.isPresent(DruidConfigurationKeys.tableKey),
+        s"Must set ${DruidConfigurationKeys.tableKey}!")
+      // TODO: Optionally accept a granularity so that if lowerBound to upperBound spans more than
+      //  twice the granularity duration, we can send a list with two disjoint intervals and
+      //  minimize the load on the broker from having to merge large numbers of segments
+      val (lowerBound, upperBound) = FilterUtils.getTimeFilterBounds(filters)
+      val columnMap = druidClient.getSchema(
+        conf.getString(DruidConfigurationKeys.tableKey),
+        Some(List[Interval](Intervals.utc(
+          lowerBound.getOrElse(JodaUtils.MIN_INSTANT),
+          upperBound.getOrElse(JodaUtils.MAX_INSTANT)
+        )))
+      )
+      schema = Option(SchemaUtils.convertDruidSchemaToSparkSchema(columnMap))
+      druidColumnTypes = Option(columnMap.map(_._2._1).toSet)
+      schema.get
+    }
+  }
+
+  override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
+    // For now, one partition for each Druid segment partition
+    // Future improvements can use information from SegmentAnalyzer results to do smart things
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get)
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+
+    // Allow passing hard-coded list of segments to load
+    if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+      val segments: JList[DataSegment] = MAPPER.readValue(
+        readerConf.getString(DruidConfigurationKeys.segmentsKey),
+        new TypeReference[JList[DataSegment]]() {}
+      )
+      segments.asScala
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    } else {
+      getSegments
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    }
+  }
+
+  override def pruneColumns(structType: StructType): Unit = {
+    schema = Option(structType)
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    readSchema()
+    filters.partition(FilterUtils.isSupportedFilter(_, schema.get)) match {
+      case (supported, unsupported) =>
+        this.filters = supported
+        unsupported
+    }
+  }
+
+  override def pushedFilters(): Array[Filter] = filters
+
+  private[v2] def getSegments: Seq[DataSegment] = {
+    require(conf.isPresent(DruidConfigurationKeys.tableKey),
+      s"Must set ${DruidConfigurationKeys.tableKey}!")
+
+    // Check filters for any bounds on __time
+    // Otherwise, we'd need to full scan the segments table
+    val (lowerTimeBound, upperTimeBound) = FilterUtils.getTimeFilterBounds(filters)
+
+    metadataClient.getSegmentPayloads(conf.getString(DruidConfigurationKeys.tableKey),

Review comment:
       Ah, I dropped the overshadow checking logic while chunking up PRs. Thanks for catching this. I've moved the logic to the metadata client because I think it makes more sense there now. I also addressed a TODO and added support for querying incomplete partitions if desired.
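
To illustrate what "overshadow checking" means here, a minimal sketch under simplifying assumptions: `Seg` is a hypothetical stand-in for Druid's `DataSegment`, intervals are plain epoch-millisecond bounds, and a segment counts as overshadowed only when a single newer-versioned segment fully covers its interval. The logic actually moved into the metadata client may differ (for example in how it treats partial overlaps and partition sets).

```scala
// Hypothetical, simplified stand-in for DataSegment: only the fields needed here.
// The interval is [start, end) in epoch milliseconds.
final case class Seg(id: String, start: Long, end: Long, version: String)

// A segment is treated as overshadowed when some other segment with a strictly
// greater version fully covers its interval. Versions are compared as strings.
def isOvershadowed(s: Seg, all: Seq[Seg]): Boolean =
  all.exists(o => o.version > s.version && o.start <= s.start && o.end >= s.end)

// Keep only the segments that are not overshadowed by any other segment.
def visibleSegments(all: Seq[Seg]): Seq[Seg] =
  all.filterNot(isOvershadowed(_, all))

val segments = Seq(
  Seg("day1-v1", 0L, 100L, "2021-01-01T00:00:00.000Z"),
  Seg("day1-v2", 0L, 100L, "2021-02-01T00:00:00.000Z"), // same interval, newer version
  Seg("day2-v1", 100L, 200L, "2021-01-01T00:00:00.000Z")
)

val visible = visibleSegments(segments).map(_.id)
```

Without a check along these lines, the reader would load both `day1-v1` and `day1-v2` and return duplicate rows for that interval.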

##########
File path: docs/operations/spark.md
##########
@@ -0,0 +1,279 @@
+## Configuration Reference
+
+### Metadata Client Configs
+The properties used to configure the client that interacts with the Druid metadata server directly. Used by both the reader
+and the writer. The `metadataPassword` property can either be provided as a string that will be used as-is or can be

Review comment:
       Ah nice catch, thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


