This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ee1cae  [improve] add auto redirect options (#150)
1ee1cae is described below

commit 1ee1cae591853de1f852c09c6c89ba8d221899a7
Author: wudi <[email protected]>
AuthorDate: Thu Oct 26 10:52:42 2023 +0800

    [improve] add auto redirect options (#150)
    
    Add an auto-redirect option. When enabled, there is no need to obtain
    the BE list; stream loads are performed through the FE, which
    redirects each request to a BE.
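    
    A minimal usage sketch (a Spark write with the new option enabled;
    the host, credentials, and table identifier below are placeholder
    values, and the option name comes from ConfigurationOptions in this
    diff):
    
        df.write.format("doris")
          .option("doris.fenodes", "your_fe_host:8030")
          .option("doris.table.identifier", "your_db.your_table")
          .option("user", "your_user")
          .option("password", "your_password")
          .option("doris.sink.auto-redirect", "true")
          .save()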
---
 .../doris/spark/cfg/ConfigurationOptions.java      |  5 +++
 .../apache/doris/spark/load/DorisStreamLoad.java   | 46 ++++++++++++++++------
 .../org/apache/doris/spark/rest/RestService.java   |  2 +-
 .../doris/spark/rest/models/RespContent.java       |  4 +-
 .../spark/listener/DorisTransactionListener.scala  |  6 +--
 .../apache/doris/spark/writer/DorisWriter.scala    | 12 +++---
 .../doris/spark/sql/TestSparkConnector.scala       |  1 +
 7 files changed, 51 insertions(+), 25 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 6498916..7877bc8 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -119,4 +119,9 @@ public interface ConfigurationOptions {
     String DORIS_SINK_TXN_RETRIES = "doris.sink.txn.retries";
     int DORIS_SINK_TXN_RETRIES_DEFAULT = 3;
 
+    /**
+     * Use automatic redirection through the FE without explicitly obtaining the BE list
+     */
+    String DORIS_SINK_AUTO_REDIRECT = "doris.sink.auto-redirect";
+    boolean DORIS_SINK_AUTO_REDIRECT_DEFAULT = false;
 }
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 338ffbe..3d5bf36 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -16,14 +16,6 @@
 // under the License.
 package org.apache.doris.spark.load;
 
-import org.apache.doris.spark.cfg.ConfigurationOptions;
-import org.apache.doris.spark.cfg.SparkSettings;
-import org.apache.doris.spark.exception.StreamLoadException;
-import org.apache.doris.spark.rest.RestService;
-import org.apache.doris.spark.rest.models.BackendV2;
-import org.apache.doris.spark.rest.models.RespContent;
-import org.apache.doris.spark.util.ResponseUtil;
-
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,6 +24,14 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.spark.cfg.ConfigurationOptions;
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.doris.spark.exception.IllegalArgumentException;
+import org.apache.doris.spark.exception.StreamLoadException;
+import org.apache.doris.spark.rest.RestService;
+import org.apache.doris.spark.rest.models.BackendV2;
+import org.apache.doris.spark.rest.models.RespContent;
+import org.apache.doris.spark.util.ResponseUtil;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
@@ -41,7 +41,9 @@ import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.BufferedHttpEntity;
 import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
 import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
@@ -94,6 +96,7 @@ public class DorisStreamLoad implements Serializable {
     private boolean addDoubleQuotes;
     private static final long cacheExpireTimeout = 4 * 60;
     private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
+    private final String fenodes;
     private final String fileType;
     private String FIELD_DELIMITER;
     private final String LINE_DELIMITER;
@@ -101,11 +104,13 @@ public class DorisStreamLoad implements Serializable {
     private final boolean enable2PC;
     private final Integer txnRetries;
     private final Integer txnIntervalMs;
+    private final boolean autoRedirect;
 
     public DorisStreamLoad(SparkSettings settings) {
         String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
         this.db = dbTable[0];
         this.tbl = dbTable[1];
+        this.fenodes = settings.getProperty(ConfigurationOptions.DORIS_FENODES);
         String user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
         String passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
         this.authEncoded = getAuthEncoded(user, passwd);
@@ -133,6 +138,9 @@ public class DorisStreamLoad implements Serializable {
                 ConfigurationOptions.DORIS_SINK_TXN_RETRIES_DEFAULT);
         this.txnIntervalMs = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS,
                 ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS_DEFAULT);
+
+        this.autoRedirect = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT,
+                ConfigurationOptions.DORIS_SINK_AUTO_REDIRECT_DEFAULT);
     }
 
     public String getLoadUrlStr() {
@@ -143,7 +151,14 @@ public class DorisStreamLoad implements Serializable {
     }
 
     private CloseableHttpClient getHttpClient() {
-        HttpClientBuilder httpClientBuilder = HttpClientBuilder.create().disableRedirectHandling();
+        HttpClientBuilder httpClientBuilder = HttpClients
+                .custom()
+                .setRedirectStrategy(new DefaultRedirectStrategy() {
+                    @Override
+                    protected boolean isRedirectable(String method) {
+                        return true;
+                    }
+                });
         return httpClientBuilder.build();
     }
 
@@ -187,7 +202,7 @@ public class DorisStreamLoad implements Serializable {
         }
     }
 
-    public int load(Iterator<InternalRow> rows, StructType schema)
+    public long load(Iterator<InternalRow> rows, StructType schema)
             throws StreamLoadException, JsonProcessingException {
 
         String label = generateLoadLabel();
@@ -240,7 +255,7 @@ public class DorisStreamLoad implements Serializable {
 
     }
 
-    public Integer loadStream(Iterator<InternalRow> rows, StructType schema)
+    public Long loadStream(Iterator<InternalRow> rows, StructType schema)
             throws StreamLoadException, JsonProcessingException {
         if (this.streamingPassthrough) {
             handleStreamPassThrough();
@@ -248,7 +263,7 @@ public class DorisStreamLoad implements Serializable {
         return load(rows, schema);
     }
 
-    public void commit(int txnId) throws StreamLoadException {
+    public void commit(long txnId) throws StreamLoadException {
 
         try (CloseableHttpClient client = getHttpClient()) {
 
@@ -296,7 +311,7 @@ public class DorisStreamLoad implements Serializable {
      * @param txnId transaction id
      * @throws StreamLoadException
      */
-    public void abortById(int txnId) throws StreamLoadException {
+    public void abortById(long txnId) throws StreamLoadException {
 
         LOG.info("start abort transaction {}.", txnId);
 
@@ -385,6 +400,9 @@ public class DorisStreamLoad implements Serializable {
 
     private String getBackend() {
         try {
+            if (autoRedirect) {
+                return RestService.randomEndpoint(fenodes, LOG);
+            }
             // get backends from cache
             List<BackendV2.BackendRowV2> backends = cache.get("backends");
             Collections.shuffle(backends);
@@ -392,6 +410,8 @@ public class DorisStreamLoad implements Serializable {
             return backend.getIp() + ":" + backend.getHttpPort();
         } catch (ExecutionException e) {
             throw new RuntimeException("get backends info fail", e);
+        } catch (IllegalArgumentException e) {
+            throw new RuntimeException("get frontend info fail", e);
         }
     }
 
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index a00385c..2140c02 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -235,7 +235,7 @@ public class RestService implements Serializable {
      * @throws IllegalArgumentException fe nodes is illegal
      */
     @VisibleForTesting
-    static String randomEndpoint(String feNodes, Logger logger) throws IllegalArgumentException {
+    public static String randomEndpoint(String feNodes, Logger logger) throws IllegalArgumentException {
         logger.trace("Parse fenodes '{}'.", feNodes);
         if (StringUtils.isEmpty(feNodes)) {
             logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
index 7829cc2..a52ea13 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/RespContent.java
@@ -25,7 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 public class RespContent {
 
     @JsonProperty(value = "TxnId")
-    private int TxnId;
+    private long TxnId;
 
     @JsonProperty(value = "Label")
     private String Label;
@@ -75,7 +75,7 @@ public class RespContent {
     @JsonProperty(value = "ErrorURL")
     private String ErrorURL;
 
-    public int getTxnId() {
+    public long getTxnId() {
         return TxnId;
     }
 
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala
index e670a30..b1e9d84 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala
@@ -28,14 +28,14 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Success}
 
-class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Int], dorisStreamLoad: DorisStreamLoad, sinkTnxIntervalMs: Int, sinkTxnRetries: Int)
+class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Long], dorisStreamLoad: DorisStreamLoad, sinkTnxIntervalMs: Int, sinkTxnRetries: Int)
   extends SparkListener {
 
   val logger: Logger = LoggerFactory.getLogger(classOf[DorisTransactionListener])
 
   override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
-    val txnIds: mutable.Buffer[Int] = preCommittedTxnAcc.value.asScala
-    val failedTxnIds = mutable.Buffer[Int]()
+    val txnIds: mutable.Buffer[Long] = preCommittedTxnAcc.value.asScala
+    val failedTxnIds = mutable.Buffer[Long]()
     jobEnd.jobResult match {
       // if job succeed, commit all transactions
       case JobSucceeded =>
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 6498bea..f90bcc6 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -82,10 +82,10 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
     doWrite(dataFrame, dorisStreamLoader.loadStream)
   }
 
-  private def doWrite(dataFrame: DataFrame, loadFunc: (util.Iterator[InternalRow], StructType) => Int): Unit = {
+  private def doWrite(dataFrame: DataFrame, loadFunc: (util.Iterator[InternalRow], StructType) => Long): Unit = {
 
     val sc = dataFrame.sqlContext.sparkContext
-    val preCommittedTxnAcc = sc.collectionAccumulator[Int]("preCommittedTxnAcc")
+    val preCommittedTxnAcc = sc.collectionAccumulator[Long]("preCommittedTxnAcc")
     if (enable2PC) {
       sc.addSparkListener(new DorisTransactionListener(preCommittedTxnAcc, dorisStreamLoader, sinkTxnIntervalMs, sinkTxnRetries))
     }
@@ -99,7 +99,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
 
       while (iterator.hasNext) {
         val batchIterator = new BatchIterator[InternalRow](iterator, batchSize, maxRetryTimes > 0)
-        val retry = Utils.retry[Int, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) _
+        val retry = Utils.retry[Long, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) _
         retry(loadFunc(batchIterator.asJava, schema))(batchIterator.reset()) match {
           case Success(txnId) =>
             if (enable2PC) handleLoadSuccess(txnId, preCommittedTxnAcc)
@@ -116,11 +116,11 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
 
   }
 
-  private def handleLoadSuccess(txnId: Int, acc: CollectionAccumulator[Int]): Unit = {
+  private def handleLoadSuccess(txnId: Long, acc: CollectionAccumulator[Long]): Unit = {
     acc.add(txnId)
   }
 
-  private def handleLoadFailure(acc: CollectionAccumulator[Int]): Unit = {
+  private def handleLoadFailure(acc: CollectionAccumulator[Long]): Unit = {
     // if task run failed, acc value will not be returned to driver,
     // should abort all pre committed transactions inside the task
     logger.info("load task failed, start aborting previously pre-committed 
transactions")
@@ -128,7 +128,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
       logger.info("no pre-committed transactions, skip abort")
       return
     }
-    val abortFailedTxnIds = mutable.Buffer[Int]()
+    val abortFailedTxnIds = mutable.Buffer[Long]()
     acc.value.asScala.foreach(txnId => {
       Utils.retry[Unit, Exception](3, Duration.ofSeconds(1), logger) {
         dorisStreamLoader.abortById(txnId)
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
index 3f05da2..a5e756c 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -65,6 +65,7 @@ class TestSparkConnector {
       .option("doris.table.identifier", dorisTable)
       .option("user", dorisUser)
       .option("password", dorisPwd)
+//      .option("sink.auto-redirect", "true")
       //specify your field
       .option("doris.write.fields", "name,gender")
       .option("sink.batch.size",2)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
