This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 999aeac [improve] dataframe sink support csv format (#76)
999aeac is described below
commit 999aeac06b64bdebc573325e77581d844e17c23d
Author: gnehil <[email protected]>
AuthorDate: Tue Mar 14 09:58:51 2023 +0800
[improve] dataframe sink support csv format (#76)
---
.../org/apache/doris/spark/DorisStreamLoad.java | 26 ++++------
.../doris/spark/sql/DorisStreamLoadSink.scala | 60 ++++++++++------------
2 files changed, 38 insertions(+), 48 deletions(-)
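For reference, a minimal usage sketch of what this change enables on the write side: loading a DataFrame through the connector with CSV stream load and explicit delimiters. The stream load property keys below ("format", "column_separator", "line_delimiter") are the ones read in this patch; the "doris" source name, the "doris.sink.properties." prefix, and the connection options are assumptions for illustration, not part of this commit.

    import org.apache.spark.sql.SparkSession

    object DorisCsvSinkExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("doris-csv-sink").getOrCreate()
        import spark.implicits._
        val df = Seq((1, "alice"), (2, "bob")).toDF("id", "name")

        df.write
          .format("doris")                               // assumed data source name
          .option("doris.table.identifier", "db.tbl")    // assumed option key
          .option("doris.fenodes", "fe_host:8030")       // assumed option key
          .option("user", "root")
          .option("password", "")
          // stream load properties touched by this commit; defaults are csv / "\t" / "\n"
          .option("doris.sink.properties.format", "csv")
          .option("doris.sink.properties.column_separator", ",")
          .option("doris.sink.properties.line_delimiter", "\n")
          .save()

        spark.stop()
      }
    }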
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 3ada398..bda1c47 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -96,6 +96,11 @@ public class DorisStreamLoad implements Serializable{
cache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
.build(new BackendCacheLoader(settings));
+ fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
+ if (fileType.equals("csv")){
+ FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
+ LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
+ }
}
public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
@@ -116,7 +121,7 @@ public class DorisStreamLoad implements Serializable{
.expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
.build(new BackendCacheLoader(settings));
fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
- if (fileType.equals("csv")){
+ if ("csv".equals(fileType)){
FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
}
@@ -150,20 +155,13 @@ public class DorisStreamLoad implements Serializable{
conn.setDoInput(true);
if (streamLoadProp != null) {
streamLoadProp.forEach((k, v) -> {
- if (streamLoadProp.containsKey("format")) {
- return;
- }
- if (streamLoadProp.containsKey("strip_outer_array")) {
- return;
- }
- if (streamLoadProp.containsKey("read_json_by_line")) {
+ if ("read_json_by_line".equals(k)) {
return;
}
conn.addRequestProperty(k, v);
});
}
- if (fileType.equals("json")){
- conn.addRequestProperty("format", "json");
+ if (fileType.equals("json")) {
conn.addRequestProperty("strip_outer_array", "true");
}
return conn;
@@ -182,11 +180,9 @@ public class DorisStreamLoad implements Serializable{
@Override
public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("status: ").append(status);
- sb.append(", resp msg: ").append(respMsg);
- sb.append(", resp content: ").append(respContent);
- return sb.toString();
+ return "status: " + status +
+ ", resp msg: " + respMsg +
+ ", resp content: " + respContent;
}
}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index a2e3ed1..fea58ac 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -17,15 +17,15 @@
package org.apache.doris.spark.sql
-import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.slf4j.{Logger, LoggerFactory}
+
import java.io.IOException
-import org.apache.doris.spark.rest.RestService
+import java.util
import java.util.Objects
import scala.util.control.Breaks
@@ -45,39 +45,32 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
if (batchId <= latestBatchId) {
logger.info(s"Skipping already committed batch $batchId")
} else {
- write(data.queryExecution)
+ write(data.rdd)
latestBatchId = batchId
}
}
- def write(queryExecution: QueryExecution): Unit = {
- val schema = queryExecution.analyzed.output
- var resultRdd = queryExecution.toRdd
+ def write(rdd: RDD[Row]): Unit = {
+ var resultRdd = rdd
if (Objects.nonNull(sinkTaskPartitionSize)) {
resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
}
// write for each partition
- resultRdd.foreachPartition(iter => {
- val objectMapper = new ObjectMapper()
- val rowArray = objectMapper.createArrayNode()
- iter.foreach(row => {
- val rowNode = objectMapper.createObjectNode()
- for (i <- 0 until row.numFields) {
- val colName = schema(i).name
- val value = row.copy().getUTF8String(i)
- if (value == null) {
- rowNode.putNull(colName)
- } else {
- rowNode.put(colName, value.toString)
- }
+ resultRdd.foreachPartition(partition => {
+ val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
+ partition.foreach(row => {
+ val line: util.List[Object] = new util.ArrayList[Object]()
+ for (i <- 0 until row.size) {
+ val field = row.get(i)
+ line.add(field.asInstanceOf[AnyRef])
}
- rowArray.add(rowNode)
- if (rowArray.size > maxRowCount - 1) {
+ rowsBuffer.add(line)
+ if (rowsBuffer.size > maxRowCount - 1) {
flush
}
})
// flush buffer
- if (!rowArray.isEmpty) {
+ if (!rowsBuffer.isEmpty) {
flush
}
@@ -85,14 +78,15 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
* flush data to Doris and do retry when flush error
*
*/
- def flush = {
+ def flush(): Unit = {
val loop = new Breaks
+ var err: Exception = null
loop.breakable {
- for (i <- 0 to maxRetryTimes) {
+ for (i <- 1 to maxRetryTimes) {
try {
- dorisStreamLoader.load(rowArray.toString)
- rowArray.removeAll()
+ dorisStreamLoader.loadV2(rowsBuffer)
+ rowsBuffer.clear()
Thread.sleep(batchInterValMs.longValue())
loop.break()
}
@@ -100,21 +94,21 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
case e: Exception =>
try {
logger.debug("Failed to load data on BE: {} node ",
dorisStreamLoader.getLoadUrlStr)
+ if (err == null) err = e
Thread.sleep(1000 * i)
} catch {
case ex: InterruptedException =>
- logger.warn("Data that failed to load : " +
rowArray.toString)
Thread.currentThread.interrupt()
- throw new IOException("unable to flush; interrupted while
doing another attempt", e)
+ throw new IOException("unable to flush; interrupted while
doing another attempt", ex)
}
}
}
- if (!rowArray.isEmpty) {
- logger.warn("Data that failed to load : " + rowArray.toString)
- throw new IOException(s"Failed to load data on BE:
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
+ if (!rowsBuffer.isEmpty) {
+ throw new IOException(s"Failed to load ${maxRowCount} batch data
on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max
${maxRetryTimes} retry times.", err)
}
}
+
}
})
}
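As a side note on the flush() rework above: it now retries up to maxRetryTimes with a linear backoff, remembers the first failure, and attaches it as the cause once the attempts are exhausted. A standalone sketch of that pattern (hypothetical names, not the connector's code):

    import java.io.IOException
    import scala.util.control.Breaks

    object RetrySketch {
      // Run attempt() up to maxRetryTimes times, sleeping 1000 * attempt ms between
      // tries; if every try fails, rethrow with the first failure as the cause.
      def withRetry(maxRetryTimes: Int)(attempt: () => Unit): Unit = {
        val loop = new Breaks
        var err: Exception = null
        var done = false
        loop.breakable {
          for (i <- 1 to maxRetryTimes) {
            try {
              attempt()
              done = true
              loop.break()
            } catch {
              case e: Exception =>
                if (err == null) err = e
                Thread.sleep(1000L * i)
            }
          }
        }
        if (!done) throw new IOException(s"failed after $maxRetryTimes attempts", err)
      }
    }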