This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch comet-parquet-exec
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/comet-parquet-exec by this push:
new 8563edfc fix: [comet-parquet-exec] fix regressions in original Comet native scan implementation (#1170)
8563edfc is described below
commit 8563edfc66870983b8a008a18c3f0f0bfa276974
Author: Parth Chandra <[email protected]>
AuthorDate: Fri Dec 13 08:58:53 2024 -0800
fix: [comet-parquet-exec] fix regressions in original Comet native scan implementation (#1170)
* fix: CometScanExec was created for unsupported cases if only COMET_NATIVE_SCAN is enabled
* fix: Another try to fix test("Comet native metrics: BroadcastHashJoin")
* fix: some tests are valid only when full native scan is enabled
* Merge pull request #1 from andygrove/fix-tests-spark-cast-options
---
.../main/java/org/apache/comet/parquet/BatchReader.java | 2 +-
.../java/org/apache/comet/parquet/NativeBatchReader.java | 8 ++++----
native/spark-expr/src/cast.rs | 16 ++++++++--------
.../main/scala/org/apache/comet/DataTypeSupport.scala | 5 ++++-
.../scala/org/apache/spark/sql/comet/CometScanExec.scala | 11 +++++++++--
.../scala/org/apache/comet/CometExpressionSuite.scala | 2 ++
6 files changed, 28 insertions(+), 16 deletions(-)
diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index 11c6d14d..675dae9e 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -272,7 +272,7 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
requestedSchema =
CometParquetReadSupport.clipParquetSchema(
              requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
- if (requestedSchema.getColumns().size() != sparkSchema.size()) {
+ if (requestedSchema.getFieldCount() != sparkSchema.size()) {
throw new IllegalArgumentException(
String.format(
"Spark schema has %d columns while " + "Parquet schema has %d
columns",
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 17fab47e..3ac55ba4 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -245,7 +245,7 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
requestedSchema =
CometParquetReadSupport.clipParquetSchema(
              requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
- if (requestedSchema.getColumns().size() != sparkSchema.size()) {
+ if (requestedSchema.getFieldCount() != sparkSchema.size()) {
throw new IllegalArgumentException(
String.format(
"Spark schema has %d columns while " + "Parquet schema has
%d columns",
@@ -267,9 +267,9 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
// ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema);
for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
Type t = requestedSchema.getFields().get(i);
- Preconditions.checkState(
- t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED),
- "Complex type is not supported");
+ // Preconditions.checkState(
+ // t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED),
+ // "Complex type is not supported");
String[] colPath = paths.get(i);
      if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
        // Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with
diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs
index 17ab73b7..a6d13971 100644
--- a/native/spark-expr/src/cast.rs
+++ b/native/spark-expr/src/cast.rs
@@ -571,7 +571,7 @@ impl SparkCastOptions {
eval_mode,
timezone: timezone.to_string(),
allow_incompat,
- is_adapting_schema: false,
+ is_adapting_schema: false
}
}
@@ -583,6 +583,7 @@ impl SparkCastOptions {
is_adapting_schema: false,
}
}
+
}
/// Spark-compatible cast implementation. Defers to DataFusion's cast where that is known
@@ -2087,7 +2088,7 @@ mod tests {
let timezone = "UTC".to_string();
// test casting string dictionary array to timestamp array
-        let cast_options = SparkCastOptions::new(EvalMode::Legacy, timezone.clone(), false);
+        let cast_options = SparkCastOptions::new(EvalMode::Legacy, &timezone, false);
let result = cast_array(
dict_array,
            &DataType::Timestamp(TimeUnit::Microsecond, Some(timezone.clone().into())),
@@ -2296,7 +2297,7 @@ mod tests {
fn test_cast_unsupported_timestamp_to_date() {
        // Since datafusion uses chrono::Datetime internally not all dates representable by TimestampMicrosecondType are supported
        let timestamps: PrimitiveArray<TimestampMicrosecondType> = vec![i64::MAX].into();
-        let cast_options = SparkCastOptions::new(EvalMode::Legacy, "UTC".to_string(), false);
+        let cast_options = SparkCastOptions::new(EvalMode::Legacy, "UTC", false);
let result = cast_array(
Arc::new(timestamps.with_timezone("Europe/Copenhagen")),
&DataType::Date32,
@@ -2309,7 +2310,7 @@ mod tests {
fn test_cast_invalid_timezone() {
        let timestamps: PrimitiveArray<TimestampMicrosecondType> = vec![i64::MAX].into();
let cast_options =
-            SparkCastOptions::new(EvalMode::Legacy, "Not a valid timezone".to_string(), false);
+            SparkCastOptions::new(EvalMode::Legacy, "Not a valid timezone", false);
let result = cast_array(
Arc::new(timestamps.with_timezone("Europe/Copenhagen")),
&DataType::Date32,
@@ -2335,7 +2336,7 @@ mod tests {
let string_array = cast_array(
c,
&DataType::Utf8,
- &SparkCastOptions::new(EvalMode::Legacy, "UTC".to_owned(), false),
+ &SparkCastOptions::new(EvalMode::Legacy, "UTC", false),
)
.unwrap();
let string_array = string_array.as_string::<i32>();
@@ -2400,10 +2401,9 @@ mod tests {
let cast_array = spark_cast(
ColumnarValue::Array(c),
&DataType::Struct(fields),
- EvalMode::Legacy,
+ &SparkCastOptions::new(EvalMode::Legacy,
"UTC",
- false,
- false,
+ false)
)
.unwrap();
if let ColumnarValue::Array(cast_array) = cast_array {
diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
index 09c062b8..eb524af9 100644
--- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
+++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
@@ -39,7 +39,10 @@ trait DataTypeSupport {
BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
true
case t: DataType if t.typeName == "timestamp_ntz" => true
- case _: StructType => true
+ case _: StructType
+ if CometConf.COMET_FULL_NATIVE_SCAN_ENABLED
+ .get() || CometConf.COMET_NATIVE_ARROW_SCAN_ENABLED.get() =>
+ true
case _ => false
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 49f7694b..5d28b4b7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -131,8 +131,15 @@ case class CometScanExec(
// exposed for testing
lazy val bucketedScan: Boolean = wrapped.bucketedScan
-  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
- (wrapped.outputPartitioning, wrapped.outputOrdering)
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+ if (bucketedScan) {
+ (wrapped.outputPartitioning, wrapped.outputOrdering)
+ } else {
+ val files = selectedPartitions.flatMap(partition => partition.files)
+ val numPartitions = files.length
+ (UnknownPartitioning(numPartitions), wrapped.outputOrdering)
+ }
+ }
@transient
  private lazy val pushedDownFilters = getPushedDownFilters(relation, dataFilters)
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 85ac6138..e65feb6b 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2217,6 +2217,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "true",
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
val df = spark.read.parquet(dir.toString())
@@ -2249,6 +2250,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "true",
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
val df = spark.read.parquet(dir.toString())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]