This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 237a8ae  [Feature] support spark connector sink data using sql (#6796)
237a8ae is described below

commit 237a8ae94876d673cc5508d8c8ed0bbd38486983
Author: wei zhao <[email protected]>
AuthorDate: Sat Oct 9 15:47:36 2021 +0800

    [Feature] support spark connector sink data using sql (#6796)
    
    Co-authored-by: wei.zhao <[email protected]>
---
 docs/en/extending-doris/spark-doris-connector.md      | 18 ++++++++++++++++++
 docs/zh-CN/extending-doris/spark-doris-connector.md   | 17 +++++++++++++++++
 .../org/apache/doris/spark/sql/DorisRelation.scala    | 19 +++++++++++++++++--
 .../apache/doris/spark/sql/DorisSourceProvider.scala  |  7 ++++++-
 4 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/docs/en/extending-doris/spark-doris-connector.md 
b/docs/en/extending-doris/spark-doris-connector.md
index 909db2a..ce1c80e 100644
--- a/docs/en/extending-doris/spark-doris-connector.md
+++ b/docs/en/extending-doris/spark-doris-connector.md
@@ -104,6 +104,24 @@ val dorisSparkRDD = sc.dorisRDD(
 dorisSparkRDD.collect()
 ```
 ### Write
+
+#### SQL
+
+```sql
+CREATE TEMPORARY VIEW spark_doris
+USING doris
+OPTIONS(
+  "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
+  "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT",
+  "user"="$YOUR_DORIS_USERNAME",
+  "password"="$YOUR_DORIS_PASSWORD"
+);
+
+INSERT INTO spark_doris VALUES ("VALUE1","VALUE2",...);
+-- or
+INSERT INTO spark_doris SELECT * FROM YOUR_TABLE
+```
+
 #### DataFrame(batch/stream)
 ```scala
 ## batch sink
diff --git a/docs/zh-CN/extending-doris/spark-doris-connector.md 
b/docs/zh-CN/extending-doris/spark-doris-connector.md
index fa69f38..92d13d3 100644
--- a/docs/zh-CN/extending-doris/spark-doris-connector.md
+++ b/docs/zh-CN/extending-doris/spark-doris-connector.md
@@ -106,6 +106,23 @@ dorisSparkRDD.collect()
 
 ### 写入
 
+#### SQL
+
+```sql
+CREATE TEMPORARY VIEW spark_doris
+USING doris
+OPTIONS(
+  "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
+  "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT",
+  "user"="$YOUR_DORIS_USERNAME",
+  "password"="$YOUR_DORIS_PASSWORD"
+);
+
+INSERT INTO spark_doris VALUES ("VALUE1","VALUE2",...);
+-- or
+INSERT INTO spark_doris SELECT * FROM YOUR_TABLE
+```
+
 #### DataFrame(batch/stream)
 
 ```scala
diff --git 
a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
 
b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
index 2f2a252..3e3616d 100644
--- 
a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
+++ 
b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
@@ -28,12 +28,12 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 
 private[sql] class DorisRelation(
     val sqlContext: SQLContext, parameters: Map[String, String])
-    extends BaseRelation with TableScan with PrunedScan with 
PrunedFilteredScan {
+    extends BaseRelation with TableScan with PrunedScan with 
PrunedFilteredScan with InsertableRelation{
 
   private lazy val cfg = {
     val conf = new SparkSettings(sqlContext.sparkContext.getConf)
@@ -86,4 +86,19 @@ private[sql] class DorisRelation(
 
     new ScalaDorisRowRDD(sqlContext.sparkContext, paramWithScan.toMap, 
lazySchema)
   }
+
+  // Insert Table
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    //replace 'doris.request.auth.user' with 'user' and 
'doris.request.auth.password' with 'password'
+    val insertCfg = cfg.copy().asProperties().asScala.map {
+      case (ConfigurationOptions.DORIS_REQUEST_AUTH_USER, v) =>
+        ("user", v)
+      case (ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD, v) =>
+        ("password", v)
+      case (k, v) => (k, v)
+    }
+    data.write.format(DorisSourceProvider.SHORT_NAME)
+      .options(insertCfg)
+      .save()
+  }
 }
diff --git 
a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
 
b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 65f5250..3ac087d 100644
--- 
a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ 
b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -19,6 +19,7 @@ package org.apache.doris.spark.sql
 
 import org.apache.doris.spark.DorisStreamLoad
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -35,7 +36,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
 import scala.util.control.Breaks
 
 private[sql] class DorisSourceProvider extends DataSourceRegister with 
RelationProvider with CreatableRelationProvider with StreamWriteSupport with 
Logging {
-  override def shortName(): String = "doris"
+  override def shortName(): String = SHORT_NAME
 
   override def createRelation(sqlContext: SQLContext, parameters: Map[String, 
String]): BaseRelation = {
     new DorisRelation(sqlContext, Utils.params(parameters, log))
@@ -129,3 +130,7 @@ private[sql] class DorisSourceProvider extends 
DataSourceRegister with RelationP
     new DorisStreamWriter(sparkSettings)
   }
 }
+
+object DorisSourceProvider {
+  val SHORT_NAME: String = "doris"
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to