This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 237a8ae [Feature] support spark connector sink data using sql (#6796)
237a8ae is described below
commit 237a8ae94876d673cc5508d8c8ed0bbd38486983
Author: wei zhao <[email protected]>
AuthorDate: Sat Oct 9 15:47:36 2021 +0800
[Feature] support spark connector sink data using sql (#6796)
Co-authored-by: wei.zhao <[email protected]>
---
docs/en/extending-doris/spark-doris-connector.md | 18 ++++++++++++++++++
docs/zh-CN/extending-doris/spark-doris-connector.md | 17 +++++++++++++++++
.../org/apache/doris/spark/sql/DorisRelation.scala | 19 +++++++++++++++++--
.../apache/doris/spark/sql/DorisSourceProvider.scala | 7 ++++++-
4 files changed, 58 insertions(+), 3 deletions(-)
diff --git a/docs/en/extending-doris/spark-doris-connector.md
b/docs/en/extending-doris/spark-doris-connector.md
index 909db2a..ce1c80e 100644
--- a/docs/en/extending-doris/spark-doris-connector.md
+++ b/docs/en/extending-doris/spark-doris-connector.md
@@ -104,6 +104,24 @@ val dorisSparkRDD = sc.dorisRDD(
dorisSparkRDD.collect()
```
### Write
+
+#### SQL
+
+```sql
+CREATE TEMPORARY VIEW spark_doris
+USING doris
+OPTIONS(
+ "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
+ "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT",
+ "user"="$YOUR_DORIS_USERNAME",
+ "password"="$YOUR_DORIS_PASSWORD"
+);
+
+INSERT INTO spark_doris VALUES ("VALUE1","VALUE2",...);
+-- or
+INSERT INTO spark_doris SELECT * FROM YOUR_TABLE
+```
+
#### DataFrame(batch/stream)
```scala
## batch sink
diff --git a/docs/zh-CN/extending-doris/spark-doris-connector.md
b/docs/zh-CN/extending-doris/spark-doris-connector.md
index fa69f38..92d13d3 100644
--- a/docs/zh-CN/extending-doris/spark-doris-connector.md
+++ b/docs/zh-CN/extending-doris/spark-doris-connector.md
@@ -106,6 +106,23 @@ dorisSparkRDD.collect()
### 写入
+#### SQL
+
+```sql
+CREATE TEMPORARY VIEW spark_doris
+USING doris
+OPTIONS(
+ "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
+ "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT",
+ "user"="$YOUR_DORIS_USERNAME",
+ "password"="$YOUR_DORIS_PASSWORD"
+);
+
+INSERT INTO spark_doris VALUES ("VALUE1","VALUE2",...);
+-- or
+INSERT INTO spark_doris SELECT * FROM YOUR_TABLE
+```
+
#### DataFrame(batch/stream)
```scala
diff --git
a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
index 2f2a252..3e3616d 100644
---
a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
+++
b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
@@ -28,12 +28,12 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
private[sql] class DorisRelation(
val sqlContext: SQLContext, parameters: Map[String, String])
- extends BaseRelation with TableScan with PrunedScan with
PrunedFilteredScan {
+ extends BaseRelation with TableScan with PrunedScan with
PrunedFilteredScan with InsertableRelation{
private lazy val cfg = {
val conf = new SparkSettings(sqlContext.sparkContext.getConf)
@@ -86,4 +86,19 @@ private[sql] class DorisRelation(
new ScalaDorisRowRDD(sqlContext.sparkContext, paramWithScan.toMap,
lazySchema)
}
+
+ // Insert Table
+ override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+ //replace 'doris.request.auth.user' with 'user' and
'doris.request.auth.password' with 'password'
+ val insertCfg = cfg.copy().asProperties().asScala.map {
+ case (ConfigurationOptions.DORIS_REQUEST_AUTH_USER, v) =>
+ ("user", v)
+ case (ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD, v) =>
+ ("password", v)
+ case (k, v) => (k, v)
+ }
+ data.write.format(DorisSourceProvider.SHORT_NAME)
+ .options(insertCfg)
+ .save()
+ }
}
diff --git
a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 65f5250..3ac087d 100644
---
a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++
b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -19,6 +19,7 @@ package org.apache.doris.spark.sql
import org.apache.doris.spark.DorisStreamLoad
import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -35,7 +36,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.util.control.Breaks
private[sql] class DorisSourceProvider extends DataSourceRegister with
RelationProvider with CreatableRelationProvider with StreamWriteSupport with
Logging {
- override def shortName(): String = "doris"
+ override def shortName(): String = SHORT_NAME
override def createRelation(sqlContext: SQLContext, parameters: Map[String,
String]): BaseRelation = {
new DorisRelation(sqlContext, Utils.params(parameters, log))
@@ -129,3 +130,7 @@ private[sql] class DorisSourceProvider extends
DataSourceRegister with RelationP
new DorisStreamWriter(sparkSettings)
}
}
+
+object DorisSourceProvider {
+ val SHORT_NAME: String = "doris"
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]