This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 47bd817ec [SEDONA-632] Use direct committer when writing raster files
using `df.write.format("raster")` (#1528)
47bd817ec is described below
commit 47bd817ecba31347d8dac354e26085ad12f30d58
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Fri Jul 26 00:44:50 2024 +0800
[SEDONA-632] Use direct committer when writing raster files using
`df.write.format("raster")` (#1528)
* Use direct committer when writing raster files using
df.write.format("raster")
* Make it work with PathOutputCommitProtocol, which is the commit protocol
used by S3 magic committer
---
docs/api/sql/Raster-writer.md | 3 +
.../sedona_sql/io/raster/RasterFileFormat.scala | 69 +++++++++++++++++++++-
.../sql/sedona_sql/io/raster/RasterOptions.scala | 2 +
.../scala/org/apache/sedona/sql/rasterIOTest.scala | 1 +
4 files changed, 74 insertions(+), 1 deletion(-)
diff --git a/docs/api/sql/Raster-writer.md b/docs/api/sql/Raster-writer.md
index f0314eefa..8abc74465 100644
--- a/docs/api/sql/Raster-writer.md
+++ b/docs/api/sql/Raster-writer.md
@@ -156,6 +156,9 @@ Available options:
* pathField
* No default value. If you use this option, then the column specified
in this option must exist in the DataFrame schema. If this option is not used,
each produced raster image will have a random UUID file name.
* Allowed values: any column name that indicates the paths of each
raster file
+* useDirectCommitter (Since: `v1.6.1`)
+ * Default value: `true`. If set to `true`, the output files will be
written directly to the target location. If set to `false`, the output files
will be written to a temporary location and finally be committed to their
target location. It is usually slower to write a large amount of raster files
with `useDirectCommitter` set to `false`, especially when writing to object
stores such as S3.
+ * Allowed values: `true` or `false`
The schema of the Raster dataframe to be written can be one of the following
two schemas:
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala
index d47594ea3..2d1b28f58 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala
+++
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala
@@ -18,8 +18,13 @@
*/
package org.apache.spark.sql.sedona_sql.io.raster
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.OutputCommitter
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter,
OutputWriterFactory}
@@ -49,6 +54,19 @@ private[spark] class RasterFileFormat extends FileFormat
with DataSourceRegister
if (!isValidRasterSchema(dataSchema)) {
throw new IllegalArgumentException("Invalid Raster DataFrame Schema")
}
+ if (rasterOptions.useDirectCommitter) {
+ val conf = job.getConfiguration
+
+ // For working with SQLHadoopMapReduceCommitProtocol, which is the
default output commit protocol
+ conf.set("spark.sql.sources.outputCommitterClass",
classOf[DirectOutputCommitter].getName)
+
+ // For working with
org.apache.spark.internal.io.cloud.PathOutputCommitProtocol, which is the
+ // output commit protocol used in cloud storage. For instance, the S3
magic committer uses
+ // this output commit protocol
+ conf.set(
+ "mapreduce.outputcommitter.factory.class",
+ classOf[DirectOutputCommitterFactory].getName)
+ }
new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = ""
@@ -144,3 +162,52 @@ private class RasterFileWriter(
rasterFilePath + rasterOptions.fileExtension
}
}
+
+class DirectOutputCommitterFactory extends PathOutputCommitterFactory {
+ override def createOutputCommitter(
+ outputPath: Path,
+ context: TaskAttemptContext): PathOutputCommitter =
+ new DirectPathOutputCommitter(outputPath, context)
+}
+
+trait DirectOutputCommitterTrait extends OutputCommitter {
+ override def setupJob(jobContext: JobContext): Unit = {
+ val outputPath = FileOutputFormat.getOutputPath(jobContext)
+ if (outputPath != null) {
+ val fs = outputPath.getFileSystem(jobContext.getConfiguration)
+ if (!fs.exists(outputPath)) {
+ fs.mkdirs(outputPath)
+ }
+ }
+ }
+
+ override def commitJob(jobContext: JobContext): Unit = {
+ val outputPath = FileOutputFormat.getOutputPath(jobContext)
+ if (outputPath != null) {
+ val fs = outputPath.getFileSystem(jobContext.getConfiguration)
+ // True if the job should mark the output directory with a _SUCCESS file
+ // on successful completion. Note that by default it is set to true.
+ if (jobContext.getConfiguration.getBoolean(
+ "mapreduce.fileoutputcommitter.marksuccessfuljobs",
+ true)) {
+ val markerPath = new Path(outputPath, "_SUCCESS")
+ fs.create(markerPath).close()
+ }
+ }
+ }
+
+ override def setupTask(taskContext: TaskAttemptContext): Unit = ()
+ override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean =
false
+ override def commitTask(taskContext: TaskAttemptContext): Unit = ()
+ override def abortTask(taskContext: TaskAttemptContext): Unit = ()
+}
+
+class DirectOutputCommitter extends DirectOutputCommitterTrait
+
+class DirectPathOutputCommitter(outputPath: Path, context: JobContext)
+ extends PathOutputCommitter(outputPath, context)
+ with DirectOutputCommitterTrait {
+
+ override def getOutputPath: Path = outputPath
+ override def getWorkPath: Path = outputPath
+}
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterOptions.scala
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterOptions.scala
index c86655646..4f1e501f0 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterOptions.scala
+++
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterOptions.scala
@@ -30,4 +30,6 @@ private[io] class RasterOptions(@transient private val
parameters: CaseInsensiti
val rasterPathField = parameters.get("pathField")
// Column of the raster image itself
val rasterField = parameters.get("rasterField")
+ // Use direct committer to directly write to the final destination
+ val useDirectCommitter = parameters.getOrElse("useDirectCommitter",
"true").toBoolean
}
diff --git
a/spark/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
b/spark/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
index c91e1ce6d..d256d6a31 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
@@ -51,6 +51,7 @@ class rasterIOTest extends TestBaseScala with BeforeAndAfter
with GivenWhenThen
.option("rasterField", "content")
.option("fileExtension", ".tiff")
.option("pathField", "path")
+ .option("useDirectCommitter", "false")
.mode(SaveMode.Overwrite)
.save(tempDir + "/raster-written")
rasterDf = sparkSession.read.format("binaryFile").load(tempDir +
"/raster-written/*")