This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f50c34  [Feature](load) Support loader with copy into (#190)
6f50c34 is described below

commit 6f50c3418d1226380ea39750d232b63a83ac6465
Author: Hong Liu <[email protected]>
AuthorDate: Mon Feb 19 17:53:55 2024 +0800

    [Feature](load) Support loader with copy into (#190)
---
 .../doris/spark/cfg/ConfigurationOptions.java      |   3 +
 .../doris/spark/exception/CopyIntoException.java   |  38 +++
 .../doris/spark/rest/models/RespContent.java       |   6 +
 .../apache/doris/spark/util/CopySQLBuilder.java    |  82 ++++++
 .../apache/doris/spark/util/HttpPostBuilder.java   |  67 +++++
 .../apache/doris/spark/util/HttpPutBuilder.java    |  81 ++++++
 .../apache/doris/spark/load/CopyIntoLoader.scala   | 280 +++++++++++++++++++++
 .../org/apache/doris/spark/load/StreamLoader.scala |   2 +-
 .../apache/doris/spark/writer/DorisWriter.scala    |  11 +-
 9 files changed, 565 insertions(+), 5 deletions(-)

diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 437eabe..6ac7467 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -144,4 +144,7 @@ public interface ConfigurationOptions {
 
     String DORIS_HTTPS_KEY_STORE_PASSWORD = "doris.https.key-store-password";
 
+    String LOAD_MODE = "doris.sink.load.mode";
+    String DEFAULT_LOAD_MODE = "stream_load";
+
 }
diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/CopyIntoException.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/CopyIntoException.java
new file mode 100644
index 0000000..7d40a1c
--- /dev/null
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/CopyIntoException.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.exception;
+
+public class CopyIntoException extends Exception {
+    public CopyIntoException() {
+        super();
+    }
+    public CopyIntoException(String message) {
+        super(message);
+    }
+    public CopyIntoException(String message, Throwable cause) {
+        super(message, cause);
+    }
+    public CopyIntoException(Throwable cause) {
+        super(cause);
+    }
+    protected CopyIntoException(String message, Throwable cause,
+                                boolean enableSuppression,
+                                boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
index 60e3949..5f8d6ee 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
@@ -33,6 +33,9 @@ public class RespContent {
     @JsonProperty(value = "TxnId")
     private long TxnId;
 
+    @JsonProperty(value = "msg")
+    private String msg;
+
     @JsonProperty(value = "Label")
     private String Label;
 
@@ -108,4 +111,7 @@ public class RespContent {
         return DORIS_SUCCESS_STATUS.contains(getStatus());
     }
 
+    public boolean isCopyIntoSuccess(){
+        return "success".equals(this.msg); // constant-first: msg is null unless the JSON supplied it
+    }
 }
diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/CopySQLBuilder.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/CopySQLBuilder.java
new file mode 100644
index 0000000..70dddd5
--- /dev/null
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/CopySQLBuilder.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringJoiner;
+import java.util.List;
+import java.util.Arrays;
+
+/**
+ * Builds the {@code COPY INTO <table> FROM @~('{<file>}*') PROPERTIES (...)} statement
+ * from the load properties, the target table identifier and the uploaded file name.
+ */
+public class CopySQLBuilder {
+    // Async switch of the copy statement; buildCopySQL always emits it as "false".
+    private final static String COPY_ASYNC = "copy.async";
+    private final static String FILE_TYPE = "file.type";
+    private final static String FORMAT_KEY = "format";
+    private final static String FIELD_DELIMITER_KEY = "column_separator";
+    private final static String LINE_DELIMITER_KEY = "line_delimiter";
+    private final static String COMPRESSION = "compression";
+
+    // Keys that are emitted with a "file." prefix in the PROPERTIES clause.
+    static final List<String> PREFIX_LIST =
+            Arrays.asList(FIELD_DELIMITER_KEY, LINE_DELIMITER_KEY, COMPRESSION);
+
+    private final String fileName;
+    private final Properties copyIntoProps;
+    private final String tableIdentifier;
+    private final String data_type;
+
+    /**
+     * @param data_type       file type written to the {@code file.type} property (e.g. "CSV", "JSON")
+     * @param copyIntoProps   user supplied load properties; read only, never modified by this class
+     * @param tableIdentifier target table, inserted verbatim after {@code COPY INTO}
+     * @param fileName        format pattern for the file name; its {@code %s} placeholder
+     *                        receives the closing {@code }*')} of the FROM clause
+     */
+    public CopySQLBuilder(String data_type, Properties copyIntoProps, String tableIdentifier, String fileName) {
+        this.data_type = data_type;
+        this.fileName = fileName;
+        this.tableIdentifier = tableIdentifier;
+        this.copyIntoProps = copyIntoProps;
+    }
+
+    /**
+     * Assembles the copy statement.
+     *
+     * @return the complete {@code COPY INTO} SQL text
+     */
+    public String buildCopySQL() {
+        // Apply the forced settings to a private copy: the caller's Properties stays untouched
+        // and only String values are stored.
+        Properties effectiveProps = new Properties();
+        effectiveProps.putAll(copyIntoProps);
+        // copy into must be sync
+        effectiveProps.setProperty(COPY_ASYNC, "false");
+        effectiveProps.setProperty(FILE_TYPE, data_type);
+        if (data_type.equals("JSON")) {
+            effectiveProps.setProperty("file.strip_outer_array", "false");
+        }
+
+        StringJoiner props = new StringJoiner(",");
+        for (Map.Entry<Object, Object> entry : effectiveProps.entrySet()) {
+            String rawKey = String.valueOf(entry.getKey());
+            // "format" itself is not emitted; the file type travels as "file.type" (set above)
+            if (FORMAT_KEY.equals(rawKey)) {
+                continue;
+            }
+            String value = String.valueOf(entry.getValue());
+            props.add(String.format("'%s'='%s'", concatPropPrefix(rawKey), value));
+        }
+
+        // fileName is used as a format pattern: its %s placeholder receives the closing "}*')"
+        String filePattern = String.format(fileName, "}*')");
+        return new StringBuilder()
+                .append("COPY INTO ").append(tableIdentifier)
+                .append(" FROM @~('{").append(filePattern)
+                .append(" PROPERTIES (").append(props).append(" )")
+                .toString();
+    }
+
+    /**
+     * Maps a user facing property key to the key used in the PROPERTIES clause.
+     *
+     * @param key property key as supplied by the user
+     * @return {@code "file." + key} for the keys in {@link #PREFIX_LIST}, otherwise {@code key} unchanged
+     */
+    private String concatPropPrefix(String key) {
+        if (PREFIX_LIST.contains(key)) {
+            return "file." + key;
+        }
+        return key;
+    }
+}
\ No newline at end of file
diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPostBuilder.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPostBuilder.java
new file mode 100644
index 0000000..d88d04b
--- /dev/null
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPostBuilder.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPost;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HttpPostBuilder {
+    String url;
+    Map<String, String> header;
+    HttpEntity httpEntity;
+
+    public HttpPostBuilder() {
+        header = new HashMap<>();
+    }
+
+    public HttpPostBuilder setUrl(String url) {
+        this.url = url;
+        return this;
+    }
+
+    public HttpPostBuilder addCommonHeader() {
+        header.put(HttpHeaders.EXPECT, "100-continue");
+        return this;
+    }
+
+    public HttpPostBuilder baseAuth(String encoded) {
+        header.put(HttpHeaders.AUTHORIZATION, "Basic " + encoded);
+        return this;
+    }
+
+    public HttpPostBuilder setEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        return this;
+    }
+
+    public HttpPost build() {
+        Preconditions.checkNotNull(url);
+        Preconditions.checkNotNull(httpEntity);
+        HttpPost put = new HttpPost(url);
+        header.forEach(put::setHeader);
+        put.setEntity(httpEntity);
+        return put;
+    }
+}
\ No newline at end of file
diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPutBuilder.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPutBuilder.java
new file mode 100644
index 0000000..e46b099
--- /dev/null
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/HttpPutBuilder.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HttpPutBuilder {
+    String url;
+    Map<String, String> header;
+    HttpEntity httpEntity;
+    public HttpPutBuilder() {
+        header = new HashMap<>();
+    }
+
+    public HttpPutBuilder setUrl(String url) {
+        this.url = url;
+        return this;
+    }
+
+    public HttpPutBuilder addFileName(String fileName){
+        header.put("fileName", fileName);
+        return this;
+    }
+
+    public HttpPutBuilder setEmptyEntity() {
+        try {
+            this.httpEntity = new StringEntity("");
+        } catch (Exception e) {
+            throw new IllegalArgumentException(e);
+        }
+        return this;
+    }
+
+    public HttpPutBuilder addCommonHeader() {
+        header.put(HttpHeaders.EXPECT, "100-continue");
+        return this;
+    }
+
+    public HttpPutBuilder baseAuth(String encoded) {
+        header.put(HttpHeaders.AUTHORIZATION, "Basic " + encoded);
+        return this;
+    }
+
+    public HttpPutBuilder setEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        return this;
+    }
+
+    public HttpPut build() {
+        Preconditions.checkNotNull(url);
+        Preconditions.checkNotNull(httpEntity);
+        HttpPut put = new HttpPut(url);
+        header.forEach(put::setHeader);
+        put.setEntity(httpEntity);
+        return put;
+    }
+}
\ No newline at end of file
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/CopyIntoLoader.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/CopyIntoLoader.scala
new file mode 100644
index 0000000..a42e111
--- /dev/null
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/CopyIntoLoader.scala
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.load
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node.ObjectNode
+import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.exception.{CopyIntoException, 
StreamLoadException}
+import org.apache.doris.spark.rest.models.RespContent
+import org.apache.doris.spark.util.{CopySQLBuilder, HttpPostBuilder, 
HttpPutBuilder}
+import org.apache.hadoop.util.StringUtils.escapeString
+import org.apache.http.{HttpEntity, HttpStatus}
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.entity.{BufferedHttpEntity, ByteArrayEntity, 
InputStreamEntity, StringEntity}
+import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
+import org.apache.http.util.EntityUtils
+
+import scala.collection.JavaConverters._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.slf4j.{Logger, LoggerFactory}
+
+import java.io.{ByteArrayOutputStream, IOException}
+import java.nio.charset.StandardCharsets
+import java.util.zip.GZIPOutputStream
+import java.util.{Base64, Properties, UUID}
+import scala.util.{Failure, Success, Try}
+
+case class CopyIntoResponse(code: Int, msg: String, content: String)
+
+class CopyIntoLoader(settings: SparkSettings, isStreaming: Boolean) extends 
Loader {
+
+  private final val LOG: Logger = 
LoggerFactory.getLogger(classOf[CopyIntoLoader])
+
+  private val hostPort:String = 
settings.getProperty(ConfigurationOptions.DORIS_FENODES)
+
+  private val tableIdentifier: String = 
settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER)
+
+  private val OBJECT_MAPPER = new ObjectMapper
+
+  private val copyIntoProps: Properties = getCopyIntoProps
+
+  private val format: DataFormat = 
DataFormat.valueOf(copyIntoProps.getOrDefault("format", 
"csv").toString.toUpperCase)
+
+  private val LOAD_URL_PATTERN = "http://%s/copy/upload";
+
+  private val COMMIT_PATTERN = "http://%s/copy/query";
+
+  private val authEncoded: String = getAuthEncoded
+
+
+  /**
+   * execute load
+   *
+   * @param iterator row data iterator
+   * @param schema   row data schema
+   * @return commit message
+   */
+  override def load(iterator: Iterator[InternalRow], schema: StructType): Option[CommitMessage] = {
+
+    var msg: Option[CommitMessage] = None
+
+    val fileName: String = UUID.randomUUID().toString
+    val client: CloseableHttpClient = getHttpClient
+    val currentLoadUrl: String = String.format(LOAD_URL_PATTERN, hostPort)
+
+    Try {
+      // step 1: request the upload address for the generated file name
+      val uploadAddressRequest = buildUploadAddressRequest(currentLoadUrl, fileName)
+      val uploadAddressReponse = client.execute(uploadAddressRequest.build())
+      val uploadAddress = handleGetUploadAddressResponse(uploadAddressReponse)
+      // step 2: upload the serialized rows to that address
+      val uploadFileRequest = buildUpLoadFileRequest(uploadAddress, iterator, schema)
+      val uploadFileReponse = client.execute(uploadFileRequest.build())
+      handleUploadFileResponse(uploadFileReponse)
+      // step 3: run the copy statement; the trailing %s is filled in by CopySQLBuilder
+      val copyIntoRequest = executeCopyInto(s"${fileName}%s")
+      val copyIntoReponse = client.execute(copyIntoRequest.build())
+      handleExecuteCopyintoResponse(copyIntoReponse)
+      msg = Some(CommitMessage(fileName))
+    } match {
+      case Success(_) => client.close()
+      case Failure(e) =>
+        // Release the client on the failure path as well (it used to leak here). The close is
+        // best-effort so that a close error cannot mask the actual load error.
+        Try(client.close())
+        LOG.error(s"Copy into failed, err: ${ExceptionUtils.getStackTrace(e)}")
+        if (e.isInstanceOf[CopyIntoException]) throw e
+        throw new CopyIntoException(s"failed to load data on $currentLoadUrl", e)
+    }
+    msg
+  }
+
+  /**
+   * handle execute copy into response
+   *
+   * @param copyIntoReponse row data iterator
+   */
+  private def handleExecuteCopyintoResponse(copyIntoReponse: 
CloseableHttpResponse) = {
+    val code = copyIntoReponse.getStatusLine.getStatusCode
+    val msg = copyIntoReponse.getStatusLine.getReasonPhrase
+    val content = EntityUtils.toString(new 
BufferedHttpEntity(copyIntoReponse.getEntity), StandardCharsets.UTF_8)
+    val loadResponse: CopyIntoResponse = CopyIntoResponse(code, msg, content)
+    if (loadResponse.code != HttpStatus.SC_OK) {
+      LOG.error(s"Execute copy sql status is not OK, status: 
${loadResponse.code}, response: $loadResponse")
+      throw new StreamLoadException(String.format("Execute copy sql, http 
status:%d, response:%s",
+        new Integer(loadResponse.code), loadResponse))
+    } else {
+      try {
+        val respContent = OBJECT_MAPPER.readValue(loadResponse.content, 
classOf[RespContent])
+        if (!respContent.isCopyIntoSuccess) {
+          LOG.error(s"Execute copy sql status is not success, 
status:${respContent.getStatus}, response:$loadResponse")
+          throw new StreamLoadException(String.format("Execute copy sql error, 
load status:%s, response:%s", respContent.getStatus, loadResponse))
+        }
+        LOG.info("Execute copy sql Response:{}", loadResponse)
+      } catch {
+        case e: IOException =>
+          throw new StreamLoadException(e)
+      }
+    }
+  }
+
+  private def handleUploadFileResponse(uploadFileReponse: 
CloseableHttpResponse) = {
+    val code = uploadFileReponse.getStatusLine.getStatusCode
+    val msg = uploadFileReponse.getStatusLine.getReasonPhrase
+    val content = EntityUtils.toString(new 
BufferedHttpEntity(uploadFileReponse.getEntity), StandardCharsets.UTF_8)
+    val loadResponse: CopyIntoResponse = CopyIntoResponse(code, msg, content)
+    if (loadResponse.code != HttpStatus.SC_OK) {
+      LOG.error(s"Upload file status is not OK, status: ${loadResponse.code}, 
response: $loadResponse")
+      throw new CopyIntoException(s"Upload file error, http 
status:${loadResponse.code}, response:$loadResponse")
+    } else {
+      LOG.info(s"Upload file success,status: ${loadResponse.code}, response: 
$loadResponse")
+    }
+  }
+
+  private def buildUpLoadFileRequest(uploadAddress: String, iterator: 
Iterator[InternalRow], schema: StructType): HttpPutBuilder = {
+    val builder = new 
HttpPutBuilder().setUrl(uploadAddress).addCommonHeader().setEntity(generateHttpEntity(iterator,
 schema))
+    builder
+  }
+
+  /**
+   * commit transaction
+   *
+   * @param msg commit message
+   */
+  override def commit(msg: CommitMessage): Unit = ???
+
+
+  /**
+   * abort transaction
+   *
+   * @param msg commit message
+   */
+  override def abort(msg: CommitMessage): Unit = ???
+
+  private def executeCopyInto(fileName: String): HttpPostBuilder = {
+
+    val copySQLBuilder: CopySQLBuilder = new CopySQLBuilder(format.toString, 
copyIntoProps, tableIdentifier, fileName)
+    val copySql: String = copySQLBuilder.buildCopySQL()
+    LOG.info(s"build copy sql is $copySql")
+    val objectNode: ObjectNode = OBJECT_MAPPER.createObjectNode()
+    objectNode.put("sql", copySql)
+
+    val postBuilder: HttpPostBuilder = new HttpPostBuilder()
+    postBuilder.setUrl(String.format(COMMIT_PATTERN, 
hostPort)).baseAuth(authEncoded)
+      .setEntity(new 
StringEntity(OBJECT_MAPPER.writeValueAsString(objectNode)))
+  }
+
+  private def buildUploadAddressRequest(url: String, fileName: String): 
HttpPutBuilder = {
+    val builder: HttpPutBuilder = new 
HttpPutBuilder().setUrl(url).addCommonHeader().addFileName(fileName).setEntity(new
 StringEntity("")).baseAuth(authEncoded)
+    builder
+  }
+
+  private def getAuthEncoded: String = {
+    val user = 
settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER)
+    val passwd = 
settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD)
+    
Base64.getEncoder.encodeToString(s"$user:$passwd".getBytes(StandardCharsets.UTF_8))
+  }
+
+  private def generateHttpEntity(iterator: Iterator[InternalRow], schema: 
StructType): HttpEntity = {
+
+    var entity: Option[HttpEntity] = None
+
+    val compressType = copyIntoProps.getProperty("compression")
+    val columnSeparator = 
escapeString(copyIntoProps.getOrDefault("file.column_separator", "\t").toString)
+    val lineDelimiter = 
escapeString(copyIntoProps.getOrDefault("file.line_delimiter", "\n").toString)
+    val addDoubleQuotes = copyIntoProps.getOrDefault("add_double_quotes", 
"false").toString.toBoolean
+    val streamingPassthrough: Boolean = isStreaming && 
settings.getBooleanProperty(
+      ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH,
+      ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH_DEFAULT)
+
+    val recordBatchString = new 
RecordBatchString(RecordBatch.newBuilder(iterator.asJava)
+      .format(format)
+      .sep(columnSeparator)
+      .delim(lineDelimiter)
+      .schema(schema)
+      .addDoubleQuotes(addDoubleQuotes).build, streamingPassthrough)
+    val content = recordBatchString.getContent
+
+    if (compressType !=null && !compressType.isEmpty) {
+      if ("gz".equalsIgnoreCase(compressType) && format == DataFormat.CSV) {
+        val compressedData = compressByGZ(content)
+        entity = Some(new ByteArrayEntity(compressedData))
+      }
+      else {
+        val msg = s"Not support the compress type [$compressType] for the 
format [$format]"
+        throw new CopyIntoException(msg)
+      }
+    }
+    else {
+      entity = Some(new 
ByteArrayEntity(content.getBytes(StandardCharsets.UTF_8)))
+    }
+
+    entity.get
+
+  }
+
+  private def getCopyIntoProps: Properties = {
+    val props = 
settings.asProperties().asScala.filter(_._1.startsWith(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX))
+      .map { case (k,v) => 
(k.substring(ConfigurationOptions.STREAM_LOAD_PROP_PREFIX.length), v)}
+    if (props.getOrElse("add_double_quotes", "false").toBoolean) {
+      LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to 
true for prop.")
+      props.put("trim_double_quotes", "true")
+    }
+    props.remove("columns")
+    val properties = new Properties()
+    properties.putAll(props.mapValues(_.toString).asJava)
+    properties
+  }
+
+
+  @throws[IOException]
+  def compressByGZ(content: String): Array[Byte] = {
+    var compressedData: Array[Byte] = null
+    try {
+      val baos = new ByteArrayOutputStream
+      val gzipOutputStream = new GZIPOutputStream(baos)
+      try {
+        gzipOutputStream.write(content.getBytes("UTF-8"))
+        gzipOutputStream.finish()
+        compressedData = baos.toByteArray
+      } finally {
+        if (baos != null) baos.close()
+        if (gzipOutputStream != null) gzipOutputStream.close()
+      }
+    }
+    compressedData
+  }
+
+  private def getHttpClient: CloseableHttpClient = {
+    HttpClients.custom().disableRedirectHandling().build()
+  }
+
+  /**
+   * Extract the upload address from the response of the upload-address request.
+   *
+   * @param response response of the upload-address request
+   * @return value of the "location" header of the 307 response
+   * @throws CopyIntoException if the status is not 307 or the "location" header is missing
+   */
+  @throws[CopyIntoException]
+  private def handleGetUploadAddressResponse(response: CloseableHttpResponse): String = {
+    val code = response.getStatusLine.getStatusCode
+    val msg = response.getStatusLine.getReasonPhrase
+    val content = EntityUtils.toString(new BufferedHttpEntity(response.getEntity), StandardCharsets.UTF_8)
+    val loadResponse: CopyIntoResponse = CopyIntoResponse(code, msg, content)
+    if (loadResponse.code == HttpStatus.SC_TEMPORARY_REDIRECT) {
+      // getFirstHeader returns null when the header is absent; fail with a clear message instead of an NPE
+      Option(response.getFirstHeader("location")) match {
+        case Some(locationHeader) =>
+          val uploadAddress: String = locationHeader.getValue
+          LOG.info(s"Get upload address Response:$loadResponse")
+          LOG.info(s"Redirect to s3: $uploadAddress")
+          uploadAddress
+        case None =>
+          LOG.error(s"Redirect response carries no location header, response: $loadResponse")
+          throw new CopyIntoException("Could not get the redirected address: location header is missing.")
+      }
+    } else {
+      LOG.error(s"Failed get the redirected address, status ${loadResponse.code}, reason ${loadResponse.msg}, response ${loadResponse.content}")
+      // throw the declared exception type and keep the status/reason in the message
+      throw new CopyIntoException(
+        s"Could not get the redirected address, status ${loadResponse.code}, reason ${loadResponse.msg}")
+    }
+  }
+}
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 57cacd6..25e86aa 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -365,7 +365,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: 
Boolean) extends Loader
 
     val compressType = streamLoadProps.get("compress_type")
     val columnSeparator = 
escapeString(streamLoadProps.getOrElse("column_separator", "\t"))
-    val lineDelimiter = 
escapeString(streamLoadProps.getOrElse("line_delimiter", "\t"))
+    val lineDelimiter = 
escapeString(streamLoadProps.getOrElse("line_delimiter", "\n"))
     val addDoubleQuotes = streamLoadProps.getOrElse("add_double_quotes", 
"false").toBoolean
     val streamingPassthrough: Boolean = isStreaming && 
settings.getBooleanProperty(
       ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH,
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 26491df..bd8e9f7 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -18,7 +18,7 @@
 package org.apache.doris.spark.writer
 
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
-import org.apache.doris.spark.load.{CommitMessage, Loader, StreamLoader}
+import org.apache.doris.spark.load.{CommitMessage, CopyIntoLoader, Loader, 
StreamLoader}
 import org.apache.doris.spark.sql.Utils
 import org.apache.doris.spark.txn.TransactionHandler
 import org.apache.doris.spark.txn.listener.DorisTransactionListener
@@ -42,6 +42,7 @@ class DorisWriter(settings: SparkSettings,
   private val logger: Logger = LoggerFactory.getLogger(classOf[DorisWriter])
 
   private val sinkTaskPartitionSize: Integer = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+  private val loadMode: String = 
settings.getProperty(ConfigurationOptions.LOAD_MODE,ConfigurationOptions.DEFAULT_LOAD_MODE)
   private val sinkTaskUseRepartition: Boolean = 
settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
     
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
 
@@ -210,9 +211,11 @@ class DorisWriter(settings: SparkSettings,
 
   @throws[IllegalArgumentException]
   private def generateLoader: Loader = {
-    val loadMode = settings.getProperty("load_mode", "stream_load")
-    if ("stream_load".equalsIgnoreCase(loadMode)) new StreamLoader(settings, 
isStreaming)
-    else throw new IllegalArgumentException(s"Unsupported load mode: 
$loadMode")
+    // Match case-insensitively, as the replaced equalsIgnoreCase check did; Locale.ROOT keeps
+    // the lower-casing independent of the JVM default locale.
+    loadMode.toLowerCase(java.util.Locale.ROOT) match {
+      case "stream_load" => new StreamLoader(settings, isStreaming)
+      case "copy_into" => new CopyIntoLoader(settings, isStreaming)
+      case _ => throw new IllegalArgumentException(s"Unsupported load mode: $loadMode")
+    }
   }
 
   def getTransactionHandler: TransactionHandler = txnHandler


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to