This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 37a6498 [Improve] add schema less option (#332)
37a6498 is described below
commit 37a64981e116bb7abcf53b454d883479b90a846e
Author: wudi <[email protected]>
AuthorDate: Wed Jul 30 19:36:50 2025 +0800
[Improve] add schema less option (#332)
---
.../apache/doris/spark/config/DorisOptions.java | 2 ++
.../doris/spark/util/LoadBalanceListTest.java | 8 ++---
.../doris/spark/sql/DorisAnySchemaITCase.scala | 39 ++++++++++++++++++++++
.../doris/spark/catalog/DorisTableBase.scala | 4 ++-
4 files changed, 48 insertions(+), 5 deletions(-)
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 0be7acf..5600411 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -69,6 +69,8 @@ public class DorisOptions {
public static final ConfigOption<String> DORIS_WRITE_FIELDS = ConfigOptions.name("doris.write.fields").stringType().withoutDefaultValue().withDescription("");
+ public static final ConfigOption<Boolean> DORIS_WRITE_SCHEMA_LESS = ConfigOptions.name("doris.write.schemaless").booleanType().defaultValue(false).withDescription("");
+
public static final ConfigOption<Integer> DORIS_SINK_BATCH_SIZE = ConfigOptions.name("doris.sink.batch.size").intType().defaultValue(500000).withDescription("");
public static final ConfigOption<Integer> DORIS_SINK_MAX_RETRIES = ConfigOptions.name("doris.sink.max-retries").intType().defaultValue(0).withDescription("");
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/util/LoadBalanceListTest.java b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/util/LoadBalanceListTest.java
index e4226e9..01f9303 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/util/LoadBalanceListTest.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/util/LoadBalanceListTest.java
@@ -41,7 +41,7 @@ public class LoadBalanceListTest {
if (index++ == 0) {
testHeadSet.add(server);
}
- System.out.println(server);
+ // System.out.println(server);
}
if (i % serverList.size() == 0) {
Assert.assertTrue(testList.equals(Arrays.asList("server1", "server2", "server3")));
@@ -55,7 +55,7 @@ public class LoadBalanceListTest {
Assert.assertTrue(testList.equals(Arrays.asList("server3", "server1", "server2")));
}
- System.out.println("---------");
+ // System.out.println("---------");
Assert.assertTrue(testList.size() == serverList.size());
}
Assert.assertTrue(testHeadSet.size() == serverList.size());
@@ -80,9 +80,9 @@ public class LoadBalanceListTest {
if (++index > loadBalanceList.getList().size() - failedSet.size()) {
Assert.assertTrue(failedSet.contains(server));
}
- System.out.println(server);
+ // System.out.println(server);
}
- System.out.println("---------");
+ // System.out.println("---------");
Assert.assertTrue(serverSet.size() == serverList.size());
}
}
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala
index f516eaa..2d86f9e 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala
@@ -76,6 +76,8 @@ class DorisAnySchemaITCase extends AbstractContainerTestBase {
*/
val dorisPartialTable = "table5"
+ val dorisSchemaLessTable = "table_schema_less"
+
@Test
def jsonDataWriteTest(): Unit = {
initializeTable(dorisTable, DataModel.UNIQUE)
@@ -316,6 +318,43 @@ class DorisAnySchemaITCase extends AbstractContainerTestBase {
}
}
+
+ @Test
+ def jsonDataWriteWithSchemaLess(): Unit = {
+ initializeTable(dorisSchemaLessTable, DataModel.UNIQUE)
+ val spark = SparkSession.builder().master("local[*]").getOrCreate()
+ try {
+ val df = spark.createDataFrame(Seq(
+ (0, 0, 100),
+ (1, 0, 100),
+ (3, 0, 200)
+ )).toDF("siteid", "citycode", "pv")
+ df.write
+ .format("doris")
+ .option("doris.fenodes", getFenodes)
+ .option("doris.table.identifier", DATABASE + "." + dorisSchemaLessTable)
+ .option("user", getDorisUsername)
+ .option("password", getDorisPassword)
+ .option("doris.sink.properties.format", "json")
+ .option("doris.write.schemaless", "true")
+ .option("sink.batch.size", 2)
+ .option("sink.max-retries", 2)
+ .mode(SaveMode.Append)
+ .save()
+ spark.stop()
+ Thread.sleep(2000)
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, dorisSchemaLessTable),
+ 4)
+ val expected = util.Arrays.asList("0,0,,100", "1,0,,100", "3,0,,200");
+ checkResultInAnyOrder("jsonDataWriteWithSchemaLess", expected.toArray, actual.toArray)
+ } finally {
+ spark.stop()
+ }
+ }
+
private def initializeTable(table: String, dataModel: DataModel): Unit = {
val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\""
val model = if (dataModel == DataModel.UNIQUE_MOR) DataModel.UNIQUE.toString else dataModel.toString
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
index 90da702..2df410b 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
@@ -51,7 +51,9 @@ abstract class DorisTableBase(identifier: Identifier, config: DorisConfig, schem
STREAMING_WRITE,
TRUNCATE)
val properties = config.getSinkProperties
- if (properties.containsKey(DorisOptions.PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(DorisOptions.PARTIAL_COLUMNS))) {
+ val partialColumnsEnabled = properties.containsKey(DorisOptions.PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(DorisOptions.PARTIAL_COLUMNS))
+ val schemaLessEnabled = config.getValue(DorisOptions.DORIS_WRITE_SCHEMA_LESS)
+ if (partialColumnsEnabled || schemaLessEnabled) {
capabilities += ACCEPT_ANY_SCHEMA
}
capabilities.asJava
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]