This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new f538424d3 Experimental: Native CSV files read (#3044)
f538424d3 is described below
commit f538424d37f69019c7eed7032bd813d299f8d3cc
Author: Kazantsev Maksim <[email protected]>
AuthorDate: Fri Jan 23 17:44:13 2026 +0400
Experimental: Native CSV files read (#3044)
---
.github/workflows/pr_build_linux.yml | 3 +
.github/workflows/pr_build_macos.yml | 3 +
.../main/scala/org/apache/comet/CometConf.scala | 10 ++
dev/benchmarks/comet-tpch.sh | 3 +-
dev/benchmarks/spark-tpch.sh | 3 +-
dev/benchmarks/tpcbench.py | 13 +-
docs/source/user-guide/latest/configs.md | 1 +
native/core/src/execution/operators/csv_scan.rs | 86 +++++++++++++
native/core/src/execution/operators/mod.rs | 2 +
native/core/src/execution/planner.rs | 40 ++++++
.../src/execution/planner/operator_registry.rs | 2 +
native/proto/src/proto/operator.proto | 20 +++
.../org/apache/comet/rules/CometExecRule.scala | 6 +-
.../org/apache/comet/rules/CometScanRule.scala | 42 ++++++
.../comet/serde/operator/CometNativeScan.scala | 52 +-------
.../org/apache/comet/serde/operator/package.scala | 70 ++++++++++
.../spark/sql/comet/CometCsvNativeScanExec.scala | 142 +++++++++++++++++++++
.../org/apache/spark/sql/comet/operators.scala | 9 +-
spark/src/test/resources/test-data/csv-test-2.csv | 4 +
.../apache/comet/csv/CometCsvNativeReadSuite.scala | 89 +++++++++++++
20 files changed, 539 insertions(+), 61 deletions(-)
diff --git a/.github/workflows/pr_build_linux.yml
b/.github/workflows/pr_build_linux.yml
index 53ba7abc9..4a0b27761 100644
--- a/.github/workflows/pr_build_linux.yml
+++ b/.github/workflows/pr_build_linux.yml
@@ -188,6 +188,9 @@ jobs:
org.apache.spark.sql.comet.ParquetEncryptionITCase
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
+ - name: "csv"
+ value: |
+ org.apache.comet.csv.CometCsvNativeReadSuite
- name: "exec"
value: |
org.apache.comet.exec.CometAggregateSuite
diff --git a/.github/workflows/pr_build_macos.yml
b/.github/workflows/pr_build_macos.yml
index 88dc74cdb..37e623456 100644
--- a/.github/workflows/pr_build_macos.yml
+++ b/.github/workflows/pr_build_macos.yml
@@ -138,6 +138,9 @@ jobs:
org.apache.spark.sql.comet.ParquetEncryptionITCase
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
+ - name: "csv"
+ value: |
+ org.apache.comet.csv.CometCsvNativeReadSuite
- name: "exec"
value: |
org.apache.comet.exec.CometAggregateSuite
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 89dbb6468..656dbc9a5 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -150,6 +150,16 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] =
+ conf("spark.comet.scan.csv.v2.enabled")
+ .category(CATEGORY_TESTING)
+ .doc(
+ "Whether to use the native Comet V2 CSV reader for improved
performance. " +
+ "Default: false (uses standard Spark CSV reader) " +
+ "Experimental: Performance benefits are workload-dependent.")
+ .booleanConf
+ .createWithDefault(false)
+
val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
conf("spark.comet.parquet.respectFilterPushdown")
.category(CATEGORY_PARQUET)
diff --git a/dev/benchmarks/comet-tpch.sh b/dev/benchmarks/comet-tpch.sh
index 8b63f87ed..f0709b7ef 100755
--- a/dev/benchmarks/comet-tpch.sh
+++ b/dev/benchmarks/comet-tpch.sh
@@ -50,4 +50,5 @@ $SPARK_HOME/bin/spark-submit \
--data $TPCH_DATA \
--queries $TPCH_QUERIES \
--output . \
- --iterations 1
+ --iterations 1 \
+ --format parquet
diff --git a/dev/benchmarks/spark-tpch.sh b/dev/benchmarks/spark-tpch.sh
index beebbd60f..ae359f049 100755
--- a/dev/benchmarks/spark-tpch.sh
+++ b/dev/benchmarks/spark-tpch.sh
@@ -42,4 +42,5 @@ $SPARK_HOME/bin/spark-submit \
--data $TPCH_DATA \
--queries $TPCH_QUERIES \
--output . \
- --iterations 1
+ --iterations 1 \
+ --format parquet
diff --git a/dev/benchmarks/tpcbench.py b/dev/benchmarks/tpcbench.py
index 75944883d..130db7a62 100644
--- a/dev/benchmarks/tpcbench.py
+++ b/dev/benchmarks/tpcbench.py
@@ -20,6 +20,7 @@ from datetime import datetime
import json
from pyspark.sql import SparkSession
import time
+from typing import Dict
# rename same columns aliases
# a, a, b, b -> a, a_1, b, b_1
@@ -37,7 +38,7 @@ def dedup_columns(df):
new_cols.append(f"{c}_{counts[c]}")
return df.toDF(*new_cols)
-def main(benchmark: str, data_path: str, query_path: str, iterations: int,
output: str, name: str, query_num: int = None, write_path: str = None):
+def main(benchmark: str, data_path: str, query_path: str, iterations: int,
output: str, name: str, format: str, query_num: int = None, write_path: str =
None, options: Dict[str, str] = None):
# Initialize a SparkSession
spark = SparkSession.builder \
@@ -58,9 +59,9 @@ def main(benchmark: str, data_path: str, query_path: str,
iterations: int, outpu
raise "invalid benchmark"
for table in table_names:
- path = f"{data_path}/{table}.parquet"
+ path = f"{data_path}/{table}.{format}"
print(f"Registering table {table} using path {path}")
- df = spark.read.parquet(path)
+ df = spark.read.format(format).options(**options).load(path)
df.createOrReplaceTempView(table)
conf_dict = {k: v for k, v in spark.sparkContext.getConf().getAll()}
@@ -146,7 +147,7 @@ def main(benchmark: str, data_path: str, query_path: str,
iterations: int, outpu
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="DataFusion benchmark derived
from TPC-H / TPC-DS")
- parser.add_argument("--benchmark", required=True, help="Benchmark to run
(tpch or tpcds)")
+ parser.add_argument("--benchmark", required=True, default="tpch",
help="Benchmark to run (tpch or tpcds)")
parser.add_argument("--data", required=True, help="Path to data files")
parser.add_argument("--queries", required=True, help="Path to query files")
parser.add_argument("--iterations", required=False, default="1", help="How
many iterations to run")
@@ -154,7 +155,9 @@ if __name__ == "__main__":
parser.add_argument("--name", required=True, help="Prefix for result file
e.g. spark/comet/gluten")
parser.add_argument("--query", required=False, type=int, help="Specific
query number to run (1-based). If not specified, all queries will be run.")
parser.add_argument("--write", required=False, help="Path to save query
results to, in Parquet format.")
+ parser.add_argument("--format", required=True, default="parquet",
help="Input file format (parquet, csv, json)")
+ parser.add_argument("--options", type=json.loads, required=False,
default={}, help='Spark options as JSON string, e.g., \'{"header": "true",
"delimiter": ","}\'')
args = parser.parse_args()
- main(args.benchmark, args.data, args.queries, int(args.iterations),
args.output, args.name, args.query, args.write)
+ main(args.benchmark, args.data, args.queries, int(args.iterations),
args.output, args.name, args.format, args.query, args.write, args.options)
diff --git a/docs/source/user-guide/latest/configs.md
b/docs/source/user-guide/latest/configs.md
index 575c2ee80..7b32c2a4b 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -144,6 +144,7 @@ These settings can be used to determine which parts of the
plan are accelerated
| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used
for Comet native execution when running Spark in on-heap mode. Available pool
types are `greedy`, `fair_spill`, `greedy_task_shared`,
`fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and
`unbounded`. | greedy_task_shared |
| `spark.comet.memoryOverhead` | The amount of additional memory to be
allocated per executor process for Comet, in MiB, when running Spark in on-heap
mode. | 1024 MiB |
| `spark.comet.parquet.write.enabled` | Whether to enable native Parquet write
through Comet. When enabled, Comet will intercept Parquet write operations and
execute them natively. This feature is highly experimental and only partially
implemented. It should not be used in production. | false |
+| `spark.comet.scan.csv.v2.enabled` | Whether to use the native Comet V2 CSV
reader for improved performance. Default: false (uses standard Spark CSV
reader) Experimental: Performance benefits are workload-dependent. | false |
| `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow
columnar conversion. When this is turned on, Comet will convert operators in
`spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format
before processing. This is an experimental feature and has known issues with
non-UTC timezones. | false |
| `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list
of operators that will be converted to Arrow columnar format when
`spark.comet.sparkToColumnar.enabled` is true. |
Range,InMemoryTableScan,RDDScan |
| `spark.comet.testing.strict` | Experimental option to enable strict testing,
which will fail tests that could be more comprehensive, such as checking for a
specific fallback reason. It can be overridden by the environment variable
`ENABLE_COMET_STRICT_TESTING`. | false |
diff --git a/native/core/src/execution/operators/csv_scan.rs
b/native/core/src/execution/operators/csv_scan.rs
new file mode 100644
index 000000000..622386f0b
--- /dev/null
+++ b/native/core/src/execution/operators/csv_scan.rs
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::operators::ExecutionError;
+use arrow::datatypes::{Field, SchemaRef};
+use datafusion::common::DataFusionError;
+use datafusion::common::Result;
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::CsvSource;
+use datafusion_comet_proto::spark_operator::CsvOptions;
+use datafusion_datasource::file_groups::FileGroup;
+use datafusion_datasource::file_scan_config::{FileScanConfig,
FileScanConfigBuilder};
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::PartitionedFile;
+use itertools::Itertools;
+use std::sync::Arc;
+
+/// Builds a DataFusion `DataSourceExec` that reads the given CSV files.
+///
+/// * `object_store_url` - URL of the object store that holds the files.
+/// * `file_groups` - one inner `Vec` per file group; each is wrapped in a `FileGroup`.
+/// * `data_schema` - schema handed to `FileScanConfigBuilder` as the file schema.
+/// * `partition_schema` - fields registered as table partition columns.
+/// * `projection_vector` - indices passed to `with_projection_indices`.
+/// * `csv_options` - parser options received from the Spark side.
+///
+/// Returns an `ExecutionError` (instead of panicking) when one of the
+/// single-character options in `csv_options` is empty or does not start with
+/// an ASCII byte.
+pub fn init_csv_datasource_exec(
+    object_store_url: ObjectStoreUrl,
+    file_groups: Vec<Vec<PartitionedFile>>,
+    data_schema: SchemaRef,
+    partition_schema: SchemaRef,
+    projection_vector: Vec<usize>,
+    csv_options: &CsvOptions,
+) -> Result<Arc<DataSourceExec>, ExecutionError> {
+    // Surface invalid options as a regular error so a bad option cannot abort the process.
+    let csv_source = build_csv_source(csv_options)
+        .map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
+
+    // `file_groups` is owned, so the file lists are moved rather than cloned.
+    let file_groups = file_groups.into_iter().map(FileGroup::new).collect();
+
+    let partition_fields = partition_schema
+        .fields()
+        .iter()
+        .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
+        .collect_vec();
+
+    let file_scan_config: FileScanConfig =
+        FileScanConfigBuilder::new(object_store_url, data_schema, csv_source)
+            .with_file_groups(file_groups)
+            .with_table_partition_cols(partition_fields)
+            .with_projection_indices(Some(projection_vector))
+            .build();
+
+    Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
+}
+
+/// Translates the protobuf `CsvOptions` into a DataFusion `CsvSource`.
+///
+/// Fails if `delimiter`, `quote`, `escape`, `terminator` or (when present)
+/// `comment` cannot be converted by `string_to_u8`.
+fn build_csv_source(options: &CsvOptions) -> Result<Arc<CsvSource>> {
+    let delimiter = string_to_u8(&options.delimiter, "delimiter")?;
+    let quote = string_to_u8(&options.quote, "quote")?;
+    let escape = string_to_u8(&options.escape, "escape")?;
+    let terminator = string_to_u8(&options.terminator, "terminator")?;
+    // `comment` is optional; only validate it when it was actually sent.
+    let comment = options
+        .comment
+        .as_deref()
+        .map(|c| string_to_u8(c, "comment"))
+        .transpose()?;
+    let csv_source = CsvSource::new(options.has_header, delimiter, quote)
+        .with_escape(Some(escape))
+        .with_comment(comment)
+        .with_terminator(Some(terminator))
+        .with_truncate_rows(options.truncated_rows);
+    Ok(Arc::new(csv_source))
+}
+
+/// Returns the first byte of `option` if it is ASCII.
+///
+/// Only the first byte is inspected; any further bytes in `option` are ignored.
+/// An empty string or a non-ASCII first byte yields a
+/// `DataFusionError::Configuration` that names `option_name`.
+fn string_to_u8(option: &str, option_name: &str) -> Result<u8> {
+    match option.as_bytes().first() {
+        Some(&ch) if ch.is_ascii() => Ok(ch),
+        _ => Err(DataFusionError::Configuration(format!(
+            "invalid {option_name} character '{option}': must be an ASCII character"
+        ))),
+    }
+}
diff --git a/native/core/src/execution/operators/mod.rs
b/native/core/src/execution/operators/mod.rs
index 33b9be943..07ee99536 100644
--- a/native/core/src/execution/operators/mod.rs
+++ b/native/core/src/execution/operators/mod.rs
@@ -31,8 +31,10 @@ pub use expand::ExpandExec;
mod iceberg_scan;
mod parquet_writer;
pub use parquet_writer::ParquetWriterExec;
+mod csv_scan;
pub mod projection;
mod scan;
+pub use csv_scan::init_csv_datasource_exec;
/// Error returned during executing operators.
#[derive(thiserror::Error, Debug)]
diff --git a/native/core/src/execution/planner.rs
b/native/core/src/execution/planner.rs
index 2f1f1f32b..500203e76 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -21,6 +21,7 @@ pub mod expression_registry;
pub mod macros;
pub mod operator_registry;
+use crate::execution::operators::init_csv_datasource_exec;
use crate::execution::operators::IcebergScanExec;
use crate::{
errors::ExpressionError,
@@ -94,6 +95,7 @@ use datafusion::physical_expr::window::WindowExpr;
use datafusion::physical_expr::LexOrdering;
use crate::parquet::parquet_exec::init_datasource_exec;
+
use arrow::array::{
new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray,
Date32Array, Decimal128Array,
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
ListArray,
@@ -1059,6 +1061,44 @@ impl PhysicalPlanner {
Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
))
}
+            OpStruct::CsvScan(scan) => {
+                // Convert the Spark-side schemas and projection into Arrow/DataFusion terms.
+                let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice());
+                let partition_schema =
+                    convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice());
+                let projection_vector: Vec<usize> =
+                    scan.projection_vector.iter().map(|i| *i as usize).collect();
+                let object_store_options: HashMap<String, String> = scan
+                    .object_store_options
+                    .iter()
+                    .map(|(k, v)| (k.clone(), v.clone()))
+                    .collect();
+                // Any one file path is enough to resolve the object store for the scan.
+                let one_file = scan
+                    .file_partitions
+                    .first()
+                    .and_then(|f| f.partitioned_file.first())
+                    .map(|f| f.file_path.clone())
+                    .ok_or_else(|| GeneralError("Failed to locate file".to_string()))?;
+                let (object_store_url, _) = prepare_object_store_with_configs(
+                    self.session_ctx.runtime_env(),
+                    one_file,
+                    &object_store_options,
+                )?;
+                // Report an out-of-range partition index as an error instead of panicking.
+                let file_partition = scan
+                    .file_partitions
+                    .get(self.partition as usize)
+                    .ok_or_else(|| {
+                        GeneralError(format!(
+                            "CSV scan has no file partition at index {}",
+                            self.partition
+                        ))
+                    })?;
+                let files = self.get_partitioned_files(file_partition)?;
+                let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
+                // `csv_options` is an optional message field; a plan without it is invalid.
+                let csv_options = scan
+                    .csv_options
+                    .as_ref()
+                    .ok_or_else(|| GeneralError("CSV scan is missing csv_options".to_string()))?;
+                let scan = init_csv_datasource_exec(
+                    object_store_url,
+                    file_groups,
+                    data_schema,
+                    partition_schema,
+                    projection_vector,
+                    csv_options,
+                )?;
+                Ok((
+                    vec![],
+                    Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
+                ))
+            }
OpStruct::Scan(scan) => {
let data_types =
scan.fields.iter().map(to_arrow_datatype).collect_vec();
diff --git a/native/core/src/execution/planner/operator_registry.rs
b/native/core/src/execution/planner/operator_registry.rs
index e4899280b..b34a80df9 100644
--- a/native/core/src/execution/planner/operator_registry.rs
+++ b/native/core/src/execution/planner/operator_registry.rs
@@ -60,6 +60,7 @@ pub enum OperatorType {
SortMergeJoin,
HashJoin,
Window,
+ CsvScan,
}
/// Global registry of operator builders
@@ -151,5 +152,6 @@ fn get_operator_type(spark_operator: &Operator) ->
Option<OperatorType> {
OpStruct::HashJoin(_) => Some(OperatorType::HashJoin),
OpStruct::Window(_) => Some(OperatorType::Window),
OpStruct::Explode(_) => None, // Not yet in OperatorType enum
+ OpStruct::CsvScan(_) => Some(OperatorType::CsvScan),
}
}
diff --git a/native/proto/src/proto/operator.proto
b/native/proto/src/proto/operator.proto
index a1a3c4bed..c00b95396 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -51,6 +51,7 @@ message Operator {
IcebergScan iceberg_scan = 112;
ParquetWriter parquet_writer = 113;
Explode explode = 114;
+ CsvScan csv_scan = 115;
}
}
@@ -110,6 +111,25 @@ message NativeScan {
bool encryption_enabled = 14;
}
+message CsvScan { // Plan node for a native CSV file scan.
+ repeated SparkStructField data_schema = 1; // Full schema of the CSV data columns.
+ repeated SparkStructField partition_schema = 2; // Schema of the partition columns that are read.
+ repeated int32 projection_vector = 3; // Indices into data_schema of the columns to read.
+ repeated SparkFilePartition file_partitions = 4; // One entry per Spark input partition; the native planner picks the entry at its own partition index.
+ map<string, string> object_store_options = 5; // Options used when preparing the object store for the files.
+ CsvOptions csv_options = 6; // CSV parser settings.
+}
+
+message CsvOptions { // CSV parser settings; for the string-typed characters the native side uses only the first byte, which must be ASCII.
+ bool has_header = 1; // Whether the files start with a header row.
+ string delimiter = 2; // Field delimiter character.
+ string quote = 3; // Quote character.
+ string escape = 4; // Escape character.
+ optional string comment = 5; // Comment character; left unset when Spark has no comment character configured.
+ string terminator = 7; // Line terminator character. NOTE(review): tag 6 is unused.
+ bool truncated_rows = 8; // Forwarded to the native CSV source's truncate-rows setting.
+}
+
message IcebergScan {
// Schema to read
repeated SparkStructField required_schema = 1;
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index bb4ce879d..9c23b1be6 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -51,9 +51,8 @@ import org.apache.comet.{CometConf, CometExplainInfo,
ExtendedExplainInfo}
import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED,
COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST}
import org.apache.comet.CometSparkSessionExtensions._
import org.apache.comet.rules.CometExecRule.allExecs
-import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible,
OperatorOuterClass, Unsupported}
+import org.apache.comet.serde._
import org.apache.comet.serde.operator._
-import org.apache.comet.serde.operator.CometDataWritingCommand
object CometExecRule {
@@ -191,6 +190,9 @@ case class CometExecRule(session: SparkSession) extends
Rule[SparkPlan] {
case scan: CometBatchScanExec if
scan.nativeIcebergScanMetadata.isDefined =>
convertToComet(scan, CometIcebergNativeScan).getOrElse(scan)
+ case scan: CometBatchScanExec if scan.wrapped.scan.isInstanceOf[CSVScan]
=>
+ convertToComet(scan, CometCsvNativeScanExec).getOrElse(scan)
+
// Comet JVM + native scan for V1 and V2
case op if isCometScan(op) =>
convertToComet(op, CometScanWrapper).getOrElse(op)
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 8ba2c5845..4310605f2 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.comet.{CometBatchScanExec,
CometScanExec}
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -255,6 +256,47 @@ case class CometScanRule(session: SparkSession) extends
Rule[SparkPlan] with Com
withInfos(scanExec, fallbackReasons.toSet)
}
+      case scan: CSVScan if COMET_CSV_V2_NATIVE_ENABLED.get() =>
+        // Every check below runs unconditionally so that all fallback reasons are reported
+        // together when the scan cannot be handled natively.
+        val fallbackReasons = new ListBuffer[String]()
+        val schemaSupported =
+          CometBatchScanExec.isSchemaSupported(scan.readDataSchema, fallbackReasons)
+        if (!schemaSupported) {
+          fallbackReasons += s"Schema ${scan.readDataSchema} is not supported"
+        }
+        val partitionSchemaSupported =
+          CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema, fallbackReasons)
+        if (!partitionSchemaSupported) {
+          fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is not supported"
+        }
+        val corruptRecordColumnName =
+          SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
+        // True when the query reads the corrupt-record column, which forces a fallback.
+        val readsCorruptRecordColumn =
+          scan.readDataSchema.fieldNames.contains(corruptRecordColumnName)
+        if (readsCorruptRecordColumn) {
+          fallbackReasons += "Comet doesn't support the processing of corrupted records"
+        }
+        val isInferSchemaEnabled = scan.options.getBoolean("inferSchema", false)
+        if (isInferSchemaEnabled) {
+          fallbackReasons += "Comet doesn't support inferSchema=true option"
+        }
+        // "delimiter" takes precedence over "sep"; "," is used when neither option is set.
+        val delimiter =
+          Option(scan.options.get("delimiter"))
+            .orElse(Option(scan.options.get("sep")))
+            .getOrElse(",")
+        val isSingleCharacterDelimiter = delimiter.length == 1
+        if (!isSingleCharacterDelimiter) {
+          fallbackReasons +=
+            s"Comet supports only single-character delimiters, but got: '$delimiter'"
+        }
+        if (schemaSupported && partitionSchemaSupported && !readsCorruptRecordColumn
+          && !isInferSchemaEnabled && isSingleCharacterDelimiter) {
+          CometBatchScanExec(
+            scanExec.clone().asInstanceOf[BatchScanExec],
+            runtimeFilters = scanExec.runtimeFilters)
+        } else {
+          withInfos(scanExec, fallbackReasons.toSet)
+        }
+
// Iceberg scan - patched version implementing SupportsComet interface
case s: SupportsComet if !COMET_ICEBERG_NATIVE_ENABLED.get() =>
val fallbackReasons = new ListBuffer[String]()
diff --git
a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index 12be14450..b7909b67c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -27,9 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.{Expression,
Literal, PlanExpre
import
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec,
CometScanExec}
import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.{FilePartition,
PartitionedFile}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometConf.COMET_EXEC_ENABLED
@@ -144,12 +143,13 @@ object CometNativeScan extends
CometOperatorSerde[CometScanExec] with Logging {
var firstPartition: Option[PartitionedFile] = None
val filePartitions = scan.getFilePartitions()
- filePartitions.foreach { partition =>
+ val filePartitionsProto = filePartitions.map { partition =>
if (firstPartition.isEmpty) {
firstPartition = partition.files.headOption
}
- partition2Proto(partition, nativeScanBuilder,
scan.relation.partitionSchema)
+ partition2Proto(partition, scan.relation.partitionSchema)
}
+ nativeScanBuilder.addAllFilePartitions(filePartitionsProto.asJava)
val partitionSchema = schema2Proto(scan.relation.partitionSchema.fields)
val requiredSchema = schema2Proto(scan.requiredSchema.fields)
@@ -203,50 +203,6 @@ object CometNativeScan extends
CometOperatorSerde[CometScanExec] with Logging {
}
- private def schema2Proto(
- fields: Array[StructField]): Array[OperatorOuterClass.SparkStructField]
= {
- val fieldBuilder = OperatorOuterClass.SparkStructField.newBuilder()
- fields.map(field => {
- fieldBuilder.setName(field.name)
- fieldBuilder.setDataType(serializeDataType(field.dataType).get)
- fieldBuilder.setNullable(field.nullable)
- fieldBuilder.build()
- })
- }
-
- private def partition2Proto(
- partition: FilePartition,
- nativeScanBuilder: OperatorOuterClass.NativeScan.Builder,
- partitionSchema: StructType): Unit = {
- val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
- partition.files.foreach(file => {
- // Process the partition values
- val partitionValues = file.partitionValues
- assert(partitionValues.numFields == partitionSchema.length)
- val partitionVals =
- partitionValues.toSeq(partitionSchema).zipWithIndex.map { case (value,
i) =>
- val attr = partitionSchema(i)
- val valueProto = exprToProto(Literal(value, attr.dataType),
Seq.empty)
- // In `CometScanRule`, we have already checked that all partition
values are
- // supported. So, we can safely use `get` here.
- assert(
- valueProto.isDefined,
- s"Unsupported partition value: $value, type: ${attr.dataType}")
- valueProto.get
- }
-
- val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
- partitionVals.foreach(fileBuilder.addPartitionValues)
- fileBuilder
- .setFilePath(file.filePath.toString)
- .setStart(file.start)
- .setLength(file.length)
- .setFileSize(file.fileSize)
- partitionBuilder.addPartitionedFile(fileBuilder.build())
- })
- nativeScanBuilder.addFilePartitions(partitionBuilder.build())
- }
-
override def createExec(nativeOp: Operator, op: CometScanExec):
CometNativeExec = {
CometNativeScanExec(nativeOp, op.wrapped, op.session)
}
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/package.scala
b/spark/src/main/scala/org/apache/comet/serde/operator/package.scala
new file mode 100644
index 000000000..7b811d09e
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/package.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.serde
+
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.execution.datasources.FilePartition
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
+
+package object operator {
+
+  /**
+   * Converts Spark struct fields to protobuf `SparkStructField` messages, keeping their order.
+   *
+   * Calls `.get` on the result of `serializeDataType`, so it throws if a field's data type
+   * cannot be serialized.
+   */
+  def schema2Proto(fields: Array[StructField]): Array[OperatorOuterClass.SparkStructField] =
+    fields.map { structField =>
+      OperatorOuterClass.SparkStructField
+        .newBuilder()
+        .setName(structField.name)
+        .setDataType(serializeDataType(structField.dataType).get)
+        .setNullable(structField.nullable)
+        .build()
+    }
+
+  /**
+   * Converts a Spark `FilePartition` to a protobuf `SparkFilePartition`.
+   *
+   * Each file contributes its path, start offset, length, file size and its partition values
+   * serialized as literal expressions typed by `partitionSchema`.
+   */
+  def partition2Proto(
+      partition: FilePartition,
+      partitionSchema: StructType): OperatorOuterClass.SparkFilePartition = {
+    val filePartitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
+    for (partitionedFile <- partition.files) {
+      val partitionRow = partitionedFile.partitionValues
+      assert(partitionRow.numFields == partitionSchema.length)
+      val partitionedFileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
+      partitionRow.toSeq(partitionSchema).zipWithIndex.foreach { case (value, idx) =>
+        val dataType = partitionSchema(idx).dataType
+        val literalProto = exprToProto(Literal(value, dataType), Seq.empty)
+        // `CometScanRule` has already verified that all partition values are supported,
+        // so an undefined result here indicates a bug rather than bad input.
+        assert(literalProto.isDefined, s"Unsupported partition value: $value, type: $dataType")
+        partitionedFileBuilder.addPartitionValues(literalProto.get)
+      }
+      partitionedFileBuilder
+        .setFilePath(partitionedFile.filePath.toString)
+        .setStart(partitionedFile.start)
+        .setLength(partitionedFile.length)
+        .setFileSize(partitionedFile.fileSize)
+      filePartitionBuilder.addPartitionedFile(partitionedFileBuilder.build())
+    }
+    filePartitionBuilder.build()
+  }
+}
diff --git
a/spark/src/main/scala/org/apache/spark/sql/comet/CometCsvNativeScanExec.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/CometCsvNativeScanExec.scala
new file mode 100644
index 000000000..39ebee49a
--- /dev/null
+++
b/spark/src/main/scala/org/apache/spark/sql/comet/CometCsvNativeScanExec.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning,
UnknownPartitioning}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.FilePartition
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
+
+import com.google.common.base.Objects
+
+import org.apache.comet.{CometConf, ConfigEntry}
+import org.apache.comet.objectstore.NativeConfig
+import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
+import org.apache.comet.serde.OperatorOuterClass.Operator
+import org.apache.comet.serde.operator.{partition2Proto, schema2Proto}
+
+/**
+ * Native CSV scan operator that delegates file reading to datafusion.
+ *
+ * Equality and hashing consider `output`, `serializedPlanOpt` and `originalPlan` only;
+ * `nativeOp` is not part of either.
+ */
+case class CometCsvNativeScanExec(
+    override val nativeOp: Operator,
+    override val output: Seq[Attribute],
+    @transient override val originalPlan: BatchScanExec,
+    override val serializedPlanOpt: SerializedPlan)
+    extends CometLeafExec {
+
+  override val nodeName: String = "CometCsvNativeScan"
+
+  override val supportsColumnar: Boolean = true
+
+  override def outputOrdering: Seq[SortOrder] = Nil
+
+  /** Reports one output partition per input partition of the wrapped scan. */
+  override def outputPartitioning: Partitioning = {
+    val numPartitions = originalPlan.inputPartitions.length
+    UnknownPartitioning(numPartitions)
+  }
+
+  /** Returns a new node built from this node's own fields. */
+  override protected def doCanonicalize(): SparkPlan =
+    CometCsvNativeScanExec(
+      nativeOp = nativeOp,
+      output = output,
+      originalPlan = originalPlan,
+      serializedPlanOpt = serializedPlanOpt)
+
+  override def equals(obj: Any): Boolean = obj match {
+    case that: CometCsvNativeScanExec =>
+      output == that.output &&
+      serializedPlanOpt == that.serializedPlanOpt &&
+      originalPlan == that.originalPlan
+    case _ => false
+  }
+
+  override def hashCode(): Int =
+    Objects.hashCode(output, serializedPlanOpt, originalPlan)
+}
+
+/**
+ * Serde for the native CSV scan: serializes a `CometBatchScanExec` wrapping a `CSVScan` into a
+ * `CsvScan` protobuf operator and creates the matching `CometCsvNativeScanExec`.
+ */
+object CometCsvNativeScanExec extends CometOperatorSerde[CometBatchScanExec] {
+
+  override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
+    CometConf.COMET_CSV_V2_NATIVE_ENABLED)
+
+  /**
+   * Builds the `CsvScan` protobuf message for `op`.
+   *
+   * @param op
+   *   scan whose wrapped `BatchScanExec` holds a `CSVScan`
+   * @param builder
+   *   operator builder that receives the `CsvScan` message
+   * @param childOp
+   *   unused; a scan has no child operators
+   * @return
+   *   the built operator, always defined
+   */
+  override def convert(
+      op: CometBatchScanExec,
+      builder: Operator.Builder,
+      childOp: Operator*): Option[Operator] = {
+    val csvScan = op.wrapped.scan.asInstanceOf[CSVScan]
+    val sessionState = op.session.sessionState
+    val options = new CSVOptions(
+      csvScan.options.asScala.toMap,
+      sessionState.conf.csvColumnPruning,
+      sessionState.conf.sessionLocalTimeZone)
+    val filePartitions = op.inputPartitions.map(_.asInstanceOf[FilePartition])
+
+    // Indices (in data-schema order) of the data columns that the read schema asks for.
+    // A set gives constant-time lookups instead of scanning the name array for every field.
+    val readFieldNames = csvScan.readDataSchema.fieldNames.toSet
+    val projectionVector = csvScan.dataSchema.fields.zipWithIndex.collect {
+      case (field, index) if readFieldNames.contains(field.name) => Int.box(index)
+    }
+
+    val dataSchemaProto = schema2Proto(csvScan.dataSchema.fields)
+    val partitionSchemaProto = schema2Proto(csvScan.readPartitionSchema.fields)
+    val partitionsProto = filePartitions.map(partition2Proto(_, csvScan.readPartitionSchema))
+
+    // Object store options are derived from the first file of the first partition, if any.
+    val objectStoreOptions = filePartitions.headOption
+      .flatMap { firstPartition =>
+        val hadoopConf = sessionState
+          .newHadoopConfWithOptions(op.session.sparkContext.conf.getAll.toMap)
+        firstPartition.files.headOption
+          .map(file => NativeConfig.extractObjectStoreOptions(hadoopConf, file.pathUri))
+      }
+      .getOrElse(Map.empty)
+
+    val csvScanBuilder = OperatorOuterClass.CsvScan
+      .newBuilder()
+      .putAllObjectStoreOptions(objectStoreOptions.asJava)
+      .setCsvOptions(csvOptions2Proto(options))
+      .addAllFilePartitions(partitionsProto.asJava)
+      .addAllDataSchema(dataSchemaProto.toIterable.asJava)
+      .addAllProjectionVector(projectionVector.toIterable.asJava)
+      .addAllPartitionSchema(partitionSchemaProto.toIterable.asJava)
+    Some(builder.setCsvScan(csvScanBuilder).build())
+  }
+
+  override def createExec(nativeOp: Operator, op: CometBatchScanExec): CometNativeExec = {
+    CometCsvNativeScanExec(nativeOp, op.output, op.wrapped, SerializedPlan(None))
+  }
+
+  /** Copies the parser settings of Spark's `CSVOptions` into the protobuf `CsvOptions`. */
+  private def csvOptions2Proto(options: CSVOptions): OperatorOuterClass.CsvOptions = {
+    val csvOptionsBuilder = OperatorOuterClass.CsvOptions
+      .newBuilder()
+      .setDelimiter(options.delimiter)
+      .setHasHeader(options.headerFlag)
+      .setQuote(options.quote.toString)
+      .setEscape(options.escape.toString)
+      // "\n" is sent when Spark has no explicit line separator configured.
+      .setTerminator(options.lineSeparator.getOrElse("\n"))
+      // NOTE(review): Spark's `multiLine` flag is sent as `truncated_rows`; confirm that
+      // these two settings are really meant to correspond.
+      .setTruncatedRows(options.multiLine)
+    // The comment character is optional in the protobuf message; leave it unset when unused.
+    if (options.isCommentSet) {
+      csvOptionsBuilder.setComment(options.comment.toString)
+    }
+    csvOptionsBuilder.build()
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index cb7098617..6f33467ef 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -426,10 +426,11 @@ abstract class CometNativeExec extends CometExec {
def foreachUntilCometInput(plan: SparkPlan)(func: SparkPlan => Unit): Unit =
{
plan match {
case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec |
- _: CometIcebergNativeScanExec | _: ShuffleQueryStageExec | _:
AQEShuffleReadExec |
- _: CometShuffleExchangeExec | _: CometUnionExec | _:
CometTakeOrderedAndProjectExec |
- _: CometCoalesceExec | _: ReusedExchangeExec | _:
CometBroadcastExchangeExec |
- _: BroadcastQueryStageExec | _: CometSparkToColumnarExec | _:
CometLocalTableScanExec =>
+ _: CometIcebergNativeScanExec | _: CometCsvNativeScanExec | _:
ShuffleQueryStageExec |
+ _: AQEShuffleReadExec | _: CometShuffleExchangeExec | _:
CometUnionExec |
+ _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | _:
ReusedExchangeExec |
+ _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec |
+ _: CometSparkToColumnarExec | _: CometLocalTableScanExec =>
func(plan)
case _: CometPlan =>
// Other Comet operators, continue to traverse the tree.
diff --git a/spark/src/test/resources/test-data/csv-test-2.csv
b/spark/src/test/resources/test-data/csv-test-2.csv
new file mode 100644
index 000000000..1c7c834f1
--- /dev/null
+++ b/spark/src/test/resources/test-data/csv-test-2.csv
@@ -0,0 +1,4 @@
+a,b,c
+1,2,3
+4,5,6
+7,0,8
\ No newline at end of file
diff --git
a/spark/src/test/scala/org/apache/comet/csv/CometCsvNativeReadSuite.scala
b/spark/src/test/scala/org/apache/comet/csv/CometCsvNativeReadSuite.scala
new file mode 100644
index 000000000..e9a18a18d
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/csv/CometCsvNativeReadSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.csv
+
+import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+import org.apache.comet.CometConf
+
+/**
+ * Checks reading CSV files with `CometConf.COMET_CSV_V2_NATIVE_ENABLED` switched on: two reads
+ * that are expected to match Spark and run through Comet operators, and three reads that are
+ * expected to fall back with a specific reason.
+ */
+class CometCsvNativeReadSuite extends CometTestBase {
+  private val TEST_CSV_PATH_NO_HEADER = "src/test/resources/test-data/csv-test-1.csv"
+  private val TEST_CSV_PATH_HAS_HEADER = "src/test/resources/test-data/csv-test-2.csv"
+
+  /**
+   * Runs `body` with the native CSV scan flag set to "true" and Spark's V1 source list set to the
+   * empty string (presumably so that CSV is planned through the V2 data source path).
+   */
+  private def withNativeCsvV2(body: => Unit): Unit =
+    withSQLConf(
+      CometConf.COMET_CSV_V2_NATIVE_ENABLED.key -> "true",
+      SQLConf.USE_V1_SOURCE_LIST.key -> "")(body)
+
+  /** Schema with three integer columns named `a`, `b` and `c`, in that order. */
+  private def abcIntSchema: StructType =
+    Seq("a", "b", "c").foldLeft(new StructType()) { (schema, columnName) =>
+      schema.add(columnName, IntegerType)
+    }
+
+  test("Native csv read - with schema") {
+    withNativeCsvV2 {
+      val df = spark.read
+        .option("header", "false")
+        .option("delimiter", ",")
+        .schema(abcIntSchema)
+        .csv(TEST_CSV_PATH_NO_HEADER)
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("Native csv read - without schema") {
+    withNativeCsvV2 {
+      val df = spark.read
+        .option("header", "true")
+        .option("delimiter", ",")
+        .csv(TEST_CSV_PATH_HAS_HEADER)
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("Native csv read - test fallback reasons") {
+    withNativeCsvV2 {
+      // Case 1: the schema contains the corrupt-record column.
+      val corruptRecordColumn = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
+      val withCorruptRecordColumn = spark.read
+        .option("header", "false")
+        .option("delimiter", ",")
+        .schema(abcIntSchema.add(corruptRecordColumn, StringType))
+        .csv(TEST_CSV_PATH_NO_HEADER)
+      checkSparkAnswerAndFallbackReason(
+        withCorruptRecordColumn,
+        "Comet doesn't support the processing of corrupted records")
+
+      // Case 2: schema inference is requested.
+      val withInferSchema = spark.read
+        .option("header", "false")
+        .option("delimiter", ",")
+        .option("inferSchema", "true")
+        .csv(TEST_CSV_PATH_NO_HEADER)
+      checkSparkAnswerAndFallbackReason(
+        withInferSchema,
+        "Comet doesn't support inferSchema=true option")
+
+      // Case 3: the delimiter is longer than one character.
+      val withTwoCharDelimiter = spark.read
+        .option("header", "false")
+        .option("delimiter", ",,")
+        .csv(TEST_CSV_PATH_NO_HEADER)
+      checkSparkAnswerAndFallbackReason(
+        withTwoCharDelimiter,
+        "Comet supports only single-character delimiters, but got: ',,'")
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]