This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new f761ebb  [Fix](connector) improve accept_any_schema to options (#326)
f761ebb is described below

commit f761ebb2caa135e9336cb41ec1278413b9e7933e
Author: wanggx <[email protected]>
AuthorDate: Wed Jul 2 11:37:43 2025 +0800

    [Fix](connector) improve accept_any_schema to options (#326)
    
    * change accept_any_schema to options
    * add update partial columns itcase
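
    For context (not part of the commit): after this change, ACCEPT_ANY_SCHEMA is only
    advertised when the partial_columns sink property is enabled, so a partial-column
    DataFrame write is configured roughly as sketched below. This is a minimal sketch
    mirroring the options exercised by the new ITCase in this diff; the FE address,
    credentials, and table name are placeholders.

        import org.apache.spark.sql.{SaveMode, SparkSession}

        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        // write only a subset of the target table's columns
        val df = spark.createDataFrame(Seq((0, 0, "user1"))).toDF("siteid", "citycode", "username")
        df.write
          .format("doris")
          .option("doris.fenodes", "127.0.0.1:8030")                 // placeholder FE address
          .option("doris.table.identifier", "example_db.table5")     // placeholder table
          .option("user", "root")                                    // placeholder credentials
          .option("password", "")
          .option("doris.sink.properties.partial_columns", "true")   // opts in to ACCEPT_ANY_SCHEMA
          .option("doris.write.fields", "siteid,citycode,username")  // columns being written
          .mode(SaveMode.Append)
          .save()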
---
 .../client/write/AbstractStreamLoadProcessor.java  |  10 +-
 .../apache/doris/spark/config/DorisOptions.java    |   8 +
 .../doris/spark/sql/DorisAnySchemaITCase.scala     | 364 +++++++++++++++++++++
 .../doris/spark/catalog/DorisTableBase.scala       |  11 +-
 4 files changed, 384 insertions(+), 9 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index b4dcbf1..f64d201 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -53,25 +53,23 @@ import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static org.apache.doris.spark.config.DorisOptions.GROUP_COMMIT;
+import static org.apache.doris.spark.config.DorisOptions.PARTIAL_COLUMNS;
+import static org.apache.doris.spark.config.DorisOptions.VALID_GROUP_MODE;
+
 public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> implements DorisCommitter {
 
     protected static final JsonMapper MAPPER =
             JsonMapper.builder().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).build();
-    private static final String PARTIAL_COLUMNS = "partial_columns";
-    private static final String GROUP_COMMIT = "group_commit";
-    private static final Set<String> VALID_GROUP_MODE =
-            new HashSet<>(Arrays.asList("sync_mode", "async_mode", "off_mode"));
     private static final int arrowBufferSize = 1000;
     protected final Logger logger = LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
     protected final DorisConfig config;
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 5c31fa1..0be7acf 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -17,6 +17,10 @@
 
 package org.apache.doris.spark.config;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
 public class DorisOptions {
 
     public static final ConfigOption<String> DORIS_FENODES = ConfigOptions.name("doris.fenodes").stringType().withoutDefaultValue().withDescription("");
@@ -73,6 +77,10 @@ public class DorisOptions {
     public static final ConfigOption<String> DORIS_MAX_FILTER_RATIO = ConfigOptions.name("doris.max.filter.ratio").stringType().withoutDefaultValue().withDescription("");
 
     public static final String STREAM_LOAD_PROP_PREFIX = "doris.sink.properties.";
+    public static final String PARTIAL_COLUMNS = "partial_columns";
+    public static final String GROUP_COMMIT = "group_commit";
+    public static final Set<String> VALID_GROUP_MODE =
+            new HashSet<>(Arrays.asList("sync_mode", "async_mode", "off_mode"));
 
     public static final ConfigOption<Integer> DORIS_SINK_TASK_PARTITION_SIZE = ConfigOptions.name("doris.sink.task.partition.size").intType().withoutDefaultValue().withDescription("");
 
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala
new file mode 100644
index 0000000..f516eaa
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils}
+import org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder, getDorisQueryConnection}
+import org.apache.doris.spark.rest.models.DataModel
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.junit.Test
+import org.slf4j.LoggerFactory
+
+import java.util
+import scala.collection.JavaConverters._
+
+class DorisAnySchemaITCase extends AbstractContainerTestBase {
+
+  private val LOG = LoggerFactory.getLogger(classOf[DorisAnySchemaITCase])
+
+  val DATABASE: String = "example_db"
+  /*
+   *   CREATE TABLE table4
+   *   (
+   *       siteid INT DEFAULT '10',
+   *       citycode SMALLINT,
+   *       username VARCHAR(32) DEFAULT '',
+   *       pv BIGINT DEFAULT '0'
+   *   )
+   *   UNIQUE KEY(siteid, citycode, username)
+   *   DISTRIBUTED BY HASH(siteid) BUCKETS 10
+   *   PROPERTIES("replication_num" = "1");
+   */
+  val dorisTable = "table4"
+
+  /*
+   *   CREATE TABLE table2
+   *   (
+   *       siteid INT DEFAULT '10',
+   *       citycode SMALLINT,
+   *       username VARCHAR(32) DEFAULT '',
+   *       pv BIGINT DEFAULT '0'
+   *   )
+   *   UNIQUE KEY(siteid, citycode, username)
+   *   DISTRIBUTED BY HASH(siteid) BUCKETS 10
+   *   PROPERTIES("replication_num" = "1");
+   */
+  val dorisSourceTable = "table2"
+
+  /*
+   *   CREATE TABLE table5
+   *   (
+   *       siteid INT DEFAULT '10',
+   *       citycode SMALLINT,
+   *       username VARCHAR(32) DEFAULT '',
+   *       pv BIGINT DEFAULT '0',
+   *       p_value BIGINT DEFAULT '0'
+   *   )
+   *   UNIQUE KEY(siteid, citycode, username)
+   *   DISTRIBUTED BY HASH(siteid) BUCKETS 10
+   *   PROPERTIES("replication_num" = "1");
+   */
+  val dorisPartialTable = "table5"
+
+  @Test
+  def jsonDataWriteTest(): Unit = {
+    initializeTable(dorisTable, DataModel.UNIQUE)
+
+    val spark = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val df = spark.createDataFrame(Seq(
+        (0, 0, "user1", 100),
+        (1, 0, "user2", 100),
+        (3, 0, "user2", 200)
+      )).toDF("siteid", "citycode", "username", "pv")
+      df.write
+        .format("doris")
+        .option("doris.fenodes", getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + dorisTable)
+        .option("user", getDorisUsername)
+        .option("password", getDorisPassword)
+        .option("doris.sink.properties.format", "json")
+        .option("sink.batch.size", 2)
+        .option("sink.max-retries", 2)
+        .mode(SaveMode.Append)
+        .save()
+
+      Thread.sleep(10000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, dorisTable),
+        4)
+      val expected = util.Arrays.asList("0,0,user1,100", "1,0,user2,100", "3,0,user2,200");
+      checkResultInAnyOrder("jsonDataWriteTest", expected.toArray, actual.toArray)
+    } finally {
+      spark.stop()
+    }
+  }
+
+  @Test
+  def jsonDataWriteWithPartialUpdateTest(): Unit = {
+    initializePartialTable(dorisPartialTable, DataModel.UNIQUE)
+    val spark = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val df = spark.createDataFrame(Seq(
+        (0, 0, "user4", 100),
+        (1, 0, "user5", 100),
+        (3, 0, "user6", 200)
+      )).toDF("siteid", "citycode", "username", "pv")
+      df.write
+        .format("doris")
+        .option("doris.fenodes", getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + dorisPartialTable)
+        .option("user", getDorisUsername)
+        .option("password", getDorisPassword)
+        .option("doris.sink.properties.format", "json")
+        .option("doris.sink.properties.partial_columns", "true")
+        .option("doris.write.fields", "siteid,citycode,username,pv")
+        .option("sink.batch.size", 2)
+        .option("sink.max-retries", 2)
+        .mode(SaveMode.Append)
+        .save()
+      spark.stop()
+      Thread.sleep(2000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, dorisPartialTable),
+        5)
+      val expected = util.Arrays.asList("0,0,user4,100,0", "1,0,user5,100,0", "3,0,user6,200,0");
+      checkResultInAnyOrder("jsonDataWriteWithPartialUpdateTest", expected.toArray, actual.toArray)
+    } finally {
+      spark.stop()
+    }
+  }
+
+  @Test
+  def jsonDataWriteSqlTest(): Unit = {
+    initializeTable(dorisTable, DataModel.UNIQUE)
+    initializeTable(dorisSourceTable, DataModel.UNIQUE)
+    val spark = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val doris = spark.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_lh
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + dorisTable}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}"
+           |)
+           |""".stripMargin)
+      spark.sql(
+        """
+          |insert into test_lh values (0, 0, "user1", 100), (1, 0, "user2", 100),(2, 1, "user3", 100),(3, 0, "user2", 200)
+          |""".stripMargin)
+
+      spark.sql(
+        s"""
+           |CREATE TEMPORARY VIEW table2
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + dorisSourceTable}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}"
+           |)
+           |""".stripMargin)
+
+      spark.sql(
+        """
+          |insert into table2 values (0, 0, "user1", 100), (1, 0, "user2", 100),(2, 1, "user3", 100),(3, 0, "user2", 200)
+          |""".stripMargin)
+
+      spark.sql(
+        """
+          |insert into test_lh select siteid, citycode + 1, username, pv + 1 from table2
+          |""".stripMargin)
+
+      Thread.sleep(2000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, dorisTable),
+        4)
+      val expected = util.Arrays.asList("0,0,user1,100", "1,0,user2,100", "2,1,user3,100", "3,0,user2,200",
+        "0,1,user1,101", "1,1,user2,101", "2,2,user3,101", "3,1,user2,201");
+      checkResultInAnyOrder("jsonDataWriteSqlTest", expected.toArray, actual.toArray)
+    } finally {
+      spark.stop()
+    }
+  }
+
+  @Test
+  def jsonDataWriteWithPartialUpdateSqlTest(): Unit = {
+    initializePartialTable(dorisPartialTable, DataModel.UNIQUE)
+    val spark = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val version = spark.version
+      LOG.info("spark version: " + version)
+      if (StringUtils.startsWith(version, "2")
+        || StringUtils.startsWith(version, "3.1")
+        || StringUtils.startsWith(version, "3.2")
+        || StringUtils.startsWith(version, "3.4")) {
+        LOG.warn("sql partial_columns is only support in spark3.3/3.5+")
+        return
+      }
+      val doris = spark.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_lh
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + dorisPartialTable}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.sink.properties.format" = "csv",
+           | "doris.sink.properties.partial_columns" = "true",
+           | "doris.write.fields" = "siteid,citycode,username,pv"
+           |)
+           |""".stripMargin)
+      spark.sql(
+        """
+          |INSERT INTO test_lh (siteid, citycode, username, pv) VALUES (0, 0, 'user1',3)
+          |""".stripMargin)
+
+      Thread.sleep(2000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, dorisPartialTable),
+        5)
+      val expected = util.Arrays.asList("0,0,user1,3,0");
+      checkResultInAnyOrder("jsonDataWriteWithPartialUpdateSqlTest", expected.toArray, actual.toArray)
+    } finally {
+      spark.stop()
+    }
+  }
+
+  @Test
+  def jsonDataWriteWithPartialUpdateSqlTest1(): Unit = {
+    initializePartialTable(dorisPartialTable, DataModel.UNIQUE)
+    initializeTable(dorisSourceTable, DataModel.UNIQUE)
+    val spark = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val version = spark.version
+      LOG.info("spark version: " + version)
+      if (StringUtils.startsWith(version, "2")
+        || StringUtils.startsWith(version, "3.1")
+        || StringUtils.startsWith(version, "3.2")
+        || StringUtils.startsWith(version, "3.4")) {
+        LOG.warn("sql partial_columns is only support in spark3.3/3.5+")
+        return
+      }
+      val doris = spark.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_lh
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + dorisPartialTable}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.sink.properties.format" = "json",
+           | "doris.sink.properties.partial_columns" = "true"
+           |)
+           |""".stripMargin)
+
+      spark.sql(
+        s"""
+           |CREATE TEMPORARY VIEW table2
+           |USING doris
+           |OPTIONS(
+           | "table.identifier"="${DATABASE + "." + dorisSourceTable}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}"
+           |)
+           |""".stripMargin)
+
+      spark.sql(
+        """
+          |INSERT INTO table2 (siteid, citycode, username, pv) VALUES (0, 0, 'user1',3)
+          |""".stripMargin)
+
+      spark.sql(
+        """
+          |insert into test_lh(siteid,citycode,username, pv) select siteid,citycode+1 as citycode,username,pv+1 as pv from table2
+          |""".stripMargin)
+      Thread.sleep(2000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, dorisPartialTable),
+        5)
+      val expected = util.Arrays.asList("0,1,user1,4,0");
+      checkResultInAnyOrder("jsonDataWriteWithPartialUpdateSqlTest1", expected.toArray, actual.toArray)
+    } finally {
+      spark.stop()
+    }
+  }
+
+  private def initializeTable(table: String, dataModel: DataModel): Unit = {
+    val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\""
+    val model = if (dataModel == DataModel.UNIQUE_MOR) DataModel.UNIQUE.toString else dataModel.toString
+    ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+      String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+      String.format("CREATE TABLE %s.%s ( \n"
+        + " siteid INT DEFAULT '10',"
+        + " citycode SMALLINT, "
+        + " username VARCHAR(32) DEFAULT '',"
+        + " pv BIGINT DEFAULT '0' "
+        + " )"
+        + " %s KEY(siteid, citycode, username) "
+        + " DISTRIBUTED BY HASH(`siteid`) BUCKETS 1\n"
+        + "PROPERTIES ("
+        + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table, model))
+  }
+
+  private def initializePartialTable(table: String, dataModel: DataModel): Unit = {
+    val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\""
+    val model = if (dataModel == DataModel.UNIQUE_MOR) DataModel.UNIQUE.toString else dataModel.toString
+    ContainerUtils.executeSQLStatement(
+      getDorisQueryConnection,
+      LOG,
+      String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+      String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+      String.format("CREATE TABLE %s.%s ( \n"
+        + " siteid INT DEFAULT '10',"
+        + " citycode SMALLINT, "
+        + " username VARCHAR(32) DEFAULT '',"
+        + " pv BIGINT DEFAULT '0', "
+        + " p_value BIGINT DEFAULT '0' "
+        + " )"
+        + " %s KEY(siteid, citycode, username) "
+        + " DISTRIBUTED BY HASH(`siteid`) BUCKETS 1\n"
+        + "PROPERTIES ("
+        + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table, model))
+  }
+
+  private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = {
+    LOG.info("Checking DorisAnySchemaITCase result. testName={}, actual={}, expected={}", testName, actual, expected)
+    assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
+  }
+}
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
index 50af0fa..90da702 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import java.util
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.language.implicitConversions
 
 
@@ -45,11 +46,15 @@ abstract class DorisTableBase(identifier: Identifier, config: DorisConfig, schem
   })
 
   override def capabilities(): util.Set[TableCapability] = {
-    Set(BATCH_READ,
+    val capabilities = mutable.Set(BATCH_READ,
       BATCH_WRITE,
       STREAMING_WRITE,
-      ACCEPT_ANY_SCHEMA,
-      TRUNCATE).asJava
+      TRUNCATE)
+    val properties = config.getSinkProperties
+    if (properties.containsKey(DorisOptions.PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(DorisOptions.PARTIAL_COLUMNS))) {
+      capabilities += ACCEPT_ANY_SCHEMA
+    }
+    capabilities.asJava
   }
 
   override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
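
As a condensed illustration of the capability gating above (the full version is the new
DorisAnySchemaITCase in this commit): creating the view with the partial_columns sink
property makes capabilities() include ACCEPT_ANY_SCHEMA, which is what allows a
column-subset INSERT to pass Spark's write analysis. Connection values and the table
name below are placeholders.

    spark.sql(
      """
        |CREATE TEMPORARY VIEW doris_sink
        |USING doris
        |OPTIONS(
        | "table.identifier" = "example_db.table5",
        | "fenodes" = "127.0.0.1:8030",
        | "user" = "root",
        | "password" = "",
        | "doris.sink.properties.partial_columns" = "true"
        |)
        |""".stripMargin)
    // the INSERT names only some of the table's columns; it is accepted because
    // ACCEPT_ANY_SCHEMA is now advertised for this table
    spark.sql("INSERT INTO doris_sink (siteid, citycode, username, pv) VALUES (0, 0, 'user1', 3)")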

