This is an automated email from the ASF dual-hosted git repository.

zhouyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9de3b50b1f [VL] Fixed iceberg config logic (#12221)
9de3b50b1f is described below

commit 9de3b50b1f26c966798c469923df5b77e3f27870
Author: inf <[email protected]>
AuthorDate: Fri Jun 5 11:46:04 2026 +0300

    [VL] Fixed iceberg config logic (#12221)
    
    The previous logic overrode SQLConf property with TableProperty, while not 
setting any value when SQLConf not set. Now it will prioritize SQLConf, then 
TableProperty
---
 .../execution/AbstractIcebergWriteExec.scala       |  2 +-
 .../execution/enhanced/VeloxIcebergSuite.scala     | 71 ++++++++++++++++++++++
 .../apache/gluten/execution/IcebergWriteExec.scala |  9 ++-
 3 files changed, 76 insertions(+), 6 deletions(-)

diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
index d76c4256d4..2b4d50a73f 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
@@ -51,7 +51,7 @@ abstract class AbstractIcebergWriteExec extends 
IcebergWriteExec {
       MAX_TARGET_FILE_SIZE_SESSION.key -> getTargetFileSizeBytes
     ).foreach {
       case (key, value) =>
-        if (SQLConf.get.getConfString(key, null) != null) {
+        if (SQLConf.get.getConfString(key, null) == null) {
           icebergProperties.put(key, value)
         }
     }
diff --git 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index e2df119bc0..ddd57dca46 100644
--- 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -465,4 +465,75 @@ class VeloxIcebergSuite extends IcebergSuite {
       )
     }
   }
+  ignore("disabled test") {
+    test("iceberg native write respects target file size bytes") {
+      withTable("iceberg_small_target_tbl") {
+        spark.sql(
+          """
+            |CREATE TABLE iceberg_small_target_tbl (
+            |  id INT,
+            |  payload STRING
+            |) USING iceberg
+            |TBLPROPERTIES (
+            |  'write.format.default' = 'parquet',
+            |  'write.parquet.compression-codec' = 'uncompressed',
+            |  'write.parquet.row-group-size-bytes' = '4096',
+            |  'write.parquet.page-size-bytes' = '1024B',
+            |  'write.target-file-size-bytes' = '8192'
+            |)
+            |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(
+            """
+              |SHOW TBLPROPERTIES iceberg_small_target_tbl
+              |('write.target-file-size-bytes')
+              |""".stripMargin),
+          Seq(Row("write.target-file-size-bytes", "8192"))
+        )
+
+        val df = spark.sql(
+          """
+            |INSERT INTO iceberg_small_target_tbl
+            |SELECT /*+ COALESCE(1) */
+            |  CAST(id AS INT),
+            |  concat(
+            |    CAST(id AS STRING),
+            |    '-',
+            |    sha2(CAST(id AS STRING), 256),
+            |    '-',
+            |    sha2(CAST(id + 1000 AS STRING), 256)
+            |  )
+            |FROM range(1000)
+            |""".stripMargin)
+
+        val commandPlan =
+          
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan
+
+        assert(commandPlan.isInstanceOf[VeloxIcebergAppendDataExec])
+
+        checkAnswer(
+          spark.sql("SELECT COUNT(*) FROM iceberg_small_target_tbl"),
+          Seq(Row(1000L)))
+
+        val files = spark.sql(
+          """
+            |SELECT file_size_in_bytes
+            |FROM default.iceberg_small_target_tbl.files
+            |""".stripMargin).collect().map(_.getLong(0))
+
+        assert(files.nonEmpty)
+
+        assert(
+          files.length > 1,
+          s"Expected write.target-file-size-bytes=8192 to create multiple 
files, " +
+            s"but got files=${files.mkString("[", ", ", "]")}")
+
+        assert(
+          files.max < 64L * 1024L,
+          s"Expected small target file size to keep max file size reasonably 
small, " +
+            s"but got files=${files.mkString("[", ", ", "]")}")
+      }
+    }
+  }
 }
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
index c93c918a1f..0544e47c1d 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.execution
 import org.apache.gluten.backendsapi.BackendsApiManager
 
 import org.apache.iceberg.{FileFormat, PartitionField, PartitionSpec, Schema, 
TableProperties}
-import org.apache.iceberg.TableProperties.{ORC_COMPRESSION, 
ORC_COMPRESSION_DEFAULT, PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT, 
PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT, 
WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT}
+import org.apache.iceberg.TableProperties.{ORC_COMPRESSION, 
ORC_COMPRESSION_DEFAULT, PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT, 
PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT}
 import org.apache.iceberg.avro.AvroSchemaUtil
 import org.apache.iceberg.spark.source.IcebergWriteUtil
 import org.apache.iceberg.types.Type.TypeID
@@ -50,15 +50,14 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec {
   }
 
   protected def getParquetPageSizeBytes: String = {
-    val config = IcebergWriteUtil.getWriteProperty(write)
-    config.getOrDefault(
+    val tableProps = IcebergWriteUtil.getTable(write).properties()
+    tableProps.getOrDefault(
       normalizeCapacityString(PARQUET_PAGE_SIZE_BYTES),
       normalizeCapacityString(PARQUET_PAGE_SIZE_BYTES_DEFAULT.toString))
   }
 
   protected def getTargetFileSizeBytes: String = {
-    val config = IcebergWriteUtil.getWriteProperty(write)
-    config.getOrDefault(WRITE_TARGET_FILE_SIZE_BYTES, 
WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT.toString)
+    IcebergWriteUtil.getWriteConf(write).targetDataFileSize().toString
   }
 
   protected def getPartitionSpec: PartitionSpec = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to