This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 6f50c34 [Feature](load) Support loader with copy into (#190)
6f50c34 is described below
commit 6f50c3418d1226380ea39750d232b63a83ac6465
Author: Hong Liu <[email protected]>
AuthorDate: Mon Feb 19 17:53:55 2024 +0800
[Feature](load) Support loader with copy into (#190)
---
.../doris/spark/cfg/ConfigurationOptions.java | 3 +
.../doris/spark/exception/CopyIntoException.java | 38 +++
.../doris/spark/rest/models/RespContent.java | 6 +
.../apache/doris/spark/util/CopySQLBuilder.java | 82 ++++++
.../apache/doris/spark/util/HttpPostBuilder.java | 67 +++++
.../apache/doris/spark/util/HttpPutBuilder.java | 81 ++++++
.../apache/doris/spark/load/CopyIntoLoader.scala | 280 +++++++++++++++++++++
.../org/apache/doris/spark/load/StreamLoader.scala | 2 +-
.../apache/doris/spark/writer/DorisWriter.scala | 11 +-
9 files changed, 565 insertions(+), 5 deletions(-)
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 437eabe..6ac7467 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -144,4 +144,7 @@ public interface ConfigurationOptions {
String DORIS_HTTPS_KEY_STORE_PASSWORD = "doris.https.key-store-password";
+ String LOAD_MODE = "doris.sink.load.mode";
+ String DEFAULT_LOAD_MODE = "stream_load";
+
}
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/CopyIntoException.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/CopyIntoException.java
new file mode 100644
index 0000000..7d40a1c
--- /dev/null
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/CopyIntoException.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.exception;
+
/**
 * Checked exception raised when any step of a copy-into load fails
 * (obtaining the upload address, uploading the data file, or executing COPY INTO).
 */
public class CopyIntoException extends Exception {

    /** Creates an exception with neither message nor cause. */
    public CopyIntoException() {
        super();
    }

    /**
     * @param message human-readable description of the failure
     */
    public CopyIntoException(String message) {
        super(message);
    }

    /**
     * @param message human-readable description of the failure
     * @param cause   underlying exception
     */
    public CopyIntoException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause underlying exception; its {@code toString()} becomes the message
     */
    public CopyIntoException(Throwable cause) {
        super(cause);
    }

    /**
     * Full-control constructor mirroring {@link Exception}'s protected variant.
     *
     * @param message            human-readable description of the failure
     * @param cause              underlying exception
     * @param enableSuppression  whether suppressed exceptions are recorded
     * @param writableStackTrace whether the stack trace is captured
     */
    protected CopyIntoException(String message,
                                Throwable cause,
                                boolean enableSuppression,
                                boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
index 60e3949..5f8d6ee 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
@@ -33,6 +33,9 @@ public class RespContent {
@JsonProperty(value = "TxnId")
private long TxnId;
+ @JsonProperty(value = "msg")
+ private String msg;
+
@JsonProperty(value = "Label")
private String Label;
@@ -108,4 +111,7 @@ public class RespContent {
return DORIS_SUCCESS_STATUS.contains(getStatus());
}
+ public boolean isCopyIntoSuccess(){
+ return this.msg.equals("success");
+ }
}
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/CopySQLBuilder.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/CopySQLBuilder.java
new file mode 100644
index 0000000..70dddd5
--- /dev/null
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/CopySQLBuilder.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringJoiner;
+import java.util.List;
+import java.util.Arrays;
+
+public class CopySQLBuilder {
+ private final static String COPY_SYNC = "copy.async";
+ private final static String FILE_TYPE = "file.type";
+ private final static String FORMAT_KEY = "format";
+ private final static String FIELD_DELIMITER_KEY = "column_separator";
+ private final static String LINE_DELIMITER_KEY = "line_delimiter";
+ private final static String COMPRESSION = "compression";
+
+ private final String fileName;
+ private Properties copyIntoProps;
+ private String tableIdentifier;
+ private String data_type;
+
+ public CopySQLBuilder(String data_type, Properties copyIntoProps, String
tableIdentifier, String fileName) {
+ this.data_type = data_type;
+ this.fileName = fileName;
+ this.tableIdentifier = tableIdentifier;
+ this.copyIntoProps = copyIntoProps;
+ }
+
+ public String buildCopySQL() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("COPY INTO ").append(tableIdentifier).append(" FROM
@~('{").append(String.format(fileName,"}*')")).append(" PROPERTIES (");
+
+ //copy into must be sync
+ copyIntoProps.put(COPY_SYNC, false);
+ copyIntoProps.put(FILE_TYPE, data_type);
+ if (data_type.equals("JSON")) {
+ copyIntoProps.put("file.strip_outer_array", "false");
+ }
+ StringJoiner props = new StringJoiner(",");
+ for (Map.Entry<Object, Object> entry : copyIntoProps.entrySet()) {
+ // remove format
+ if (!String.valueOf(entry.getKey()).equals("format")){
+ String key = concatPropPrefix(String.valueOf(entry.getKey()));
+ String value = String.valueOf(entry.getValue());
+ String prop = String.format("'%s'='%s'", key, value);
+ props.add(prop);
+ }
+ }
+ sb.append(props).append(" )");
+ return sb.toString();
+ }
+
+ static final List<String> PREFIX_LIST =
+ Arrays.asList(FIELD_DELIMITER_KEY, LINE_DELIMITER_KEY,
COMPRESSION);
+
+ private String concatPropPrefix(String key) {
+ if (PREFIX_LIST.contains(key)) {
+ return "file." + key;
+ }
+ if (FORMAT_KEY.equals(key)) {
+ return "file.type";
+ }
+ return key;
+ }
+}
\ No newline at end of file
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPostBuilder.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPostBuilder.java
new file mode 100644
index 0000000..d88d04b
--- /dev/null
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPostBuilder.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPost;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HttpPostBuilder {
+ String url;
+ Map<String, String> header;
+ HttpEntity httpEntity;
+
+ public HttpPostBuilder() {
+ header = new HashMap<>();
+ }
+
+ public HttpPostBuilder setUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public HttpPostBuilder addCommonHeader() {
+ header.put(HttpHeaders.EXPECT, "100-continue");
+ return this;
+ }
+
+ public HttpPostBuilder baseAuth(String encoded) {
+ header.put(HttpHeaders.AUTHORIZATION, "Basic " + encoded);
+ return this;
+ }
+
+ public HttpPostBuilder setEntity(HttpEntity httpEntity) {
+ this.httpEntity = httpEntity;
+ return this;
+ }
+
+ public HttpPost build() {
+ Preconditions.checkNotNull(url);
+ Preconditions.checkNotNull(httpEntity);
+ HttpPost put = new HttpPost(url);
+ header.forEach(put::setHeader);
+ put.setEntity(httpEntity);
+ return put;
+ }
+}
\ No newline at end of file
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPutBuilder.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPutBuilder.java
new file mode 100644
index 0000000..e46b099
--- /dev/null
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPutBuilder.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HttpPutBuilder {
+ String url;
+ Map<String, String> header;
+ HttpEntity httpEntity;
+ public HttpPutBuilder() {
+ header = new HashMap<>();
+ }
+
+ public HttpPutBuilder setUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public HttpPutBuilder addFileName(String fileName){
+ header.put("fileName", fileName);
+ return this;
+ }
+
+ public HttpPutBuilder setEmptyEntity() {
+ try {
+ this.httpEntity = new StringEntity("");
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ return this;
+ }
+
+ public HttpPutBuilder addCommonHeader() {
+ header.put(HttpHeaders.EXPECT, "100-continue");
+ return this;
+ }
+
+ public HttpPutBuilder baseAuth(String encoded) {
+ header.put(HttpHeaders.AUTHORIZATION, "Basic " + encoded);
+ return this;
+ }
+
+ public HttpPutBuilder setEntity(HttpEntity httpEntity) {
+ this.httpEntity = httpEntity;
+ return this;
+ }
+
+ public HttpPut build() {
+ Preconditions.checkNotNull(url);
+ Preconditions.checkNotNull(httpEntity);
+ HttpPut put = new HttpPut(url);
+ header.forEach(put::setHeader);
+ put.setEntity(httpEntity);
+ return put;
+ }
+}
\ No newline at end of file
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/CopyIntoLoader.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/CopyIntoLoader.scala
new file mode 100644
index 0000000..a42e111
--- /dev/null
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/CopyIntoLoader.scala
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.load
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node.ObjectNode
+import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.exception.{CopyIntoException,
StreamLoadException}
+import org.apache.doris.spark.rest.models.RespContent
+import org.apache.doris.spark.util.{CopySQLBuilder, HttpPostBuilder,
HttpPutBuilder}
+import org.apache.hadoop.util.StringUtils.escapeString
+import org.apache.http.{HttpEntity, HttpStatus}
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.entity.{BufferedHttpEntity, ByteArrayEntity,
InputStreamEntity, StringEntity}
+import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
+import org.apache.http.util.EntityUtils
+
+import scala.collection.JavaConverters._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.slf4j.{Logger, LoggerFactory}
+
+import java.io.{ByteArrayOutputStream, IOException}
+import java.nio.charset.StandardCharsets
+import java.util.zip.GZIPOutputStream
+import java.util.{Base64, Properties, UUID}
+import scala.util.{Failure, Success, Try}
+
/**
 * Snapshot of an HTTP response received from one of the copy-into endpoints.
 *
 * @param code    HTTP status code
 * @param msg     HTTP reason phrase
 * @param content response body as text
 */
case class CopyIntoResponse(
    code: Int,
    msg: String,
    content: String)
+
/**
 * [[Loader]] that writes a batch of rows to Doris through the copy-into workflow:
 *
 *  1. PUT `http://<fe>/copy/upload` with a `fileName` header; the FE is expected to answer with a
 *     307 redirect whose `location` header is the address the data file must be uploaded to.
 *  2. PUT the serialized rows (optionally gzip-compressed) to that address.
 *  3. POST a `COPY INTO` statement built by [[CopySQLBuilder]] to `http://<fe>/copy/query`.
 *
 * The statement is built with `copy.async=false` and its response is checked before [[load]]
 * returns, so there is no pending transaction left for [[commit]] / [[abort]] to act on.
 *
 * @param settings    connector settings (FE nodes, table identifier, credentials and the
 *                    `STREAM_LOAD_PROP_PREFIX`-prefixed load properties)
 * @param isStreaming whether rows come from a streaming query (gates the streaming pass-through option)
 */
class CopyIntoLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader {

  private final val LOG: Logger = LoggerFactory.getLogger(classOf[CopyIntoLoader])

  private val hostPort: String = settings.getProperty(ConfigurationOptions.DORIS_FENODES)

  private val tableIdentifier: String = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER)

  // Copy-into responses may carry fields that RespContent does not model; don't fail on them.
  // NOTE(review): harmless if RespContent already ignores unknown properties — confirm.
  private val OBJECT_MAPPER = new ObjectMapper()
    .configure(com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

  private val copyIntoProps: Properties = getCopyIntoProps

  private val format: DataFormat =
    DataFormat.valueOf(copyIntoProps.getOrDefault("format", "csv").toString.toUpperCase)

  private val LOAD_URL_PATTERN = "http://%s/copy/upload"

  private val COMMIT_PATTERN = "http://%s/copy/query"

  private val authEncoded: String = getAuthEncoded

  /**
   * Uploads the rows as one file and loads it with COPY INTO.
   *
   * @param iterator row data iterator (consumed)
   * @param schema   row data schema
   * @return a commit message carrying the generated file name
   * @throws CopyIntoException if any step fails; non-CopyIntoException failures are wrapped
   */
  override def load(iterator: Iterator[InternalRow], schema: StructType): Option[CommitMessage] = {
    val fileName: String = UUID.randomUUID().toString
    val client: CloseableHttpClient = getHttpClient
    val currentLoadUrl: String = String.format(LOAD_URL_PATTERN, hostPort)

    try {
      val uploadAddress = executeAndHandle(client,
        buildUploadAddressRequest(currentLoadUrl, fileName).build())(handleGetUploadAddressResponse)
      executeAndHandle(client,
        buildUpLoadFileRequest(uploadAddress, iterator, schema).build())(handleUploadFileResponse)
      // "%s" is filled with the closing "}*')" of the file pattern by CopySQLBuilder
      executeAndHandle(client, executeCopyInto(s"$fileName%s").build())(handleExecuteCopyintoResponse)
      Some(CommitMessage(fileName))
    } catch {
      case e: CopyIntoException =>
        LOG.error(s"Copy into failed, err: ${ExceptionUtils.getStackTrace(e)}")
        throw e
      case scala.util.control.NonFatal(e) =>
        LOG.error(s"Copy into failed, err: ${ExceptionUtils.getStackTrace(e)}")
        throw new CopyIntoException(s"failed to load data on $currentLoadUrl", e)
    } finally {
      // the client was previously leaked whenever a step failed
      closeQuietly(client)
    }
  }

  /** Executes `request`, hands the response to `handler` and always closes the response. */
  private def executeAndHandle[T](client: CloseableHttpClient,
                                  request: org.apache.http.client.methods.HttpUriRequest)
                                 (handler: CloseableHttpResponse => T): T = {
    val response = client.execute(request)
    try handler(response) finally response.close()
  }

  /** Closes the client, logging rather than propagating a close failure. */
  private def closeQuietly(client: CloseableHttpClient): Unit =
    try client.close()
    catch {
      case e: IOException => LOG.warn(s"Failed to close http client: ${e.getMessage}")
    }

  /** Reads status line and body (UTF-8; empty when there is no entity) into a [[CopyIntoResponse]]. */
  private def toCopyIntoResponse(response: CloseableHttpResponse): CopyIntoResponse = {
    val statusLine = response.getStatusLine
    val content = Option(response.getEntity)
      .map(entity => EntityUtils.toString(entity, StandardCharsets.UTF_8))
      .getOrElse("")
    CopyIntoResponse(statusLine.getStatusCode, statusLine.getReasonPhrase, content)
  }

  /**
   * Validates the response of the COPY INTO request.
   *
   * @param copyIntoReponse response of the copy/query endpoint
   * @throws CopyIntoException on a non-200 status, an unparsable body, or a body whose `msg` is not "success"
   */
  private def handleExecuteCopyintoResponse(copyIntoReponse: CloseableHttpResponse): Unit = {
    val loadResponse = toCopyIntoResponse(copyIntoReponse)
    if (loadResponse.code != HttpStatus.SC_OK) {
      LOG.error(s"Execute copy sql status is not OK, status: ${loadResponse.code}, response: $loadResponse")
      throw new CopyIntoException(s"Execute copy sql, http status:${loadResponse.code}, response:$loadResponse")
    }
    val respContent = try {
      OBJECT_MAPPER.readValue(loadResponse.content, classOf[RespContent])
    } catch {
      case e: IOException =>
        throw new CopyIntoException(s"Failed to parse copy sql response: $loadResponse", e)
    }
    if (!respContent.isCopyIntoSuccess) {
      LOG.error(s"Execute copy sql status is not success, status:${respContent.getStatus}, response:$loadResponse")
      throw new CopyIntoException(
        s"Execute copy sql error, load status:${respContent.getStatus}, response:$loadResponse")
    }
    LOG.info("Execute copy sql Response:{}", loadResponse)
  }

  /**
   * Validates the response of the data-file upload.
   *
   * @throws CopyIntoException on a non-200 status
   */
  private def handleUploadFileResponse(uploadFileReponse: CloseableHttpResponse): Unit = {
    val loadResponse = toCopyIntoResponse(uploadFileReponse)
    if (loadResponse.code != HttpStatus.SC_OK) {
      LOG.error(s"Upload file status is not OK, status: ${loadResponse.code}, response: $loadResponse")
      throw new CopyIntoException(s"Upload file error, http status:${loadResponse.code}, response:$loadResponse")
    }
    LOG.info(s"Upload file success,status: ${loadResponse.code}, response: $loadResponse")
  }

  /** Builds the PUT that uploads the serialized rows to `uploadAddress`. */
  private def buildUpLoadFileRequest(uploadAddress: String, iterator: Iterator[InternalRow],
                                     schema: StructType): HttpPutBuilder =
    new HttpPutBuilder()
      .setUrl(uploadAddress)
      .addCommonHeader()
      .setEntity(generateHttpEntity(iterator, schema))

  /**
   * No-op: the COPY INTO statement runs with `copy.async=false` and is verified inside [[load]],
   * so there is nothing left to commit.
   *
   * @param msg commit message
   */
  override def commit(msg: CommitMessage): Unit =
    LOG.info(s"Copy into is executed during load, nothing to commit for $msg")

  /**
   * Copy into has no transaction that could be rolled back after [[load]] succeeded; this only
   * logs a warning so callers are aware the data stays loaded.
   *
   * @param msg commit message
   */
  override def abort(msg: CommitMessage): Unit =
    LOG.warn(s"Abort is not supported by copy into, data loaded for $msg cannot be rolled back")

  /** Builds the POST carrying the COPY INTO statement as `{"sql": ...}`. */
  private def executeCopyInto(fileName: String): HttpPostBuilder = {
    val copySql: String =
      new CopySQLBuilder(format.toString, copyIntoProps, tableIdentifier, fileName).buildCopySQL()
    LOG.info(s"build copy sql is $copySql")
    val objectNode: ObjectNode = OBJECT_MAPPER.createObjectNode()
    objectNode.put("sql", copySql)

    new HttpPostBuilder()
      .setUrl(String.format(COMMIT_PATTERN, hostPort))
      .baseAuth(authEncoded)
      // explicit UTF-8: the default StringEntity charset is ISO-8859-1, which garbles non-ASCII SQL
      .setEntity(new StringEntity(OBJECT_MAPPER.writeValueAsString(objectNode), StandardCharsets.UTF_8))
  }

  /** Builds the authenticated PUT that asks the FE for the upload address of `fileName`. */
  private def buildUploadAddressRequest(url: String, fileName: String): HttpPutBuilder =
    new HttpPutBuilder()
      .setUrl(url)
      .addCommonHeader()
      .addFileName(fileName)
      .setEntity(new StringEntity(""))
      .baseAuth(authEncoded)

  /** Base64 of `user:password` from the request-auth settings. */
  private def getAuthEncoded: String = {
    val user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER)
    val passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD)
    Base64.getEncoder.encodeToString(s"$user:$passwd".getBytes(StandardCharsets.UTF_8))
  }

  /** Returns the value of the first of `keys` present in the copy-into properties. */
  private def copyIntoProp(keys: String*): Option[String] =
    keys.collectFirst { case key if copyIntoProps.containsKey(key) => copyIntoProps.get(key).toString }

  /**
   * Turns a delimiter setting into the literal characters written to the data file.
   * Values prefixed with `\x` (e.g. `\x01`) are decoded as hex byte sequences, following Doris'
   * documented convention for invisible separators; anything else is used verbatim.
   * (Hadoop's `escapeString`, used previously, inserted backslashes before commas and backslashes,
   * so the file no longer matched the separator sent in the SQL.)
   */
  private def decodeDelimiter(value: String): String = {
    if (value.length > 2 && (value.startsWith("\\x") || value.startsWith("\\X"))) {
      val hex = value.substring(2)
      if (hex.length % 2 != 0 || !hex.forall(c => Character.digit(c, 16) >= 0)) {
        throw new CopyIntoException(s"Invalid hex delimiter: $value")
      }
      hex.grouped(2).map(pair => Integer.parseInt(pair, 16).toChar).mkString
    } else {
      value
    }
  }

  /**
   * Serializes the rows into the request body, gzip-compressed when `compression=gz` and CSV.
   *
   * @throws CopyIntoException for an unsupported compression/format combination
   */
  private def generateHttpEntity(iterator: Iterator[InternalRow], schema: StructType): HttpEntity = {
    // CopySQLBuilder emits the short keys (column_separator, line_delimiter, compression) under
    // their "file." names, so a setting may arrive under either spelling; reading both keeps the
    // serialized file consistent with the properties sent in the COPY INTO statement.
    val compressType = copyIntoProp("compression", "file.compression")
    val columnSeparator =
      decodeDelimiter(copyIntoProp("file.column_separator", "column_separator").getOrElse("\t"))
    val lineDelimiter =
      decodeDelimiter(copyIntoProp("file.line_delimiter", "line_delimiter").getOrElse("\n"))
    val addDoubleQuotes = copyIntoProp("add_double_quotes").getOrElse("false").toBoolean
    val streamingPassthrough: Boolean = isStreaming && settings.getBooleanProperty(
      ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH,
      ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH_DEFAULT)

    val recordBatchString = new RecordBatchString(RecordBatch.newBuilder(iterator.asJava)
      .format(format)
      .sep(columnSeparator)
      .delim(lineDelimiter)
      .schema(schema)
      .addDoubleQuotes(addDoubleQuotes).build, streamingPassthrough)
    val content = recordBatchString.getContent

    compressType.filter(_.nonEmpty) match {
      case None =>
        new ByteArrayEntity(content.getBytes(StandardCharsets.UTF_8))
      case Some(compress) if "gz".equalsIgnoreCase(compress) && format == DataFormat.CSV =>
        new ByteArrayEntity(compressByGZ(content))
      case Some(compress) =>
        throw new CopyIntoException(s"Not support the compress type [$compress] for the format [$format]")
    }
  }

  /**
   * Collects the `STREAM_LOAD_PROP_PREFIX` settings with the prefix stripped, forcing
   * `trim_double_quotes=true` when `add_double_quotes` is enabled and dropping `columns`.
   */
  private def getCopyIntoProps: Properties = {
    val prefix = ConfigurationOptions.STREAM_LOAD_PROP_PREFIX
    val props = settings.asProperties().asScala
      .filter(_._1.startsWith(prefix))
      .map { case (k, v) => (k.substring(prefix.length), v) }
    if (props.getOrElse("add_double_quotes", "false").toBoolean) {
      LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to true for prop.")
      props.put("trim_double_quotes", "true")
    }
    props.remove("columns")
    val properties = new Properties()
    props.foreach { case (k, v) => properties.setProperty(k, v) }
    properties
  }

  /**
   * Gzip-compresses the UTF-8 bytes of `content`.
   *
   * @param content text to compress
   * @return gzip stream bytes
   */
  @throws[IOException]
  def compressByGZ(content: String): Array[Byte] = {
    val baos = new ByteArrayOutputStream
    val gzipOutputStream = new GZIPOutputStream(baos)
    try {
      gzipOutputStream.write(content.getBytes(StandardCharsets.UTF_8))
      gzipOutputStream.finish()
    } finally {
      gzipOutputStream.close()
    }
    baos.toByteArray
  }

  /** Client with redirects disabled so the 307 upload address can be read from the response. */
  private def getHttpClient: CloseableHttpClient = {
    HttpClients.custom().disableRedirectHandling().build()
  }

  /**
   * Extracts the upload address from the FE's 307 redirect.
   *
   * @throws CopyIntoException if the status is not 307 or the `location` header is missing
   */
  @throws[CopyIntoException]
  private def handleGetUploadAddressResponse(response: CloseableHttpResponse): String = {
    val loadResponse = toCopyIntoResponse(response)
    val location = Option(response.getFirstHeader("location")).map(_.getValue)
    if (loadResponse.code == HttpStatus.SC_TEMPORARY_REDIRECT && location.isDefined) {
      val uploadAddress: String = location.get
      LOG.info(s"Get upload address Response:$loadResponse")
      LOG.info(s"Redirect to s3: $uploadAddress")
      uploadAddress
    } else {
      LOG.error(s"Failed get the redirected address, status ${loadResponse.code}, " +
        s"reason ${loadResponse.msg}, response ${loadResponse.content}")
      throw new CopyIntoException("Could not get the redirected address.")
    }
  }
}
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 57cacd6..25e86aa 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -365,7 +365,7 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
val compressType = streamLoadProps.get("compress_type")
val columnSeparator =
escapeString(streamLoadProps.getOrElse("column_separator", "\t"))
- val lineDelimiter =
escapeString(streamLoadProps.getOrElse("line_delimiter", "\t"))
+ val lineDelimiter =
escapeString(streamLoadProps.getOrElse("line_delimiter", "\n"))
val addDoubleQuotes = streamLoadProps.getOrElse("add_double_quotes",
"false").toBoolean
val streamingPassthrough: Boolean = isStreaming &&
settings.getBooleanProperty(
ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH,
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 26491df..bd8e9f7 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -18,7 +18,7 @@
package org.apache.doris.spark.writer
import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
-import org.apache.doris.spark.load.{CommitMessage, Loader, StreamLoader}
+import org.apache.doris.spark.load.{CommitMessage, CopyIntoLoader, Loader,
StreamLoader}
import org.apache.doris.spark.sql.Utils
import org.apache.doris.spark.txn.TransactionHandler
import org.apache.doris.spark.txn.listener.DorisTransactionListener
@@ -42,6 +42,7 @@ class DorisWriter(settings: SparkSettings,
private val logger: Logger = LoggerFactory.getLogger(classOf[DorisWriter])
private val sinkTaskPartitionSize: Integer =
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+ private val loadMode: String =
settings.getProperty(ConfigurationOptions.LOAD_MODE,ConfigurationOptions.DEFAULT_LOAD_MODE)
private val sinkTaskUseRepartition: Boolean =
settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
@@ -210,9 +211,11 @@ class DorisWriter(settings: SparkSettings,
@throws[IllegalArgumentException]
private def generateLoader: Loader = {
- val loadMode = settings.getProperty("load_mode", "stream_load")
- if ("stream_load".equalsIgnoreCase(loadMode)) new StreamLoader(settings,
isStreaming)
- else throw new IllegalArgumentException(s"Unsupported load mode:
$loadMode")
+ loadMode match {
+ case "stream_load" => new StreamLoader(settings, isStreaming)
+ case "copy_into" => new CopyIntoLoader(settings, isStreaming)
+ case _ => throw new IllegalArgumentException(s"Unsupported load mode:
$loadMode")
+ }
}
def getTransactionHandler: TransactionHandler = txnHandler
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]