This is an automated email from the ASF dual-hosted git repository.
yma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 2ff2fd4dd [GLUTEN-6509] enable reading an Iceberg table with timestamptz as a
partitioned column. (#6508)
2ff2fd4dd is described below
commit 2ff2fd4dd7dbd2261bf49f39c5e69b8c2b0c201a
Author: j7nhai <[email protected]>
AuthorDate: Mon Jul 22 10:47:12 2024 +0800
[GLUTEN-6509] enable reading an Iceberg table with timestamptz as a partitioned
column. (#6508)
* fix timestamp tz.
* fix styles.
* fix styles.
* fix logs.
* add iceberg tests.
* pr ready.
* delete logging
* fix test sql
* ignore 3.4
* format
* scalastyle.
* fix test.
---
.../org/apache/spark/sql/GlutenQueryTest.scala | 9 +++++++
.../spark/source/GlutenIcebergSourceUtil.scala | 12 ++++-----
.../gluten/execution/VeloxIcebergSuite.scala | 31 ++++++++++++++++++++++
3 files changed, 46 insertions(+), 6 deletions(-)
diff --git
a/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
b/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
index ab30cb14e..53abaa9ac 100644
--- a/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
+++ b/gluten-core/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
@@ -95,6 +95,15 @@ abstract class GlutenQueryTest extends PlanTest {
}
}
+ def testWithSpecifiedSparkVersion(testName: String, versions: Array[String])(
+ testFun: => Any): Unit = {
+ if (versions.exists(v => shouldRun(Some(v), Some(v)))) {
+ test(testName) {
+ testFun
+ }
+ }
+ }
+
/** Runs the plan and makes sure the answer contains all of the keywords. */
def checkKeywordsExist(df: DataFrame, keywords: String*): Unit = {
val outputs = df.collect().map(_.mkString).mkString
diff --git
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index 6b67e7636..ad8222cff 100644
---
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -25,7 +25,8 @@ import
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.types.StructType
-import org.apache.iceberg.{CombinedScanTask, DeleteFile, FileFormat,
FileScanTask, ScanTask}
+import org.apache.iceberg.{CombinedScanTask, DeleteFile, FileFormat,
FileScanTask, ScanTask, Schema}
+import org.apache.iceberg.spark.SparkSchemaUtil
import java.lang.{Long => JLong}
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList,
Map => JMap}
@@ -104,7 +105,6 @@ object GlutenIcebergSourceUtil {
task =>
val spec = task.spec()
if (spec.isPartitioned) {
- var partitionSchema = new StructType()
val readFields = scan.readSchema().fields.map(_.name).toSet
// Iceberg will generate some non-table fields as partition
fields, such as x_bucket,
// which will not appear in readFields, they also cannot be
filtered.
@@ -116,11 +116,11 @@ object GlutenIcebergSourceUtil {
.asScala
.filter(f => !tableFields.contains(f.name) ||
readFields.contains(f.name()))
partitionFields.foreach {
- field =>
- TypeUtil.validatePartitionColumnType(field.`type`().typeId())
- partitionSchema = partitionSchema.add(field.name(),
field.`type`().toString)
+ field =>
TypeUtil.validatePartitionColumnType(field.`type`().typeId())
}
- return partitionSchema
+
+ val icebergSchema = new Schema(partitionFields.toList.asJava)
+ return SparkSchemaUtil.convert(icebergSchema)
} else {
return new StructType()
}
diff --git
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index 77a1c790b..bb604f534 100644
---
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -457,4 +457,35 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite
{
}
}
}
+
+ // Spark configuration spark.sql.iceberg.handle-timestamp-without-timezone
is not supported
+ // in Spark 3.4
+ testWithSpecifiedSparkVersion("iceberg partition type - timestamp",
Array("3.2", "3.3", "3.5")) {
+ Seq("true", "false").foreach {
+ flag =>
+ withSQLConf(
+ "spark.sql.iceberg.handle-timestamp-without-timezone" -> flag,
+ "spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables" ->
flag) {
+ withTable("part_by_timestamp") {
+ spark.sql("""
+ |create table part_by_timestamp (
+ | p timestamp
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '1'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
+
+ // Insert some test rows.
+ spark.sql("""
+ |insert into table part_by_timestamp
+ |values (TIMESTAMP '2022-01-01 00:01:20');
+ |""".stripMargin)
+ val df = spark.sql("select * from part_by_timestamp")
+ checkAnswer(df, Row(java.sql.Timestamp.valueOf("2022-01-01
00:01:20")) :: Nil)
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]