This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 059516dd [AURON #1656] Support to disable the scan timestamp for
Parquet and ORC formats (#1657)
059516dd is described below
commit 059516ddb9d714f61594c80440b6961e91f1f546
Author: cxzl25 <[email protected]>
AuthorDate: Thu Nov 27 17:48:49 2025 +0800
[AURON #1656] Support to disable the scan timestamp for Parquet and ORC
formats (#1657)
* Support to disable the scan timestamp for Parquet and ORC formats
* nested type
* Update
spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
Co-authored-by: Copilot <[email protected]>
* Update
spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
Co-authored-by: Copilot <[email protected]>
* use requiredSchema
* Update
spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
Co-authored-by: Copilot <[email protected]>
* Update
spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
Co-authored-by: Copilot <[email protected]>
* table
---------
Co-authored-by: Copilot <[email protected]>
---
.../apache/spark/sql/auron/AuronConverters.scala | 22 +++++++++++++++++++++-
.../apache/spark/sql/auron/NativeConverters.scala | 13 +++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 5717e4d8..72589836 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -28,7 +28,7 @@ import org.apache.spark.Partition
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.{config, Logging}
import
org.apache.spark.sql.auron.AuronConvertStrategy.{childOrderingRequiredTag,
convertibleTag, convertStrategyTag, convertToNonNativeTag, isNeverConvert,
joinSmallerSideTag, neverConvertReasonTag}
-import org.apache.spark.sql.auron.NativeConverters.{roundRobinTypeSupported,
scalarTypeSupported, StubExpr}
+import org.apache.spark.sql.auron.NativeConverters.{existTimestampType,
roundRobinTypeSupported, scalarTypeSupported, StubExpr}
import org.apache.spark.sql.auron.util.AuronLogUtils.logDebugPlanConversion
import org.apache.spark.sql.catalyst.expressions.AggregateWindowFunction
import org.apache.spark.sql.catalyst.expressions.Alias
@@ -135,8 +135,12 @@ object AuronConverters extends Logging {
getBooleanConf("spark.auron.enable.data.writing", defaultValue = false)
def enableScanParquet: Boolean =
getBooleanConf("spark.auron.enable.scan.parquet", defaultValue = true)
+ def enableScanParquetTimestamp: Boolean =
+ getBooleanConf("spark.auron.enable.scan.parquet.timestamp", defaultValue =
true)
def enableScanOrc: Boolean =
getBooleanConf("spark.auron.enable.scan.orc", defaultValue = true)
+ def enableScanOrcTimestamp: Boolean =
+ getBooleanConf("spark.auron.enable.scan.orc.timestamp", defaultValue =
true)
def enableBroadcastExchange: Boolean =
getBooleanConf("spark.auron.enable.broadcastExchange", defaultValue = true)
def enableShuffleExechange: Boolean =
@@ -467,9 +471,25 @@ object AuronConverters extends Logging {
relation.fileFormat match {
case p if p.getClass.getName.endsWith("ParquetFileFormat") =>
assert(enableScanParquet)
+ if (!enableScanParquetTimestamp) {
+ assert(
+ !exec.requiredSchema.exists(e => existTimestampType(e.dataType)),
+ s"Parquet scan with timestamp type is not supported for table:
${tableIdentifier
+ .getOrElse("unknown")}. " +
+ "Set spark.auron.enable.scan.parquet.timestamp=true to enable
timestamp support " +
+ "or remove timestamp columns from the query.")
+ }
addRenameColumnsExec(Shims.get.createNativeParquetScanExec(exec))
case p if p.getClass.getName.endsWith("OrcFileFormat") =>
assert(enableScanOrc)
+ if (!enableScanOrcTimestamp) {
+ assert(
+ !exec.requiredSchema.exists(e => existTimestampType(e.dataType)),
+ s"ORC scan with timestamp type is not supported for
tableIdentifier: ${tableIdentifier
+ .getOrElse("unknown")}. " +
+ "Set spark.auron.enable.scan.orc.timestamp=true to enable
timestamp support " +
+ "or remove timestamp columns from the query.")
+ }
addRenameColumnsExec(Shims.get.createNativeOrcScanExec(exec))
case p =>
throw new NotImplementedError(
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index 2447a9ef..d322105b 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -118,6 +118,19 @@ object NativeConverters extends Logging {
}
}
+  /**
+   * Reports whether `dataType` is `TimestampType` or contains one at any nesting depth:
+   * as an array element, a map key or value, or a struct field.
+   */
+  def existTimestampType(dataType: DataType): Boolean = dataType match {
+    case TimestampType => true
+    case ArrayType(elementType, _) => existTimestampType(elementType)
+    case MapType(keyType, valueType, _) =>
+      existTimestampType(keyType) || existTimestampType(valueType)
+    case StructType(fields) => fields.exists(f => existTimestampType(f.dataType))
+    case _ => false
+  }
+
def roundRobinTypeSupported(dataType: DataType): Boolean = dataType match {
case MapType(_, _, _) => false
case ArrayType(elementType, _) => roundRobinTypeSupported(elementType)