This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2f59458102 [spark] Adding support for Iceberg compatibility options to be passed as table properties with dataframe APIs (#6803)
2f59458102 is described below
commit 2f594581028514dfaf55143be2de51e3da46df22
Author: junmuz <[email protected]>
AuthorDate: Mon Dec 22 04:08:23 2025 +0000
[spark] Adding support for Iceberg compatibility options to be passed as table properties with dataframe APIs (#6803)
---
.../org/apache/paimon/iceberg/IcebergOptions.java | 23 +++++++++++++
.../shim/PaimonCreateTableAsSelectStrategy.scala | 13 ++++++--
.../shim/PaimonCreateTableAsSelectStrategy.scala | 13 ++++++--
.../shim/PaimonCreateTableAsSelectStrategy.scala | 13 ++++++--
.../shim/PaimonCreateTableAsSelectStrategy.scala | 13 ++++++--
.../apache/paimon/spark/sql/PaimonOptionTest.scala | 39 ++++++++++++++++++++++
6 files changed, 102 insertions(+), 12 deletions(-)
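
In practice, the change lets a DataFrame write carry its metadata.iceberg.* keys
through to the stored table properties instead of treating them as transient write
options. A minimal sketch of the usage this enables (table name and option values
are illustrative, mirroring the new test at the end of this diff):

    // Sketch only: names and values are illustrative.
    Seq((1L, "x1"), (2L, "x2"))
      .toDF("a", "b")
      .write
      .format("paimon")
      .option("primary-key", "a")
      .option("metadata.iceberg.storage", "hadoop-catalog")
      .saveAsTable("T")
    // Before this commit the metadata.iceberg.* keys fell into the transient
    // write options; now they persist as table properties.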
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index f371db825c..be4f6152ef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -26,7 +26,10 @@ import org.apache.paimon.options.description.InlineElement;
import org.apache.paimon.options.description.TextElement;
import org.apache.paimon.utils.Preconditions;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.apache.paimon.options.ConfigOptions.key;
@@ -255,4 +258,24 @@ public class IcebergOptions {
return TextElement.text(description);
}
}
+
+    /**
+     * Returns all ConfigOption fields defined in this class. This method uses reflection to
+     * dynamically discover all ConfigOption fields, ensuring that new options are automatically
+     * included without code changes.
+     */
+    public static List<ConfigOption<?>> getOptions() {
+        final Field[] fields = IcebergOptions.class.getFields();
+        final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
+        for (Field field : fields) {
+            if (ConfigOption.class.isAssignableFrom(field.getType())) {
+                try {
+                    list.add((ConfigOption<?>) field.get(IcebergOptions.class));
+                } catch (IllegalAccessException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return list;
+    }
}
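
For illustration, the Spark shims below consume the new method exactly the way
CoreOptions.getOptions is consumed today; a sketch of the call site (matching the
Scala hunks that follow):

    // Collect every Iceberg compatibility key, e.g. "metadata.iceberg.storage".
    import scala.collection.JavaConverters._
    val icebergOptionKeys: Seq[String] =
      IcebergOptions.getOptions.asScala.map(_.key()).toSeq

A note on the reflection idiom: Class.getFields() returns only public fields, and
for a static field the instance argument to Field.get is ignored, so passing the
class object is safe here.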
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index 05370ddc90..cc6258e6eb 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.execution.shim
import org.apache.paimon.CoreOptions
+import org.apache.paimon.iceberg.IcebergOptions
import org.apache.paimon.spark.catalog.FormatTableCatalog
import org.apache.spark.sql.{SparkSession, Strategy}
@@ -39,10 +40,16 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends Strate
             throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
           case _ =>
             val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
-            val (coreOptions, writeOptions) = options.partition {
-              case (key, _) => coreOptionKeys.contains(key)
+
+            // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
+            val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+            val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
+
+            val (tableOptions, writeOptions) = options.partition {
+              case (key, _) => allTableOptionKeys.contains(key)
             }
-            val newProps = CatalogV2Util.withDefaultOwnership(props) ++ coreOptions
+            val newProps = CatalogV2Util.withDefaultOwnership(props) ++ tableOptions
val isPartitionedFormatTable = {
catalog match {
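
The behavioral change in all four shims is identical: the partition now tests each
key against the union of core and Iceberg option keys. A hedged sketch of the
resulting split, using a hypothetical options map (write.merge-schema stands in
for any key that is neither a core nor an Iceberg option):

    // Illustrative map; allTableOptionKeys is the union computed in the hunk above.
    val options = Map(
      "bucket" -> "-1",                               // core option    -> table property
      "metadata.iceberg.storage" -> "hadoop-catalog", // iceberg option -> table property
      "write.merge-schema" -> "true")                 // neither        -> stays a write option
    val (tableOptions, writeOptions) =
      options.partition { case (key, _) => allTableOptionKeys.contains(key) }
    // tableOptions == Map("bucket" -> "-1", "metadata.iceberg.storage" -> "hadoop-catalog")
    // writeOptions == Map("write.merge-schema" -> "true")

The remaining hunks repeat this change for the Spark 3.3, 3.4, and common modules.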
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index 4d4104d1ed..a09996f153 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.execution.shim
import org.apache.paimon.CoreOptions
+import org.apache.paimon.iceberg.IcebergOptions
import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.FormatTableCatalog
@@ -51,10 +52,16 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
             throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
           case _ =>
             val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
-            val (coreOptions, writeOptions) = options.partition {
-              case (key, _) => coreOptionKeys.contains(key)
+
+            // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
+            val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+            val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
+
+            val (tableOptions, writeOptions) = options.partition {
+              case (key, _) => allTableOptionKeys.contains(key)
             }
-            val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ coreOptions)
+            val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
val isPartitionedFormatTable = {
catalog match {
diff --git a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index d5424493eb..4a82f35188 100644
--- a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.execution.shim
import org.apache.paimon.CoreOptions
+import org.apache.paimon.iceberg.IcebergOptions
import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.FormatTableCatalog
@@ -53,10 +54,16 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
             throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
           case _ =>
             val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
-            val (coreOptions, writeOptions) = options.partition {
-              case (key, _) => coreOptionKeys.contains(key)
+
+            // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
+            val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+            val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
+
+            val (tableOptions, writeOptions) = options.partition {
+              case (key, _) => allTableOptionKeys.contains(key)
             }
-            val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ coreOptions)
+            val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
val isPartitionedFormatTable = {
catalog match {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index fd6627c095..61e25b7c16 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.execution.shim
import org.apache.paimon.CoreOptions
+import org.apache.paimon.iceberg.IcebergOptions
import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.FormatTableCatalog
@@ -51,10 +52,16 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
             throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
           case _ =>
             val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
-            val (coreOptions, writeOptions) = options.partition {
-              case (key, _) => coreOptionKeys.contains(key)
+
+            // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
+            val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+            val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
+
+            val (tableOptions, writeOptions) = options.partition {
+              case (key, _) => allTableOptionKeys.contains(key)
             }
-            val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ coreOptions)
+            val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
val isPartitionedFormatTable = {
catalog match {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
index a51893941e..14351103f4 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
@@ -55,6 +55,45 @@ class PaimonOptionTest extends PaimonSparkTestBase {
}
}
+ test("Paimon Option: create table with Iceberg compatibility options via
DataFrame writer") {
+ Seq((1L, "x1"), (2L, "x2"))
+ .toDF("a", "b")
+ .write
+ .format("paimon")
+ .option("primary-key", "a")
+ .option("bucket", "-1")
+ .option("metadata.iceberg.database", "db_t")
+ .option("metadata.iceberg.table", "t_ib")
+ .option("metadata.iceberg.storage", "hadoop-catalog")
+ .option("metadata.iceberg.storage-location", "table-location")
+ .option("metadata.iceberg.manifest-legacy-version", "true")
+ .option("metadata.iceberg.manifest-compression", "snappy")
+ .option("metadata.iceberg.previous-versions-max", "5")
+ .option("metadata.iceberg.uri", "")
+ .saveAsTable("T_IB")
+
+ val table = loadTable("T_IB")
+
+ // Verify primary key is also stored (existing functionality still works)
+ Assertions.assertEquals(1, table.primaryKeys().size())
+ Assertions.assertEquals("a", table.primaryKeys().get(0))
+
+ // Verify bucket configuration
+ Assertions.assertEquals("-1", table.options().get("bucket"))
+
+ // Verify Iceberg compatibility options are stored permanently
+ Assertions.assertEquals("db_t",
table.options().get("metadata.iceberg.database"))
+ Assertions.assertEquals("t_ib",
table.options().get("metadata.iceberg.table"))
+ Assertions.assertEquals("hadoop-catalog",
table.options().get("metadata.iceberg.storage"))
+ Assertions.assertEquals(
+ "table-location",
+ table.options().get("metadata.iceberg.storage-location"))
+ Assertions.assertEquals("true",
table.options().get("metadata.iceberg.manifest-legacy-version"))
+ Assertions.assertEquals("snappy",
table.options().get("metadata.iceberg.manifest-compression"))
+ Assertions.assertEquals("5",
table.options().get("metadata.iceberg.previous-versions-max"))
+ Assertions.assertEquals("", table.options().get("metadata.iceberg.uri"))
+ }
+
test("Paimon Option: query table with sql conf") {
sql("CREATE TABLE T (id INT)")
sql("INSERT INTO T VALUES 1")