This is an automated email from the ASF dual-hosted git repository.
csy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new aef1a623 [AURON #1638] Support scan ORC data using microsecond
precision (#1684)
aef1a623 is described below
commit aef1a623ba2cdc4549e3c06ce1e8bf2841413f23
Author: cxzl25 <[email protected]>
AuthorDate: Tue Dec 16 11:27:42 2025 +0800
[AURON #1638] Support scan ORC data using microsecond precision (#1684)
# Which issue does this PR close?
Closes #1638
# Rationale for this change
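# What changes are included in this PR?
Adds a boolean option `ORC_TIMESTAMP_USE_MICROSECOND` (default `false`), declared in the native `conf.rs`, in `SparkAuronConfiguration`, and in `AuronConf`. `OrcExec` reads the option and passes it to `OrcOpener`; when it is enabled, the ORC `ArrowReaderBuilder` is configured with `TimestampPrecision::Microsecond`.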
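# Are there any user-facing changes?
Yes. A new configuration key, `spark.auron.orc.timestamp.use.microsecond`, is available. It defaults to `false`, in which case the ORC reader builder is left at its existing timestamp precision setting.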
# How was this patch tested?
---
native-engine/auron-jni-bridge/src/conf.rs | 1 +
native-engine/datafusion-ext-plans/src/orc_exec.rs | 8 ++++++++
.../apache/auron/spark/configuration/SparkAuronConfiguration.java | 5 +++++
.../src/main/java/org/apache/spark/sql/auron/AuronConf.java | 3 +++
4 files changed, 17 insertions(+)
diff --git a/native-engine/auron-jni-bridge/src/conf.rs
b/native-engine/auron-jni-bridge/src/conf.rs
index 1cb4f44e..e99037ad 100644
--- a/native-engine/auron-jni-bridge/src/conf.rs
+++ b/native-engine/auron-jni-bridge/src/conf.rs
@@ -56,6 +56,7 @@ define_conf!(IntConf, SMJ_FALLBACK_MEM_SIZE_THRESHOLD);
define_conf!(IntConf, SUGGESTED_BATCH_MEM_SIZE);
define_conf!(IntConf, SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE);
define_conf!(BooleanConf, ORC_FORCE_POSITIONAL_EVOLUTION);
+define_conf!(BooleanConf, ORC_TIMESTAMP_USE_MICROSECOND);
define_conf!(IntConf, UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG);
define_conf!(BooleanConf, PARSE_JSON_ERROR_FALLBACK);
define_conf!(StringConf, NATIVE_LOG_LEVEL);
diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs
b/native-engine/datafusion-ext-plans/src/orc_exec.rs
index d5ace043..d07e4794 100644
--- a/native-engine/datafusion-ext-plans/src/orc_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -43,6 +43,7 @@ use futures::{FutureExt, StreamExt, future::BoxFuture};
use futures_util::TryStreamExt;
use once_cell::sync::OnceCell;
use orc_rust::{
+ TimestampPrecision,
arrow_reader::ArrowReaderBuilder,
projection::ProjectionMask,
reader::{AsyncChunkReader, metadata::FileMetadata},
@@ -158,6 +159,7 @@ impl ExecutionPlan for OrcExec {
};
let force_positional_evolution =
conf::ORC_FORCE_POSITIONAL_EVOLUTION.value()?;
+ let use_microsecond_precision =
conf::ORC_TIMESTAMP_USE_MICROSECOND.value()?;
let opener: Arc<dyn FileOpener> = Arc::new(OrcOpener {
projection,
@@ -167,6 +169,7 @@ impl ExecutionPlan for OrcExec {
partition_index: partition,
metrics: self.metrics.clone(),
force_positional_evolution,
+ use_microsecond_precision,
});
let file_stream = Box::pin(FileStream::new(
@@ -213,6 +216,7 @@ struct OrcOpener {
partition_index: usize,
metrics: ExecutionPlanMetricsSet,
force_positional_evolution: bool,
+ use_microsecond_precision: bool,
}
impl FileOpener for OrcOpener {
@@ -240,11 +244,15 @@ impl FileOpener for OrcOpener {
projected_schema,
self.force_positional_evolution,
);
+ let use_microsecond = self.use_microsecond_precision;
Ok(Box::pin(async move {
let mut builder = ArrowReaderBuilder::try_new_async(reader)
.await
.or_else(|err| df_execution_err!("create orc reader error:
{err}"))?;
+ if use_microsecond {
+ builder =
builder.with_timestamp_precision(TimestampPrecision::Microsecond);
+ }
if let Some(range) = file_meta.range.clone() {
let range = range.start as usize..range.end as usize;
builder = builder.with_file_byte_range(range);
diff --git
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
index 5d6f2905..8d29a0bc 100644
---
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
+++
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
@@ -232,6 +232,11 @@ public class SparkAuronConfiguration extends
AuronConfiguration {
.description("orc force positional evolution. ")
.booleanType()
.defaultValue(false);
+ public static final ConfigOption<Boolean> ORC_TIMESTAMP_USE_MICROSECOND =
ConfigOptions.key(
+ "auron.orc.timestamp.use.microsecond")
+ .description("use microsecond precision when reading ORC timestamp
columns. ")
+ .booleanType()
+ .defaultValue(false);
private final SparkConf sparkConf;
diff --git
a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
b/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
index aec3a6c6..b3cbca1a 100644
--- a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
+++ b/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
@@ -136,6 +136,9 @@ public enum AuronConf {
ORC_FORCE_POSITIONAL_EVOLUTION("spark.auron.orc.force.positional.evolution",
false),
+ // use microsecond precision when reading ORC timestamp columns
+ ORC_TIMESTAMP_USE_MICROSECOND("spark.auron.orc.timestamp.use.microsecond",
false),
+
NATIVE_LOG_LEVEL("spark.auron.native.log.level", "info");
public final String key;