This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4189a23f1 [spark] Support global options via SQL conf (#3825)
4189a23f1 is described below

commit 4189a23f1b139821359b529a0bd55fb54520bcf7
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jul 30 12:04:31 2024 +0800

    [spark] Support global options via SQL conf (#3825)
---
 .../java/org/apache/paimon/spark/SparkCatalog.java | 45 +++++-------
 .../org/apache/paimon/spark/SparkSource.scala      |  5 +-
 .../org/apache/paimon/spark/util/OptionUtils.scala | 54 +++++++++++++++
 .../apache/paimon/spark/sql/PaimonOptionTest.scala | 79 ++++++++++++++++++++++
 4 files changed, 154 insertions(+), 29 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index c28a65cc8..d4576fa8d 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -26,7 +26,6 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.spark.catalog.SparkBaseCatalog;
-import org.apache.paimon.table.Table;
 
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
@@ -57,6 +56,8 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
 import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
 import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
+import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
+import static org.apache.paimon.spark.util.OptionUtils.mergeSQLConf;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Spark {@link TableCatalog} for paimon. */
@@ -220,21 +221,16 @@ public class SparkCatalog extends SparkBaseCatalog {
 
     @Override
     public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
-        try {
-            return new SparkTable(load(ident));
-        } catch (Catalog.TableNotExistException e) {
-            throw new NoSuchTableException(ident);
-        }
+        return loadSparkTable(ident, Collections.emptyMap());
     }
 
     /**
      * Do not annotate with <code>@override</code> here to maintain 
compatibility with Spark 3.2-.
      */
     public SparkTable loadTable(Identifier ident, String version) throws 
NoSuchTableException {
-        Table table = loadPaimonTable(ident);
         LOG.info("Time travel to version '{}'.", version);
-        return new SparkTable(
-                
table.copy(Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), version)));
+        return loadSparkTable(
+                ident, 
Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), version));
     }
 
     /**
@@ -244,22 +240,13 @@ public class SparkCatalog extends SparkBaseCatalog {
      * TableCatalog#loadTable(Identifier, long)}). But in SQL you should use 
seconds.
      */
     public SparkTable loadTable(Identifier ident, long timestamp) throws 
NoSuchTableException {
-        Table table = loadPaimonTable(ident);
         // Paimon's timestamp use millisecond
         timestamp = timestamp / 1000;
-
         LOG.info("Time travel target timestamp is {} milliseconds.", 
timestamp);
-
-        Options option = new Options().set(CoreOptions.SCAN_TIMESTAMP_MILLIS, 
timestamp);
-        return new SparkTable(table.copy(option.toMap()));
-    }
-
-    private Table loadPaimonTable(Identifier ident) throws 
NoSuchTableException {
-        try {
-            return load(ident);
-        } catch (Catalog.TableNotExistException e) {
-            throw new NoSuchTableException(ident);
-        }
+        return loadSparkTable(
+                ident,
+                Collections.singletonMap(
+                        CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), 
String.valueOf(timestamp)));
     }
 
     @Override
@@ -400,7 +387,7 @@ public class SparkCatalog extends SparkBaseCatalog {
                                     return references.length == 1
                                             && references[0] instanceof 
FieldReference;
                                 }));
-        Map<String, String> normalizedProperties = new HashMap<>(properties);
+        Map<String, String> normalizedProperties = mergeSQLConf(properties);
         normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
         normalizedProperties.remove(TableCatalog.PROP_COMMENT);
         String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
@@ -469,10 +456,14 @@ public class SparkCatalog extends SparkBaseCatalog {
         return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], 
ident.name());
     }
 
-    /** Load a Table Store table. */
-    protected org.apache.paimon.table.Table load(Identifier ident)
-            throws Catalog.TableNotExistException, NoSuchTableException {
-        return catalog.getTable(toIdentifier(ident));
+    protected SparkTable loadSparkTable(Identifier ident, Map<String, String> 
extraOptions)
+            throws NoSuchTableException {
+        try {
+            return new SparkTable(
+                    copyWithSQLConf(catalog.getTable(toIdentifier(ident)), 
extraOptions));
+        } catch (Catalog.TableNotExistException e) {
+            throw new NoSuchTableException(ident);
+        }
     }
 
     // --------------------- unsupported methods ----------------------------
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index e7e744f37..8ea2c31bc 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.CatalogContext
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable
 import org.apache.paimon.spark.sources.PaimonSink
+import org.apache.paimon.spark.util.OptionUtils.mergeSQLConf
 import org.apache.paimon.table.{DataTable, FileStoreTable, 
FileStoreTableFactory}
 import org.apache.paimon.table.system.AuditLogTable
 
@@ -64,7 +65,7 @@ class SparkSource
       schema: StructType,
       partitioning: Array[Transform],
       properties: JMap[String, String]): Table = {
-    new SparkTable(loadTable(properties))
+    SparkTable(loadTable(properties))
   }
 
   override def createRelation(
@@ -80,7 +81,7 @@ class SparkSource
 
   private def loadTable(options: JMap[String, String]): DataTable = {
     val catalogContext = CatalogContext.create(
-      Options.fromMap(options),
+      Options.fromMap(mergeSQLConf(options)),
       SparkSession.active.sessionState.newHadoopConf())
     val table = FileStoreTableFactory.create(catalogContext)
     if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
new file mode 100644
index 000000000..af7ff7204
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util
+
+import org.apache.paimon.table.Table
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+object OptionUtils extends SQLConfHelper {
+
+  private val PAIMON_OPTION_PREFIX = "spark.paimon."
+
+  def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = 
{
+    val mergedOptions = new JHashMap[String, String](
+      conf.getAllConfs
+        .filterKeys(_.startsWith(PAIMON_OPTION_PREFIX))
+        .map {
+          case (key, value) =>
+            key.stripPrefix(PAIMON_OPTION_PREFIX) -> value
+        }
+        .asJava)
+    mergedOptions.putAll(extraOptions)
+    mergedOptions
+  }
+
+  def copyWithSQLConf[T <: Table](table: T, extraOptions: JMap[String, 
String]): T = {
+    val mergedOptions = mergeSQLConf(extraOptions)
+    if (mergedOptions.isEmpty) {
+      table
+    } else {
+      table.copy(mergedOptions).asInstanceOf[T]
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
new file mode 100644
index 000000000..9fc571634
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTableFactory
+
+import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions
+
+class PaimonOptionTest extends PaimonSparkTestBase {
+
+  import testImplicits._
+
+  test("Paimon Option: create table with sql conf") {
+    withSQLConf("spark.paimon.file.block-size" -> "512M") {
+      sql("CREATE TABLE T (id INT)")
+      val table = loadTable("T")
+      // check options in schema file directly
+      val fileStoreTable = FileStoreTableFactory.create(table.fileIO(), 
table.location())
+      Assertions.assertEquals("512M", 
fileStoreTable.options().get("file.block-size"))
+    }
+  }
+
+  test("Paimon Option: create table by dataframe with sql conf") {
+    withSQLConf("spark.paimon.file.block-size" -> "512M") {
+      Seq((1L, "x1"), (2L, "x2"))
+        .toDF("a", "b")
+        .write
+        .format("paimon")
+        .mode("append")
+        .saveAsTable("T")
+      val table = loadTable("T")
+      // check options in schema file directly
+      val fileStoreTable = FileStoreTableFactory.create(table.fileIO(), 
table.location())
+      Assertions.assertEquals("512M", 
fileStoreTable.options().get("file.block-size"))
+    }
+  }
+
+  test("Paimon Option: query table with sql conf") {
+    sql("CREATE TABLE T (id INT)")
+    sql("INSERT INTO T VALUES 1")
+    sql("INSERT INTO T VALUES 2")
+    checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil)
+    val table = loadTable("T")
+
+    // query with mutable option
+    withSQLConf("spark.paimon.scan.snapshot-id" -> "1") {
+      checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1))
+      checkAnswer(spark.read.format("paimon").load(table.location().toString), 
Row(1))
+    }
+
+    // query with immutable option
+    withSQLConf("spark.paimon.bucket" -> "1") {
+      assertThrows[UnsupportedOperationException] {
+        sql("SELECT * FROM T ORDER BY id")
+      }
+      assertThrows[UnsupportedOperationException] {
+        spark.read.format("paimon").load(table.location().toString)
+      }
+    }
+  }
+}

Reply via email to