This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b194e192f [spark] Support lower case callback param if use dataframe write (#3839)
b194e192f is described below
commit b194e192f23e761f91f56281665b679039592313
Author: askwang <[email protected]>
AuthorDate: Tue Aug 6 17:42:56 2024 +0800
[spark] Support lower case callback param if use dataframe write (#3839)
---
.../main/java/org/apache/paimon/CoreOptions.java | 7 +-
.../org/apache/paimon/spark/PaimonCommitTest.scala | 77 ++++++++++++++++++++++
2 files changed, 83 insertions(+), 1 deletion(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 93ada725f..660580284 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -49,6 +49,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -1959,7 +1960,11 @@ public class CoreOptions implements Serializable {
continue;
}
- String param = options.get(callbackParam.key().replace("#", className));
+ String originParamKey = callbackParam.key().replace("#", className);
+ String param = options.get(originParamKey);
+ if (param == null) {
+ param = options.get(originParamKey.toLowerCase(Locale.ROOT));
+ }
result.put(className, param);
}
return result;
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
new file mode 100644
index 000000000..0095e1024
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.manifest.{ManifestCommittable, ManifestEntry}
+import org.apache.paimon.table.sink.CommitCallback
+
+import org.junit.jupiter.api.Assertions
+
+import java.lang
+import java.util.List
+
+class PaimonCommitTest extends PaimonSparkTestBase {
+
+ test("test commit callback parameter compatibility") {
+ withTable("tb") {
+ spark.sql("""
+ |CREATE TABLE tb (id int, dt string) using paimon
+ |TBLPROPERTIES ('file.format'='parquet', 'primary-key'='id', 'bucket'='1')
+ |""".stripMargin)
+
+ val table = loadTable("tb")
+ val location = table.location().toString
+
+ val _spark = spark
+ import _spark.implicits._
+ val df = Seq((1, "a"), (2, "b")).toDF("a", "b")
+ df.write
+ .format("paimon")
+ .option(CoreOptions.COMMIT_CALLBACKS.key(), classOf[CustomCommitCallback].getName)
+ .option(
+ CoreOptions.COMMIT_CALLBACK_PARAM
+ .key()
+ .replace("#", classOf[CustomCommitCallback].getName),
+ "testid-100")
+ .mode("append")
+ .save(location)
+
+ Assertions.assertEquals(PaimonCommitTest.id, "testid-100")
+ }
+ }
+}
+
+object PaimonCommitTest {
+ var id = ""
+}
+
+case class CustomCommitCallback(testId: String) extends CommitCallback {
+
+ override def call(
+ committedEntries: List[ManifestEntry],
+ identifier: Long,
+ watermark: lang.Long): Unit = {
+ PaimonCommitTest.id = testId
+ }
+
+ override def retry(committable: ManifestCommittable): Unit = {}
+
+ override def close(): Unit = {}
+}