JulianJaffePinterest opened a new pull request, #12159:
URL: https://github.com/apache/druid/pull/12159

   Add code, utilities, tests, and documentation for writing data to Druid 
using Spark.
   
   This is the final piecemeal PR recapitulating #10920 (see #11474 and #11823 
for the previous two).
   
   ### Description
   
   This connector writes Druid segment files to deep storage, one per input 
Spark partition. The connector also writes the segment descriptors to the 
Druid SQL metadata store, at which point the normal Druid coordination process 
handles loading, atomic updating, and overshadowing.
   
   #### Writing Segments
   
   Each Spark partition is assigned to an executor. For each partition, one or 
more `IncrementalIndex`es are created and populated with the rows of the source 
Spark partition that fall within each index's interval. The indices are 
persisted to local storage while being built and then pushed to deep storage. 
Internally, this writer uses a two-phase commit model to push segments: if all 
Spark partitions are successfully written to Druid segment files on deep 
storage, the segment metadata is published and the Druid segment coordination 
process takes over. However, if one or more partitions fail, the write is 
aborted, the segments already written to deep storage are deleted, and control 
is passed back to Spark for error handling.
   
   ##### Key Classes
   
   `DruidDataSourceWriter`
   `DruidWriterFactory`
   `DruidDataWriter`
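
   The commit-or-abort decision described above can be sketched, independently of Spark's actual writer interfaces, as a fold over per-partition results. All names below (`SegmentMetadata`, `writePartitions`) are illustrative stand-ins, not the connector's real classes:

   ```scala
   // Hypothetical sketch of the two-phase commit model: publish all segment
   // descriptors only if every partition wrote successfully; otherwise abort.
   case class SegmentMetadata(partition: Int, path: String)

   def writePartitions(
     partitions: Seq[Int],
     writeOne: Int => Either[String, SegmentMetadata]
   ): Either[String, Seq[SegmentMetadata]] = {
     val results = partitions.map(writeOne)
     val failures = results.collect { case Left(err) => err }
     if (failures.nonEmpty) {
       // Abort phase: in the real writer, segments already pushed to deep
       // storage would be deleted here before the error reaches Spark.
       Left(failures.mkString("; "))
     } else {
       // Commit phase: publish all descriptors to the metadata store at once.
       Right(results.collect { case Right(meta) => meta })
     }
   }
   ```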
   
   #### Partitioning
   
   Spark's architecture means that the source dataframe is already partitioned 
by the time this writer is called. By default, this writer will attempt to 
"rationalize" input segments such that all covered time intervals are 
contiguous and complete. This means that Druid can atomically load and 
overshadow the produced segments but doesn't prevent sub-optimal partitioning 
from passing through from Spark to Druid. To address this issue, callers can 
partition their dataframes themselves and pass the writer a partition map 
describing the partitioning scheme. The writer provides four partitioners out 
of the box that callers can use as well (see the 
[docs](docs/operations/spark.md#provided-partitioners) for more information).
   
   ##### Key Classes
   
   `SegmentRationalizer`
   `org.apache.druid.spark.partitioners.*`
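
   The contiguity-and-completeness property the rationalizer enforces can be illustrated with a small check over time intervals. This is a hedged sketch under an assumed representation (millisecond `[start, end)` pairs); the function name is invented here, not part of the connector's API:

   ```scala
   // A set of intervals is contiguous and complete iff, once sorted, each
   // interval ends exactly where the next one begins (no gaps, no overlaps).
   def isContiguousAndComplete(intervals: Seq[(Long, Long)]): Boolean = {
     val sorted = intervals.sortBy(_._1)
     sorted.zip(sorted.drop(1)).forall { case ((_, end), (nextStart, _)) =>
       end == nextStart
     }
   }
   ```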
   
   #### User Interface
   
   Users call this writer like any other Spark writer: they call `.write` on 
a DataFrame, configure the writer as desired, and then call `.save()`. As with 
the reader introduced in #11823, a more ergonomic, type-safe wrapper is also 
available. As always, the documentation contains a complete configuration 
reference with further examples.
   
   Example configuration:
   
   ```scala
   val metadataProperties = Map[String, String](
     "metadata.dbType" -> "mysql",
     "metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
     "metadata.user" -> "druid",
     "metadata.password" -> "diurd"
   )
   
   val writerConfigs = Map[String, String](
     "table" -> "dataSource",
     "writer.version" -> "1",
     "writer.deepStorageType" -> "local",
     "writer.storageDirectory" -> "/mnt/druid/druid-segments/"
   )
   
   df
     .write
     .format("druid")
     .mode(SaveMode.Overwrite)
     .options(writerConfigs ++ metadataProperties)
     .save()
   ```
   
   or using the convenience wrapper:
   
   ```scala
   import org.apache.druid.spark.DruidDataFrameWriter
   
   val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
   
   df
     .write
     .metadataDbType("mysql")
     .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
     .metadataUser("druid")
     .metadataPassword("diurd")
     .version(1)
     .deepStorage(deepStorageConfig)
     .mode(SaveMode.Overwrite)
     .dataSource("dataSource")
     .druid()
   ```
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
   - [x] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked 
related entities via Javadoc links.
   - [x] added or updated version, license, or notice information in 
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code 
wherever it would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, 
ensuring the threshold for [code 
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
 is met.
   - [x] added integration tests.
   - [x] been tested in a test Druid cluster.
   

