This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new e51e7b6 [bug] fix stream dataframe writing to doris json parse exception (#48)
e51e7b6 is described below
commit e51e7b618acce07e98893134e3084c81d3286bc6
Author: wei zhao <[email protected]>
AuthorDate: Tue Sep 6 16:36:57 2022 +0800
[bug] fix stream dataframe writing to doris json parse exception (#48)
fix stream dataframe writing to doris json parse exception
---
.../doris/spark/sql/DorisStreamLoadSink.scala | 31 +++++++++++++---------
1 file changed, 19 insertions(+), 12 deletions(-)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 2daaeb1..566eb3b 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.slf4j.{Logger, LoggerFactory}
import java.io.IOException
-import java.util
import org.apache.doris.spark.rest.RestService
import scala.util.control.Breaks
@@ -48,21 +47,29 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
}
def write(queryExecution: QueryExecution): Unit = {
+ val schema = queryExecution.analyzed.output
+ // write for each partition
queryExecution.toRdd.foreachPartition(iter => {
val objectMapper = new ObjectMapper()
- val arrayNode = objectMapper.createArrayNode()
+ val rowArray = objectMapper.createArrayNode()
iter.foreach(row => {
- val line: util.List[Object] = new util.ArrayList[Object](maxRowCount)
+ val rowNode = objectMapper.createObjectNode()
for (i <- 0 until row.numFields) {
- val field = row.copy().getUTF8String(i)
- arrayNode.add(objectMapper.readTree(field.toString))
+ val colName = schema(i).name
+ val value = row.copy().getUTF8String(i)
+ if (value == null) {
+ rowNode.putNull(colName)
+ } else {
+ rowNode.put(colName, value.toString)
+ }
}
- if (arrayNode.size > maxRowCount - 1) {
+ rowArray.add(rowNode)
+ if (rowArray.size > maxRowCount - 1) {
flush
}
})
// flush buffer
- if (!arrayNode.isEmpty) {
+ if (!rowArray.isEmpty) {
flush
}
@@ -76,8 +83,8 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
for (i <- 0 to maxRetryTimes) {
try {
- dorisStreamLoader.load(arrayNode.toString)
- arrayNode.removeAll()
+ dorisStreamLoader.load(rowArray.toString)
+ rowArray.removeAll()
loop.break()
}
catch {
@@ -89,15 +96,15 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
Thread.sleep(1000 * i)
} catch {
case ex: InterruptedException =>
- logger.warn("Data that failed to load : " +
arrayNode.toString)
+ logger.warn("Data that failed to load : " +
rowArray.toString)
Thread.currentThread.interrupt()
throw new IOException("unable to flush; interrupted while
doing another attempt", e)
}
}
}
- if (!arrayNode.isEmpty) {
- logger.warn("Data that failed to load : " + arrayNode.toString)
+ if (!rowArray.isEmpty) {
+ logger.warn("Data that failed to load : " + rowArray.toString)
throw new IOException(s"Failed to load data on BE:
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
}
}
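For context on the fix above: the previous implementation passed each raw column value to ObjectMapper.readTree, which only succeeds when the value is itself a valid JSON document, so plain string columns raised a JSON parse exception during streaming writes. The patch instead builds one ObjectNode per row, keyed by the column names taken from the analyzed schema, with explicit JSON nulls for null columns. Below is a minimal, self-contained Scala sketch of that difference; the JsonParseSketch object and the example column names and values are illustrative only and are not part of the commit.

// Illustrative only: contrasts the old readTree-per-field path with the
// new ObjectNode-per-row construction used in DorisStreamLoadSink.
import com.fasterxml.jackson.databind.ObjectMapper

object JsonParseSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()

    // Old path: each column value was parsed as a JSON document.
    // A plain string such as "beijing" is not valid JSON, so readTree throws
    // com.fasterxml.jackson.core.JsonParseException.
    try {
      mapper.readTree("beijing")
    } catch {
      case e: Exception => println(s"old path fails: ${e.getClass.getSimpleName}")
    }

    // New path: build one JSON object per row, keyed by column name,
    // writing explicit nulls for null columns.
    val rowNode = mapper.createObjectNode()
    rowNode.put("city", "beijing") // hypothetical column name and value
    rowNode.putNull("population")  // null column becomes JSON null
    println(rowNode.toString)      // {"city":"beijing","population":null}
  }
}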