This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 6a110e5e6 [GLUTEN-5414] [VL] Support datasource v2 scan csv (#5717)
6a110e5e6 is described below
commit 6a110e5e60d5f195293119f42a58c9f6911c987c
Author: Jin Chengcheng <[email protected]>
AuthorDate: Thu May 16 16:02:54 2024 +0800
[GLUTEN-5414] [VL] Support datasource v2 scan csv (#5717)
---
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 2 +
.../gluten/datasource/ArrowCSVFileFormat.scala | 277 +++++++++++----------
.../gluten/datasource/ArrowConvertorRule.scala | 49 +++-
.../v2/ArrowCSVPartitionReaderFactory.scala | 144 +++++++++++
.../apache/gluten/datasource/v2/ArrowCSVScan.scala | 76 ++++++
.../v2/ArrowCSVScanBuilder.scala} | 34 ++-
.../gluten/datasource/v2/ArrowCSVTable.scala | 68 +++++
.../datasource/v2/ArrowBatchScanExec.scala | 48 ++++
.../gluten/extension/ArrowScanReplaceRule.scala | 7 +-
.../org/apache/gluten/execution/TestOperator.scala | 22 +-
.../gluten/columnarbatch/ColumnarBatches.java | 4 +-
.../scala/org/apache/gluten/utils/ArrowUtil.scala | 35 +--
.../gluten/utils/velox/VeloxTestSettings.scala | 9 +-
.../gluten/utils/velox/VeloxTestSettings.scala | 9 +
.../gluten/utils/velox/VeloxTestSettings.scala | 9 +
.../gluten/utils/velox/VeloxTestSettings.scala | 9 +
.../execution/datasources/csv/GlutenCSVSuite.scala | 1 +
.../datasources/v2/BatchScanExecShim.scala | 4 +
.../datasources/v2/BatchScanExecShim.scala | 6 +
.../datasources/v2/BatchScanExecShim.scala | 8 +
.../datasources/v2/BatchScanExecShim.scala | 8 +
21 files changed, 658 insertions(+), 171 deletions(-)
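For context: with this change, a CSV read is planned through the new Arrow-backed DataSource V2 scan whenever V1 CSV sources are disabled. A minimal usage sketch (the path and column name are illustrative, mirroring the new test added to TestOperator.scala below):

    // Force CSV onto the DataSource V2 path; Gluten then plans the scan as
    // an ArrowBatchScanExec backed by the native Arrow CSV reader.
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    val df = spark.read
      .format("csv")
      .option("header", "true")
      .load("/datasource/csv/student.csv") // illustrative path
    df.filter(df("Name") === "Peter").show()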
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 33ce1ee72..f54bf9b3f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.backendsapi.SparkPlanExecApi
import org.apache.gluten.datasource.ArrowConvertorRule
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
+import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
import org.apache.gluten.expression._
import org.apache.gluten.expression.ConverterUtils.FunctionConfig
import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
@@ -869,6 +870,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
override def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = plan match {
case _: ArrowFileSourceScanExec => true
+ case _: ArrowBatchScanExec => true
case _ => false
}
}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
index c05af24ff..0f6813d8f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.exception.SchemaMismatchException
import org.apache.gluten.execution.RowToVeloxColumnarExec
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
import org.apache.gluten.utils.{ArrowUtil, Iterators}
import org.apache.gluten.vectorized.ArrowWritableColumnVector
@@ -41,6 +42,7 @@ import org.apache.spark.util.SerializableConfiguration
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
import org.apache.arrow.dataset.scanner.ScanOptions
+import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector.VectorUnloader
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.hadoop.conf.Configuration
@@ -66,55 +68,127 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
- ArrowUtil.readSchema(files, fileFormat)
+ ArrowUtil.readSchema(
+ files,
+ fileFormat,
+ ArrowBufferAllocators.contextInstance(),
+ ArrowNativeMemoryPool.arrowPool("infer schema"))
}
override def supportBatch(sparkSession: SparkSession, dataSchema: StructType): Boolean = true
- private def checkHeader(
- file: PartitionedFile,
+ override def buildReaderWithPartitionValues(
+ sparkSession: SparkSession,
dataSchema: StructType,
+ partitionSchema: StructType,
requiredSchema: StructType,
- parsedOptions: CSVOptions,
- actualFilters: Seq[Filter],
- conf: Configuration): Unit = {
- val isStartOfFile = file.start == 0
- if (!isStartOfFile) {
- return
- }
- val actualDataSchema = StructType(
- dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
- val actualRequiredSchema = StructType(
- requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
- val parser =
- new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions, actualFilters)
- val schema = if (parsedOptions.columnPruning) actualRequiredSchema else actualDataSchema
- val headerChecker = new CSVHeaderChecker(
- schema,
- parsedOptions,
- source = s"CSV file: ${file.filePath}",
- isStartOfFile)
-
- val lines = {
- val linesReader =
- new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
- Option(TaskContext.get())
- .foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))
- linesReader.map {
- line => new String(line.getBytes, 0, line.getLength, parser.options.charset)
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ val sqlConf = sparkSession.sessionState.conf
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+ val batchSize = sqlConf.columnBatchSize
+ val caseSensitive = sqlConf.caseSensitiveAnalysis
+ val columnPruning = sqlConf.csvColumnPruning &&
+ !requiredSchema.exists(_.name == sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+ val parsedOptions = new CSVOptions(
+ options,
+ columnPruning,
+ sparkSession.sessionState.conf.sessionLocalTimeZone,
+ sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+ val actualFilters =
+ filters.filterNot(_.references.contains(parsedOptions.columnNameOfCorruptRecord))
+ (file: PartitionedFile) => {
+ ArrowCSVFileFormat.checkHeader(
+ file,
+ dataSchema,
+ requiredSchema,
+ parsedOptions,
+ actualFilters,
+ broadcastedHadoopConf.value.value)
+ val factory =
+ ArrowUtil.makeArrowDiscovery(
+ URLDecoder.decode(file.filePath.toString, "UTF-8"),
+ fileFormat,
+ ArrowBufferAllocators.contextInstance(),
+ ArrowNativeMemoryPool.arrowPool("FileSystemDatasetFactory")
+ )
+ // todo predicate validation / pushdown
+ val fileFields = factory.inspect().getFields.asScala
+ // TODO: support array/map/struct types in out-of-order schema reading.
+ try {
+ val actualReadFields =
+ ArrowUtil.getRequestedField(requiredSchema, fileFields, caseSensitive)
+ ArrowCSVFileFormat
+ .readArrow(
+ ArrowBufferAllocators.contextInstance(),
+ file,
+ actualReadFields,
+ caseSensitive,
+ requiredSchema,
+ partitionSchema,
+ factory,
+ batchSize)
+ .asInstanceOf[Iterator[InternalRow]]
+ } catch {
+ case e: SchemaMismatchException =>
+ logWarning(e.getMessage)
+ val iter = ArrowCSVFileFormat.fallbackReadVanilla(
+ dataSchema,
+ requiredSchema,
+ broadcastedHadoopConf.value.value,
+ parsedOptions,
+ file,
+ actualFilters,
+ columnPruning)
+ val (schema, rows) =
+ ArrowCSVFileFormat.withPartitionValue(requiredSchema, partitionSchema, iter, file)
+ ArrowCSVFileFormat
+ .rowToColumn(schema, batchSize, rows)
+ .asInstanceOf[Iterator[InternalRow]]
+ case d: Exception => throw d
}
+
}
- CSVHeaderCheckerHelper.checkHeaderColumnNames(headerChecker, lines, parser.tokenizer)
}
- private def readArrow(
+ override def vectorTypes(
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ sqlConf: SQLConf): Option[Seq[String]] = {
+ Option(
+ Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
+ classOf[ArrowWritableColumnVector].getName
+ ))
+ }
+
+ override def shortName(): String = "arrowcsv"
+
+ override def hashCode(): Int = getClass.hashCode()
+
+ override def equals(other: Any): Boolean = other.isInstanceOf[ArrowCSVFileFormat]
+
+ override def prepareWrite(
+ sparkSession: SparkSession,
+ job: _root_.org.apache.hadoop.mapreduce.Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ throw new UnsupportedOperationException()
+ }
+}
+
+object ArrowCSVFileFormat {
+
+ def readArrow(
+ allocator: BufferAllocator,
file: PartitionedFile,
actualReadFields: Schema,
caseSensitive: Boolean,
requiredSchema: StructType,
partitionSchema: StructType,
factory: FileSystemDatasetFactory,
- batchSize: Int): Iterator[InternalRow] = {
+ batchSize: Int): Iterator[ColumnarBatch] = {
val compare = ArrowUtil.compareStringFunc(caseSensitive)
val actualReadFieldNames = actualReadFields.getFields.asScala.map(_.getName).toArray
val actualReadSchema = new StructType(
@@ -147,7 +221,9 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
override def next: ColumnarBatch = {
val root = reader.getVectorSchemaRoot
val unloader = new VectorUnloader(root)
+
val batch = ArrowUtil.loadBatch(
+ allocator,
unloader.getRecordBatch,
actualReadSchema,
requiredSchema,
@@ -166,13 +242,48 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
}
.recyclePayload(_.close())
.create()
- .asInstanceOf[Iterator[InternalRow]]
}
- private def rowToColumn(
+ def checkHeader(
+ file: PartitionedFile,
+ dataSchema: StructType,
+ requiredSchema: StructType,
+ parsedOptions: CSVOptions,
+ actualFilters: Seq[Filter],
+ conf: Configuration): Unit = {
+ val isStartOfFile = file.start == 0
+ if (!isStartOfFile) {
+ return
+ }
+ val actualDataSchema = StructType(
+ dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+ val actualRequiredSchema = StructType(
+ requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+ val parser =
+ new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions, actualFilters)
+ val schema = if (parsedOptions.columnPruning) actualRequiredSchema else actualDataSchema
+ val headerChecker = new CSVHeaderChecker(
+ schema,
+ parsedOptions,
+ source = s"CSV file: ${file.filePath}",
+ isStartOfFile)
+
+ val lines = {
+ val linesReader =
+ new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
+ Option(TaskContext.get())
+ .foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))
+ linesReader.map {
+ line => new String(line.getBytes, 0, line.getLength, parser.options.charset)
+ }
+ }
+ CSVHeaderCheckerHelper.checkHeaderColumnNames(headerChecker, lines, parser.tokenizer)
+ }
+
+ def rowToColumn(
schema: StructType,
batchSize: Int,
- it: Iterator[InternalRow]): Iterator[InternalRow] = {
+ it: Iterator[InternalRow]): Iterator[ColumnarBatch] = {
// note, these metrics are unused but just make `RowToVeloxColumnarExec` happy
val numInputRows = new SQLMetric("numInputRows")
val numOutputBatches = new SQLMetric("numOutputBatches")
@@ -187,7 +298,6 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
)
veloxBatch
.map(v => ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), v))
- .asInstanceOf[Iterator[InternalRow]]
}
private def toAttribute(field: StructField): AttributeReference =
@@ -197,7 +307,7 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
schema.map(toAttribute)
}
- private def withPartitionValue(
+ def withPartitionValue(
requiredSchema: StructType,
partitionSchema: StructType,
iter: Iterator[InternalRow],
@@ -223,7 +333,7 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
}
}
- private def fallbackReadVanilla(
+ def fallbackReadVanilla(
dataSchema: StructType,
requiredSchema: StructType,
conf: Configuration,
@@ -246,93 +356,4 @@ class ArrowCSVFileFormat extends FileFormat with DataSourceRegister with Logging
isStartOfFile)
CSVDataSource(parsedOptions).readFile(conf, file, parser, headerChecker, requiredSchema)
}
-
- override def buildReaderWithPartitionValues(
- sparkSession: SparkSession,
- dataSchema: StructType,
- partitionSchema: StructType,
- requiredSchema: StructType,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
- val sqlConf = sparkSession.sessionState.conf
- val broadcastedHadoopConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- val batchSize = sqlConf.columnBatchSize
- val caseSensitive = sqlConf.caseSensitiveAnalysis
- val columnPruning = sqlConf.csvColumnPruning &&
- !requiredSchema.exists(_.name == sparkSession.sessionState.conf.columnNameOfCorruptRecord)
- val parsedOptions = new CSVOptions(
- options,
- columnPruning,
- sparkSession.sessionState.conf.sessionLocalTimeZone,
- sparkSession.sessionState.conf.columnNameOfCorruptRecord)
- val actualFilters =
- filters.filterNot(_.references.contains(parsedOptions.columnNameOfCorruptRecord))
- (file: PartitionedFile) => {
- checkHeader(
- file,
- dataSchema,
- requiredSchema,
- parsedOptions,
- actualFilters,
- broadcastedHadoopConf.value.value)
- val factory =
- ArrowUtil.makeArrowDiscovery(URLDecoder.decode(file.filePath.toString, "UTF-8"), fileFormat)
- // todo predicate validation / pushdown
- val fileFields = factory.inspect().getFields.asScala
- // TODO: support array/map/struct types in out-of-order schema reading.
- try {
- val actualReadFields =
- ArrowUtil.getRequestedField(requiredSchema, fileFields, caseSensitive)
- readArrow(
- file,
- actualReadFields,
- caseSensitive,
- requiredSchema,
- partitionSchema,
- factory,
- batchSize)
- } catch {
- case e: SchemaMismatchException =>
- logWarning(e.getMessage)
- val iter = fallbackReadVanilla(
- dataSchema,
- requiredSchema,
- broadcastedHadoopConf.value.value,
- parsedOptions,
- file,
- actualFilters,
- columnPruning)
- val (schema, rows) = withPartitionValue(requiredSchema, partitionSchema, iter, file)
- rowToColumn(schema, batchSize, rows)
- case d: Exception => throw d
- }
-
- }
- }
-
- override def vectorTypes(
- requiredSchema: StructType,
- partitionSchema: StructType,
- sqlConf: SQLConf): Option[Seq[String]] = {
- Option(
- Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
- classOf[ArrowWritableColumnVector].getName
- ))
- }
-
- override def shortName(): String = "arrowcsv"
-
- override def hashCode(): Int = getClass.hashCode()
-
- override def equals(other: Any): Boolean = other.isInstanceOf[ArrowCSVFileFormat]
-
- override def prepareWrite(
- sparkSession: SparkSession,
- job: _root_.org.apache.hadoop.mapreduce.Job,
- options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = {
- throw new UnsupportedOperationException()
- }
}
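The restructuring above moves the read helpers into the ArrowCSVFileFormat companion object so that the V1 format and the new V2 reader factory share one try-native-then-fallback read path. A condensed sketch of that control flow (names as in the object above; setup of factory, allocator, and options omitted):

    // Try the native Arrow read of the requested fields; on a schema
    // mismatch, fall back to Spark's vanilla CSV parser and convert the
    // resulting rows back into columnar batches.
    val batches: Iterator[ColumnarBatch] =
      try {
        val fields = ArrowUtil.getRequestedField(requiredSchema, fileFields, caseSensitive)
        ArrowCSVFileFormat.readArrow(
          allocator, file, fields, caseSensitive, requiredSchema, partitionSchema, factory, batchSize)
      } catch {
        case e: SchemaMismatchException =>
          logWarning(e.getMessage)
          val rows = ArrowCSVFileFormat.fallbackReadVanilla(
            dataSchema, requiredSchema, conf, parsedOptions, file, actualFilters, columnPruning)
          val (schema, withParts) =
            ArrowCSVFileFormat.withPartitionValue(requiredSchema, partitionSchema, rows, file)
          ArrowCSVFileFormat.rowToColumn(schema, batchSize, withParts)
      }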
diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
index e29313a38..dab1ffd3b 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.datasource
import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.datasource.v2.ArrowCSVTable
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.annotation.Experimental
@@ -27,11 +28,15 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.PermissiveMode
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkSchemaUtil
import java.nio.charset.StandardCharsets
+import scala.collection.convert.ImplicitConversions.`map AsScala`
+
@Experimental
case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
@@ -39,27 +44,49 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] {
return plan
}
plan.resolveOperators {
- // Read path
case l @ LogicalRelation(
r @ HadoopFsRelation(_, _, dataSchema, _, _: CSVFileFormat, options),
_,
_,
- _) =>
- val csvOptions = new CSVOptions(
+ _) if validate(session, dataSchema, options) =>
+ l.copy(relation = r.copy(fileFormat = new ArrowCSVFileFormat())(session))
+ case d @ DataSourceV2Relation(
+ t @ CSVTable(
+ name,
+ sparkSession,
+ options,
+ paths,
+ userSpecifiedSchema,
+ fallbackFileFormat),
+ _,
+ _,
+ _,
+ _) if validate(session, t.dataSchema, options.asCaseSensitiveMap().toMap) =>
+ d.copy(table = ArrowCSVTable(
+ "arrow" + name,
+ sparkSession,
options,
- columnPruning = session.sessionState.conf.csvColumnPruning,
- session.sessionState.conf.sessionLocalTimeZone)
- if (
- checkSchema(dataSchema) &&
- checkCsvOptions(csvOptions, session.sessionState.conf.sessionLocalTimeZone)
- ) {
- l.copy(relation = r.copy(fileFormat = new ArrowCSVFileFormat())(session))
- } else l
+ paths,
+ userSpecifiedSchema,
+ fallbackFileFormat))
case r =>
r
}
}
+ private def validate(
+ session: SparkSession,
+ dataSchema: StructType,
+ options: Map[String, String]): Boolean = {
+ val csvOptions = new CSVOptions(
+ options,
+ columnPruning = session.sessionState.conf.csvColumnPruning,
+ session.sessionState.conf.sessionLocalTimeZone)
+ checkSchema(dataSchema) &&
+ checkCsvOptions(csvOptions, session.sessionState.conf.sessionLocalTimeZone) &&
+ dataSchema.nonEmpty
+ }
+
private def checkCsvOptions(csvOptions: CSVOptions, timeZone: String): Boolean = {
csvOptions.headerFlag && !csvOptions.multiLine && csvOptions.delimiter == "," &&
csvOptions.quote == '\"' &&
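The extracted validate helper keeps the existing gating (checkSchema plus checkCsvOptions: header enabled, single-line records, comma delimiter, double quote) and additionally requires a non-empty schema. An illustrative read that passes validation and is therefore rewritten to the Arrow-backed format or table:

    // These options satisfy checkCsvOptions, so the relation is rewritten to
    // ArrowCSVFileFormat (V1 path) or ArrowCSVTable (V2 path).
    spark.read
      .option("header", "true")
      .option("delimiter", ",")
      .csv("/tmp/students.csv") // hypothetical path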
diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala
new file mode 100644
index 000000000..ddc7f797f
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.datasource.v2
+
+import org.apache.gluten.datasource.ArrowCSVFileFormat
+import org.apache.gluten.exception.SchemaMismatchException
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
+import org.apache.gluten.utils.ArrowUtil
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.{SerializableConfiguration, TaskResources}
+
+import org.apache.arrow.dataset.file.FileFormat
+
+import java.net.URLDecoder
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+case class ArrowCSVPartitionReaderFactory(
+ sqlConf: SQLConf,
+ broadcastedConf: Broadcast[SerializableConfiguration],
+ dataSchema: StructType,
+ readDataSchema: StructType,
+ readPartitionSchema: StructType,
+ options: CSVOptions,
+ filters: Seq[Filter])
+ extends FilePartitionReaderFactory
+ with Logging {
+
+ private val batchSize = sqlConf.parquetVectorizedReaderBatchSize
+ private val caseSensitive: Boolean = sqlConf.caseSensitiveAnalysis
+ private val csvColumnPruning: Boolean = sqlConf.csvColumnPruning
+
+ override def supportColumnarReads(partition: InputPartition): Boolean = true
+
+ override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
+ // disable row based read
+ throw new UnsupportedOperationException
+ }
+
+ override def buildColumnarReader(
+ partitionedFile: PartitionedFile): PartitionReader[ColumnarBatch] = {
+ val actualDataSchema = StructType(
+ dataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
+ val actualReadDataSchema = StructType(
+ readDataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
+ ArrowCSVFileFormat.checkHeader(
+ partitionedFile,
+ actualDataSchema,
+ actualReadDataSchema,
+ options,
+ filters,
+ broadcastedConf.value.value)
+ val (allocator, pool) = if (!TaskResources.inSparkTask()) {
+ TaskResources.runUnsafe(
+ (
+ ArrowBufferAllocators.contextInstance(),
+ ArrowNativeMemoryPool.arrowPool("FileSystemFactory"))
+ )
+ } else {
+ (
+ ArrowBufferAllocators.contextInstance(),
+ ArrowNativeMemoryPool.arrowPool("FileSystemFactory"))
+ }
+ val factory = ArrowUtil.makeArrowDiscovery(
+ URLDecoder.decode(partitionedFile.filePath.toString(), "UTF-8"),
+ FileFormat.CSV,
+ allocator,
+ pool)
+ val parquetFileFields = factory.inspect().getFields.asScala
+ // TODO: support array/map/struct types in out-of-order schema reading.
+ val iter =
+ try {
+ val actualReadFields =
+ ArrowUtil.getRequestedField(readDataSchema, parquetFileFields, caseSensitive)
+ ArrowCSVFileFormat.readArrow(
+ allocator,
+ partitionedFile,
+ actualReadFields,
+ caseSensitive,
+ readDataSchema,
+ readPartitionSchema,
+ factory,
+ batchSize)
+ } catch {
+ case e: SchemaMismatchException =>
+ logWarning(e.getMessage)
+ val iter = ArrowCSVFileFormat.fallbackReadVanilla(
+ dataSchema,
+ readDataSchema,
+ broadcastedConf.value.value,
+ options,
+ partitionedFile,
+ filters,
+ csvColumnPruning)
+ val (schema, rows) = ArrowCSVFileFormat.withPartitionValue(
+ readDataSchema,
+ readPartitionSchema,
+ iter,
+ partitionedFile)
+ ArrowCSVFileFormat.rowToColumn(schema, batchSize, rows)
+ case d: Exception => throw d
+ }
+
+ new PartitionReader[ColumnarBatch] {
+
+ override def next(): Boolean = {
+ iter.hasNext
+ }
+
+ override def get(): ColumnarBatch = {
+ iter.next()
+ }
+
+ override def close(): Unit = {}
+ }
+ }
+
+}
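buildColumnarReader ends by adapting the batch iterator to Spark's PartitionReader contract. The adapter shape in isolation, as a sketch independent of the CSV specifics (asPartitionReader is a hypothetical name, not part of the patch):

    // Wrap an Iterator[ColumnarBatch] in the DataSource V2 reader contract.
    def asPartitionReader(iter: Iterator[ColumnarBatch]): PartitionReader[ColumnarBatch] =
      new PartitionReader[ColumnarBatch] {
        override def next(): Boolean = iter.hasNext     // availability check
        override def get(): ColumnarBatch = iter.next() // current batch
        override def close(): Unit = {}                 // payloads recycled upstream
      }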
diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScan.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScan.scala
new file mode 100644
index 000000000..ce3f84770
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScan.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.datasource.v2
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connector.read.PartitionReaderFactory
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.v2.FileScan
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.hadoop.fs.Path
+
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
+case class ArrowCSVScan(
+ sparkSession: SparkSession,
+ fileIndex: PartitioningAwareFileIndex,
+ dataSchema: StructType,
+ readDataSchema: StructType,
+ readPartitionSchema: StructType,
+ pushedFilters: Array[Filter],
+ options: CaseInsensitiveStringMap,
+ partitionFilters: Seq[Expression] = Seq.empty,
+ dataFilters: Seq[Expression] = Seq.empty)
+ extends FileScan {
+
+ private lazy val parsedOptions: CSVOptions = new CSVOptions(
+ options.asScala.toMap,
+ columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
+ sparkSession.sessionState.conf.sessionLocalTimeZone,
+ sparkSession.sessionState.conf.columnNameOfCorruptRecord
+ )
+
+ override def isSplitable(path: Path): Boolean = {
+ false
+ }
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ val caseSensitiveMap = options.asCaseSensitiveMap().asScala.toMap
+ val hconf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+ val broadcastedConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hconf))
+ val actualFilters =
+ pushedFilters.filterNot(_.references.contains(parsedOptions.columnNameOfCorruptRecord))
+ ArrowCSVPartitionReaderFactory(
+ sparkSession.sessionState.conf,
+ broadcastedConf,
+ dataSchema,
+ readDataSchema,
+ readPartitionSchema,
+ parsedOptions,
+ actualFilters)
+ }
+
+ def withFilters(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan =
+ this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
+}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScanBuilder.scala
similarity index 51%
copy from backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
copy to backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScanBuilder.scala
index 2b7c4b1da..2b3991fe2 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScanBuilder.scala
@@ -14,21 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.extension
-
-import org.apache.gluten.datasource.ArrowCSVFileFormat
+package org.apache.gluten.datasource.v2
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{ArrowFileSourceScanExec, FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
-case class ArrowScanReplaceRule(spark: SparkSession) extends Rule[SparkPlan] {
- override def apply(plan: SparkPlan): SparkPlan = {
- plan.transformUp {
- case plan: FileSourceScanExec if plan.relation.fileFormat.isInstanceOf[ArrowCSVFileFormat] =>
- ArrowFileSourceScanExec(plan)
- case p => p
- }
+case class ArrowCSVScanBuilder(
+ sparkSession: SparkSession,
+ fileIndex: PartitioningAwareFileIndex,
+ schema: StructType,
+ dataSchema: StructType,
+ options: CaseInsensitiveStringMap)
+ extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+ override def build(): Scan = {
+ ArrowCSVScan(
+ sparkSession,
+ fileIndex,
+ dataSchema,
+ readDataSchema(),
+ readPartitionSchema(),
+ Array.empty,
+ options)
}
}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala
new file mode 100644
index 000000000..aa7f737f9
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.datasource.v2
+
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
+import org.apache.gluten.utils.ArrowUtil
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.FileTable
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.TaskResources
+
+import org.apache.hadoop.fs.FileStatus
+
+case class ArrowCSVTable(
+ name: String,
+ sparkSession: SparkSession,
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ fallbackFileFormat: Class[_ <: FileFormat])
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+ override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
+ val (allocator, pool) = if (!TaskResources.inSparkTask()) {
+ TaskResources.runUnsafe(
+ (ArrowBufferAllocators.contextInstance(), ArrowNativeMemoryPool.arrowPool("inferSchema"))
+ )
+ } else {
+ (ArrowBufferAllocators.contextInstance(), ArrowNativeMemoryPool.arrowPool("inferSchema"))
+ }
+ ArrowUtil.readSchema(
+ files.head,
+ org.apache.arrow.dataset.file.FileFormat.CSV,
+ allocator,
+ pool
+ )
+ }
+
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+ ArrowCSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+ }
+
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+ throw new UnsupportedOperationException
+ }
+
+ override def formatName: String = "arrowcsv"
+}
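inferSchema can run on the driver, outside any Spark task, where the context-scoped Arrow allocator and native pool do not yet exist; TaskResources.runUnsafe supplies a temporary scope. The acquisition pattern used here and in ArrowCSVPartitionReaderFactory, isolated into a hypothetical helper (arrowResources is not part of the patch):

    // Acquire the context Arrow allocator and native memory pool, creating a
    // temporary resource scope when not inside a Spark task.
    def arrowResources(name: String): (BufferAllocator, NativeMemoryPool) =
      if (!TaskResources.inSparkTask()) {
        TaskResources.runUnsafe(
          (ArrowBufferAllocators.contextInstance(), ArrowNativeMemoryPool.arrowPool(name)))
      } else {
        (ArrowBufferAllocators.contextInstance(), ArrowNativeMemoryPool.arrowPool(name))
      }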
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
new file mode 100644
index 000000000..3c1c53820
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.datasource.v2
+
+import org.apache.gluten.extension.GlutenPlan
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.connector.read.{Batch, PartitionReaderFactory, Scan}
+import org.apache.spark.sql.execution.datasources.v2.{ArrowBatchScanExecShim, BatchScanExec}
+
+case class ArrowBatchScanExec(original: BatchScanExec)
+ extends ArrowBatchScanExecShim(original)
+ with GlutenPlan {
+
+ @transient lazy val batch: Batch = original.batch
+
+ override lazy val readerFactory: PartitionReaderFactory = original.readerFactory
+
+ override lazy val inputRDD: RDD[InternalRow] = original.inputRDD
+
+ override def outputPartitioning: Partitioning = original.outputPartitioning
+
+ override def scan: Scan = original.scan
+
+ override def doCanonicalize(): ArrowBatchScanExec =
+ this.copy(original = original.doCanonicalize())
+
+ override def nodeName: String = "Arrow" + original.nodeName
+
+ override def output: Seq[Attribute] = original.output
+}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
index 2b7c4b1da..adfc6ca74 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
@@ -17,18 +17,23 @@
package org.apache.gluten.extension
import org.apache.gluten.datasource.ArrowCSVFileFormat
+import org.apache.gluten.datasource.v2.ArrowCSVScan
+import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ArrowFileSourceScanExec, FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
case class ArrowScanReplaceRule(spark: SparkSession) extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
plan.transformUp {
case plan: FileSourceScanExec if plan.relation.fileFormat.isInstanceOf[ArrowCSVFileFormat] =>
ArrowFileSourceScanExec(plan)
+ case plan: BatchScanExec if plan.scan.isInstanceOf[ArrowCSVScan] =>
+ ArrowBatchScanExec(plan)
+ case plan: BatchScanExec => plan
case p => p
}
-
}
}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index bccb06a13..0872ac798 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.datasource.ArrowCSVFileFormat
+import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.SparkConf
@@ -491,7 +492,6 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
runQueryAndCompare("select * from student") {
df =>
val plan = df.queryExecution.executedPlan
- print(plan)
assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
assert(plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).isDefined)
val scan = plan.find(_.isInstanceOf[ArrowFileSourceScanExec]).toList.head
@@ -538,6 +538,26 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
}
}
+ test("csv scan datasource v2") {
+ withSQLConf("spark.sql.sources.useV1SourceList" -> "") {
+ val filePath = rootPath + "/datasource/csv/student.csv"
+ val df = spark.read
+ .format("csv")
+ .option("header", "true")
+ .load(filePath)
+ df.createOrReplaceTempView("student")
+ runQueryAndCompare("select * from student") {
+ checkGlutenOperatorMatch[ArrowBatchScanExec]
+ }
+ runQueryAndCompare("select * from student where Name = 'Peter'") {
+ df =>
+ val plan = df.queryExecution.executedPlan
+ assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isEmpty)
+ assert(plan.find(s => s.isInstanceOf[ArrowBatchScanExec]).isDefined)
+ }
+ }
+ }
+
test("test OneRowRelation") {
val df = sql("SELECT 1")
checkAnswer(df, Row(1))
diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index 624428dcb..e2cfa335d 100644
--- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -19,7 +19,6 @@ package org.apache.gluten.columnarbatch;
import org.apache.gluten.exception.GlutenException;
import org.apache.gluten.exec.Runtime;
import org.apache.gluten.exec.Runtimes;
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.gluten.memory.nmm.NativeMemoryManager;
import org.apache.gluten.utils.ArrowAbiUtil;
import org.apache.gluten.utils.ArrowUtil;
@@ -221,8 +220,7 @@ public class ColumnarBatches {
final Runtime runtime = Runtimes.contextInstance();
try (ArrowArray cArray = ArrowArray.allocateNew(allocator);
ArrowSchema cSchema = ArrowSchema.allocateNew(allocator)) {
- ArrowAbiUtil.exportFromSparkColumnarBatch(
- ArrowBufferAllocators.contextInstance(), input, cSchema, cArray);
+ ArrowAbiUtil.exportFromSparkColumnarBatch(allocator, input, cSchema, cArray);
long handle =
ColumnarBatchJniWrapper.forRuntime(runtime)
.createWithArrowArray(cSchema.memoryAddress(), cArray.memoryAddress());
diff --git a/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala b/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
index 26bebcfae..99eb72c70 100644
--- a/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
+++ b/gluten-data/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
@@ -17,8 +17,6 @@
package org.apache.gluten.utils
import org.apache.gluten.exception.SchemaMismatchException
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
-import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
import org.apache.gluten.vectorized.ArrowWritableColumnVector
import org.apache.spark.internal.Logging
@@ -34,6 +32,7 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.arrow.c.{ArrowSchema, CDataDictionaryProvider, Data}
import org.apache.arrow.dataset.file.{FileFormat, FileSystemDatasetFactory}
+import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
@@ -140,19 +139,22 @@ object ArrowUtil extends Logging {
rewritten.toString
}
- def makeArrowDiscovery(encodedUri: String, format: FileFormat): FileSystemDatasetFactory = {
- val allocator = ArrowBufferAllocators.contextInstance()
- val factory = new FileSystemDatasetFactory(
- allocator,
- ArrowNativeMemoryPool.arrowPool("FileSystemDatasetFactory"),
- format,
- rewriteUri(encodedUri))
+ def makeArrowDiscovery(
+ encodedUri: String,
+ format: FileFormat,
+ allocator: BufferAllocator,
+ pool: NativeMemoryPool): FileSystemDatasetFactory = {
+ val factory = new FileSystemDatasetFactory(allocator, pool, format, rewriteUri(encodedUri))
factory
}
- def readSchema(file: FileStatus, format: FileFormat): Option[StructType] = {
+ def readSchema(
+ file: FileStatus,
+ format: FileFormat,
+ allocator: BufferAllocator,
+ pool: NativeMemoryPool): Option[StructType] = {
val factory: FileSystemDatasetFactory =
- makeArrowDiscovery(file.getPath.toString, format)
+ makeArrowDiscovery(file.getPath.toString, format, allocator, pool)
val schema = factory.inspect()
try {
Option(SparkSchemaUtil.fromArrowSchema(schema))
@@ -161,12 +163,16 @@ object ArrowUtil extends Logging {
}
}
- def readSchema(files: Seq[FileStatus], format: FileFormat): Option[StructType] = {
+ def readSchema(
+ files: Seq[FileStatus],
+ format: FileFormat,
+ allocator: BufferAllocator,
+ pool: NativeMemoryPool): Option[StructType] = {
if (files.isEmpty) {
throw new IllegalArgumentException("No input file specified")
}
- readSchema(files.head, format)
+ readSchema(files.head, format, allocator, pool)
}
def compareStringFunc(caseSensitive: Boolean): (String, String) => Boolean = {
@@ -254,6 +260,7 @@ object ArrowUtil extends Logging {
}
def loadBatch(
+ allocator: BufferAllocator,
input: ArrowRecordBatch,
dataSchema: StructType,
requiredSchema: StructType,
@@ -267,7 +274,7 @@ object ArrowUtil extends Logging {
rowCount,
SparkSchemaUtil.toArrowSchema(dataSchema),
input,
- ArrowBufferAllocators.contextInstance())
+ allocator)
} finally {
input.close()
}
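With the allocator and native memory pool now explicit parameters, callers own the resource scoping instead of the utilities reaching for task-context singletons. A usage sketch of the updated readSchema signature (arguments as used by callers in this patch):

    // Infer a Spark schema from CSV files via Arrow with caller-provided resources.
    val inferred: Option[StructType] = ArrowUtil.readSchema(
      files, // Seq[FileStatus]
      org.apache.arrow.dataset.file.FileFormat.CSV,
      ArrowBufferAllocators.contextInstance(),
      ArrowNativeMemoryPool.arrowPool("infer schema"))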
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index dbd7dc187..366796a57 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -428,8 +428,15 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-23786: warning should be printed if CSV header doesn't
conform to schema")
// file cars.csv include null string, Arrow not support to read
.exclude("DDL test with schema")
- // file cars.csv include null string, Arrow not support to read
.exclude("old csv data source name works")
+ .exclude("save csv")
+ .exclude("save csv with compression codec option")
+ .exclude("save csv with empty fields with user defined empty values")
+ .exclude("save csv with quote")
+ .exclude("SPARK-13543 Write the output as uncompressed via option()")
+ // Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown in batch
+ // Early Filter and Projection Push-Down generated an invalid plan
+ .exclude("SPARK-26208: write and read empty data to csv file with headers")
enableSuite[GlutenCSVLegacyTimeParserSuite]
.exclude("SPARK-23786: warning should be printed if CSV header doesn't
conform to schema")
// file cars.csv include null string, Arrow not support to read
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 9b469a98d..128e52a79 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -215,6 +215,15 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Gluten - test for FAILFAST parsing mode")
// file cars.csv include null string, Arrow not support to read
.exclude("old csv data source name works")
+ .exclude("DDL test with schema")
+ .exclude("save csv")
+ .exclude("save csv with compression codec option")
+ .exclude("save csv with empty fields with user defined empty values")
+ .exclude("save csv with quote")
+ .exclude("SPARK-13543 Write the output as uncompressed via option()")
+ // Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown in batch
+ // Early Filter and Projection Push-Down generated an invalid plan
+ .exclude("SPARK-26208: write and read empty data to csv file with headers")
enableSuite[GlutenCSVLegacyTimeParserSuite]
// file cars.csv include null string, Arrow not support to read
.exclude("DDL test with schema")
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 1afa203ab..6ea29847b 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -195,6 +195,15 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Gluten - test for FAILFAST parsing mode")
// file cars.csv include null string, Arrow not support to read
.exclude("old csv data source name works")
+ .exclude("DDL test with schema")
+ .exclude("save csv")
+ .exclude("save csv with compression codec option")
+ .exclude("save csv with empty fields with user defined empty values")
+ .exclude("save csv with quote")
+ .exclude("SPARK-13543 Write the output as uncompressed via option()")
+ // Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown in batch
+ // Early Filter and Projection Push-Down generated an invalid plan
+ .exclude("SPARK-26208: write and read empty data to csv file with headers")
enableSuite[GlutenCSVLegacyTimeParserSuite]
// file cars.csv include null string, Arrow not support to read
.exclude("DDL test with schema")
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 61353d99f..e6e42acb3 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -196,8 +196,17 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-27873: disabling enforceSchema should not fail
columnNameOfCorruptRecord")
enableSuite[GlutenCSVv2Suite]
.exclude("Gluten - test for FAILFAST parsing mode")
+ // Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown in batch
+ // Early Filter and Projection Push-Down generated an invalid plan
+ .exclude("SPARK-26208: write and read empty data to csv file with headers")
// file cars.csv include null string, Arrow not support to read
.exclude("old csv data source name works")
+ .exclude("DDL test with schema")
+ .exclude("save csv")
+ .exclude("save csv with compression codec option")
+ .exclude("save csv with empty fields with user defined empty values")
+ .exclude("save csv with quote")
+ .exclude("SPARK-13543 Write the output as uncompressed via option()")
enableSuite[GlutenCSVLegacyTimeParserSuite]
// file cars.csv include null string, Arrow not support to read
.exclude("DDL test with schema")
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
index 38e6c9873..cb7ce87f9 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
@@ -113,6 +113,7 @@ class GlutenCSVv2Suite extends GlutenCSVSuite {
override def sparkConf: SparkConf =
super.sparkConf
.set(SQLConf.USE_V1_SOURCE_LIST, "")
+ .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
override def testNameBlackList: Seq[String] = Seq(
// overwritten with different test
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 4db784782..e445dd33a 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -100,3 +100,7 @@ abstract class BatchScanExecShim(
)
}
}
+
+abstract class ArrowBatchScanExecShim(original: BatchScanExec) extends DataSourceV2ScanExecBase {
+ @transient override lazy val partitions: Seq[InputPartition] = original.partitions
+}
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 76556052c..06eb69a35 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -137,3 +137,9 @@ abstract class BatchScanExecShim(
Boolean.box(replicatePartitions))
}
}
+
+abstract class ArrowBatchScanExecShim(original: BatchScanExec) extends DataSourceV2ScanExecBase {
+ @transient override lazy val inputPartitions: Seq[InputPartition] = original.inputPartitions
+
+ override def keyGroupedPartitioning: Option[Seq[Expression]] = original.keyGroupedPartitioning
+}
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index ca9a7eb2d..64afc8193 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -150,3 +150,11 @@ abstract class BatchScanExecShim(
}
}
}
+
+abstract class ArrowBatchScanExecShim(original: BatchScanExec) extends DataSourceV2ScanExecBase {
+ @transient override lazy val inputPartitions: Seq[InputPartition] = original.inputPartitions
+
+ override def keyGroupedPartitioning: Option[Seq[Expression]] = original.keyGroupedPartitioning
+
+ override def ordering: Option[Seq[SortOrder]] = original.ordering
+}
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 47adf16fb..8949a46a1 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -152,3 +152,11 @@ abstract class BatchScanExecShim(
}
}
}
+
+abstract class ArrowBatchScanExecShim(original: BatchScanExec) extends DataSourceV2ScanExecBase {
+ @transient override lazy val inputPartitions: Seq[InputPartition] = original.inputPartitions
+
+ override def keyGroupedPartitioning: Option[Seq[Expression]] = original.keyGroupedPartitioning
+
+ override def ordering: Option[Seq[SortOrder]] = original.ordering
+}