This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7e60b5d [feature](connector) support overwrite for datasource v2
(#281)
7e60b5d is described below
commit 7e60b5d3e97c5e09751dcfe59acb3ca9008cae0d
Author: gnehil <[email protected]>
AuthorDate: Mon Mar 17 20:36:49 2025 +0800
[feature](connector) support overwrite for datasource v2 (#281)
---
spark-doris-connector/pom.xml | 2 +-
.../apache/doris/spark/client/DorisFrontendClient.java | 12 ++++++++++++
.../apache/doris/spark/catalog/DorisTableBase.scala | 3 ++-
.../apache/doris/spark/write/DorisWriteBuilder.scala | 18 +++++++++++++++---
4 files changed, 30 insertions(+), 5 deletions(-)
diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 6d610da..8c975df 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -79,7 +79,7 @@
</mailingLists>
<properties>
- <revision>25.0.0-SNAPSHOT</revision>
+ <revision>25.1.0-SNAPSHOT</revision>
<spark.version>2.4.8</spark.version>
<spark.major.version>2.4</spark.major.version>
<scala.version>2.11.12</scala.version>
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index 0a6669e..1c4c49c 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -362,6 +362,18 @@ public class DorisFrontendClient implements Serializable {
});
}
+ public void truncateTable(String database, String table) throws Exception {
+ queryFrontends(conn -> {
+ String sql = "TRUNCATE TABLE " + database + "." + table;
+ try (PreparedStatement preparedStatement =
conn.prepareStatement(sql)) {
+ preparedStatement.execute();
+ return null;
+ } catch (SQLException e) {
+ throw new RuntimeException("truncate table failed", e);
+ }
+ });
+ }
+
public List<Frontend> getFrontends() {
return frontends;
}
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
index 7f8fffd..50af0fa 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
@@ -48,7 +48,8 @@ abstract class DorisTableBase(identifier: Identifier, config:
DorisConfig, schem
Set(BATCH_READ,
BATCH_WRITE,
STREAMING_WRITE,
- ACCEPT_ANY_SCHEMA).asJava
+ ACCEPT_ANY_SCHEMA,
+ TRUNCATE).asJava
}
override def newScanBuilder(caseInsensitiveStringMap:
CaseInsensitiveStringMap): ScanBuilder = {
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWriteBuilder.scala
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWriteBuilder.scala
index 521d06c..b74cc45 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWriteBuilder.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWriteBuilder.scala
@@ -17,14 +17,22 @@
package org.apache.doris.spark.write
-import org.apache.doris.spark.config.DorisConfig
+import org.apache.doris.spark.client.DorisFrontendClient
+import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
-import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder}
+import org.apache.spark.sql.connector.write.{BatchWrite, SupportsTruncate,
WriteBuilder}
import org.apache.spark.sql.types.StructType
-class DorisWriteBuilder(config: DorisConfig, schema: StructType) extends
WriteBuilder {
+class DorisWriteBuilder(config: DorisConfig, schema: StructType) extends
WriteBuilder with SupportsTruncate {
+
+ private var isTruncate = false
override def buildForBatch(): BatchWrite = {
+ if (isTruncate) {
+ val client = new DorisFrontendClient(config)
+ val tableDb =
config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER).split("\\.")
+ client.truncateTable(tableDb(0), tableDb(1))
+ }
new DorisWrite(config, schema)
}
@@ -32,4 +40,8 @@ class DorisWriteBuilder(config: DorisConfig, schema:
StructType) extends WriteBu
new DorisWrite(config, schema)
}
+ override def truncate(): WriteBuilder = {
+ isTruncate = true
+ this
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]