Nj-kol commented on issue #325:
URL: 
https://github.com/apache/doris-spark-connector/issues/325#issuecomment-3591048191

   > [@Nj-kol](https://github.com/Nj-kol) There are two issues here: reading 
and writing.
   > 
   > 1. For writing, configuring the external BE IP address on `doris.benodes` 
will solve the problem; this parameter is currently supported.
   > 2. For reading, as shown in the error message when accessing BE:9060, you 
need to switch to ArrowFlight for reading. The default Thrift method connects 
directly to BE:9060, but ArrowFlight provides an externally accessible method, 
requiring the configuration of `public_host={nginx ip}` and 
`arrow_flight_sql_proxy_port={nginx port}`.
   
   @JNSimba I tried out your suggestions, and here are my findings - 
   
   For reading, after switching to ArrowFlight and changing the config 
`public_host={load balancer ip}`, it worked! - So thanks for that : )
   
   But the issue with writing stills persists, it seems that the parameter 
`doris.benodes` has no effect.  Here is what I tried - 
   
   Create a table in doris:
   
   ```sql
   CREATE TABLE IF NOT EXISTS demos.user_data (
       user_id INT,
       name STRING,
       age INT,
       update_time STRING
   )
   UNIQUE KEY(user_id)
   DISTRIBUTED BY HASH(user_id) BUCKETS 3
   PROPERTIES (
       "replication_num" = "1",
       "enable_unique_key_merge_on_write" = "true"
   );
   ```
   
   Launch shell - 
   
   ```shell
   spark-shell \
   --packages org.apache.doris:spark-doris-connector-spark-3.5:25.2.0 \
   --jars /$HOME/Softwares/jars/mysql-connector-j-9.3.0.jar
   ```
   
   Code - 
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import spark.implicits._
   import org.apache.spark.sql.SaveMode
   
   val feNodes = "<load_balancer_ip>:8030"
   val dorisUser = "root"
   val dorisPass = ""
   val beNodes = "<load_balancer_ip>:8040"
   
   // Sample batch data
   val batchData = Seq(
     (1, "Alice", 30, "2024-01-01 10:00:00"),
     (2, "Bob", 25, "2024-01-01 11:00:00")
   )
   
   val df = batchData.toDF("user_id", "name", "age", "update_time")
   
   df.write.format("doris")
     .option("doris.table.identifier", "demos.user_data")
     .option("doris.fenodes", feNodes)
     .option("doris.benodes", beNodes)
     .option("user", dorisUser)
     .option("password",dorisPass)
     .option("doris.query.port", "9030")
     .option("doris.write.fields", "user_id,name,age,update_time")
     .option("doris.read.mode", "arrow")
     .option("doris.read.arrow-flight-sql.port", "8070")
     .mode(SaveMode.Overwrite)
     .save()
   ```
   
   Error -
   
   ```shell
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 
0) (192.168.0.142 executor driver): java.net.UnknownHostException: 
doris-toolkit-dev-cg1-0.doris-toolkit-dev-cg1.doris.svc.cluster.local
   ```
   
   As you can see, the BE IP being used is still being fetched from FE as 
`doris-toolkit-dev-cg1-0.doris-toolkit-dev-cg1.doris.svc.cluster.local1` and 
not being taken from `doris.benodes`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to