JulianJaffePinterest opened a new pull request, #12159:
URL: https://github.com/apache/druid/pull/12159
Add code, utilities, tests, and documentation for writing data to Druid
using Spark.
This is the final piecemeal PR recapitulating #10920 (see #11474 and #11823
for the previous two).
### Description
This connector writes Druid segment files to deep storage, one per input
Spark partition, and publishes the corresponding segment metadata to the
Druid metadata store, at which point the normal Druid coordination process
handles loading, atomic updates, and overshadowing.
#### Writing Segments
Each Spark partition is assigned to an executor. For each partition, one or
more `IncrementalIndex`es are created and passed the rows of the source Spark
partition that fall within each index's interval. The indices are flushed to
local storage as they are built and then pushed to deep storage. Internally, this writer
uses a two-phase commit model to push segments: if all Spark partitions are
successfully written to Druid segment files on deep storage, the segment
metadata is published and the Druid segment coordination process takes over.
However, if one or more partitions fail, the write is aborted, the segments
already written to deep storage are deleted, and control is passed back to
Spark for error handling.
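The commit/abort decision above can be sketched as follows. This is an illustrative simplification, not the connector's actual code: `SegmentDescriptor`, `PartitionResult`, and `finalizeWrite` are hypothetical names standing in for the real `DruidDataSourceWriter` logic, and the publish/delete callbacks stand in for metadata publication and deep-storage cleanup.

```scala
// Hypothetical model of the two-phase commit flow described above.
final case class SegmentDescriptor(interval: String, path: String)

sealed trait PartitionResult
final case class Written(segment: SegmentDescriptor) extends PartitionResult
final case class Failed(error: String) extends PartitionResult

def finalizeWrite(
    results: Seq[PartitionResult],
    publish: Seq[SegmentDescriptor] => Unit, // write segment metadata rows
    delete: Seq[SegmentDescriptor] => Unit   // clean up deep storage
): Boolean = {
  val written = results.collect { case Written(s) => s }
  if (written.size == results.size) {
    // Phase two, success: every partition produced a segment, so publish the
    // metadata and let the Druid coordinator handle loading and overshadowing.
    publish(written)
    true
  } else {
    // Phase two, failure: delete any segments already written to deep storage
    // and hand the error back to Spark.
    delete(written)
    false
  }
}
```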
##### Key Classes
`DruidDataSourceWriter`
`DruidWriterFactory`
`DruidDataWriter`
#### Partitioning
Spark's architecture means that the source dataframe is already partitioned
by the time this writer is called. By default, this writer will attempt to
"rationalize" input segments such that all covered time intervals are
contiguous and complete. This means that Druid can atomically load and
overshadow the produced segments but doesn't prevent sub-optimal partitioning
from passing through from Spark to Druid. To address this issue, callers can
partition their dataframes themselves and pass the writer a partition map
describing the partitioning scheme. The writer provides four partitioners out
of the box that callers can use as well (see the
[docs](docs/operations/spark.md#provided-partitioners) for more information).
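The "contiguous and complete" condition above amounts to checking that the partitions' time intervals tile their covered range without gaps or overlaps. The sketch below is an illustrative simplification using millisecond-epoch tuples; it is not the `SegmentRationalizer` implementation, and the function name is hypothetical.

```scala
// Returns true iff the intervals (half-open [start, end) pairs) tile a single
// contiguous range: sorted by start, each interval must begin exactly where
// the previous one ended (a gap or an overlap both fail the check).
def isContiguousAndComplete(intervals: Seq[(Long, Long)]): Boolean =
  intervals.nonEmpty && {
    val sorted = intervals.sortBy(_._1)
    sorted.sliding(2).forall {
      case Seq((_, prevEnd), (nextStart, _)) => prevEnd == nextStart
      case _                                 => true // single interval
    }
  }
```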
##### Key Classes
`SegmentRationalizer`
`org.apache.druid.spark.partitioners.*`
#### User Interface
Users call this writer like any other Spark writer: they call `.write` on
a DataFrame, configure the writer as desired, and then call `.save()`. As with
the reader introduced in #11823, a more ergonomic, type-safe wrapper is also
available. As always, the documentation contains a complete configuration
reference with further examples.
Example configuration:
```scala
val metadataProperties = Map[String, String](
"metadata.dbType" -> "mysql",
"metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
"metadata.user" -> "druid",
"metadata.password" -> "diurd"
)
val writerConfigs = Map[String, String](
"table" -> "dataSource",
"writer.version" -> "1",
"writer.deepStorageType" -> "local",
"writer.storageDirectory" -> "/mnt/druid/druid-segments/"
)
df
.write
.format("druid")
.mode(SaveMode.Overwrite)
.options(writerConfigs ++ metadataProperties)
.save()
```
or using the convenience wrapper:
```scala
import org.apache.druid.spark.DruidDataFrameWriter
val deepStorageConfig =
  new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
df
.write
.metadataDbType("mysql")
.metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
.metadataUser("druid")
.metadataPassword("diurd")
.version(1)
.deepStorage(deepStorageConfig)
.mode(SaveMode.Overwrite)
.dataSource("dataSource")
.druid()
```
<hr>
This PR has:
- [x] been self-reviewed.
- [x] added documentation for new or modified features or behaviors.
- [x] added Javadocs for most classes and all non-trivial methods. Linked
related entities via Javadoc links.
- [x] added or updated version, license, or notice information in
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
- [x] added comments explaining the "why" and the intent of the code
wherever would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths,
ensuring the threshold for [code
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
is met.
- [x] added integration tests.
- [x] been tested in a test Druid cluster.