This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3e745e7 [Improve] decrease memory usage when csv&gzip is on (#212)
3e745e7 is described below
commit 3e745e732fdade8a26856bd92026b44fd02d2787
Author: zhaorongsheng <[email protected]>
AuthorDate: Mon Jul 1 10:20:40 2024 +0800
[Improve] decrease memory usage when csv&gzip is on (#212)
Co-authored-by: zhaorongsheng <[email protected]>
---
.../org/apache/doris/spark/load/StreamLoader.scala | 33 +++++++++++++++++++---
1 file changed, 29 insertions(+), 4 deletions(-)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 9481b6f..06bb56f 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -20,6 +20,7 @@ package org.apache.doris.spark.load
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.json.JsonMapper
+import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
@@ -38,7 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.slf4j.{Logger, LoggerFactory}
-import java.io.{ByteArrayOutputStream, IOException}
+import java.io.{ByteArrayOutputStream, IOException, InputStream}
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.util
@@ -375,14 +376,13 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
if (compressType.nonEmpty) {
if ("gz".equalsIgnoreCase(compressType.get) && format == DataFormat.CSV) {
- val recordBatchString = new RecordBatchString(RecordBatch.newBuilder(iterator.asJava)
+ val recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(iterator.asJava)
.format(format)
.sep(columnSeparator)
.delim(lineDelimiter)
.schema(schema)
.addDoubleQuotes(addDoubleQuotes).build, streamingPassthrough)
- val content = recordBatchString.getContent
- val compressedData = compressByGZ(content)
+ val compressedData = compressByGZ(recodeBatchInputStream)
entity = Some(new ByteArrayEntity(compressedData))
}
else {
@@ -457,6 +457,31 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
compressedData
}
+ /**
+ * compress data by gzip
+ *
+ * @param contentInputStream data content
+ * @throws
+ * @return compressed byte array data
+ */
+ @throws[IOException]
+ def compressByGZ(contentInputStream: InputStream): Array[Byte] = {
+ var compressedData: Array[Byte] = null
+ try {
+ val baos = new ByteArrayOutputStream
+ val gzipOutputStream = new GZIPOutputStream(baos)
+ try {
+ IOUtils.copy(contentInputStream, gzipOutputStream)
+ gzipOutputStream.finish()
+ compressedData = baos.toByteArray
+ } finally {
+ if (baos != null) baos.close()
+ if (gzipOutputStream != null) gzipOutputStream.close()
+ }
+ }
+ compressedData
+ }
+
/**
* handle stream load response
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]