This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new a760509 [Improve] Fix column projection issue in Spark 3.3, 3.4, and 3.5 (#353)
a760509 is described below
commit a7605093e39d8e28198f66761fa8cdfdf0a22d71
Author: wudi <[email protected]>
AuthorDate: Wed Mar 25 16:51:38 2026 +0800
[Improve] Fix column projection issue in Spark 3.3, 3.4, and 3.5 (#353)
* Add log for spark connector
* Fix column projection issue in Spark 3.3, 3.4, and 3.5
---
.../apache/doris/spark/client/DorisBackendThriftClient.java | 2 +-
.../apache/doris/spark/client/read/AbstractThriftReader.java | 10 ++++++++++
.../scala/org/apache/doris/spark/read/DorisScanBuilder.scala | 2 +-
.../scala/org/apache/doris/spark/read/DorisScanBuilder.scala | 2 +-
.../scala/org/apache/doris/spark/read/DorisScanBuilder.scala | 2 +-
5 files changed, 14 insertions(+), 4 deletions(-)
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendThriftClient.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendThriftClient.java
index 9343d84..01a2a81 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendThriftClient.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendThriftClient.java
@@ -234,7 +234,7 @@ public class DorisBackendThriftClient {
logger.warn("Close scanner from {} failed.", backend, e);
}
}
- logger.info("CloseScanner to Doris BE '{}' success.", backend);
+ logger.info("CloseScanner to Doris BE '{}' success or contextId {} .",
backend, closeParams.getContextId());
close();
}
}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
index 373910c..76ea2a9 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
@@ -132,6 +132,11 @@ public abstract class AbstractThriftReader extends
DorisReader {
offset += rowBatch.getReadRowCount();
rowBatch.close();
rowBatchQueue.put(rowBatch);
+ } else {
+ logger.info(
+ "Async scan finished , tablets: {}, offset: {}",
+ Arrays.toString(partition.getTablets()),
+ offset);
}
}
}
@@ -183,6 +188,11 @@ public abstract class AbstractThriftReader extends
DorisReader {
endOfStream.set(nextResult.isEos());
if (!endOfStream.get()) {
rowBatch = new RowBatch(nextResult, dorisSchema,
datetimeJava8ApiEnabled);
+ } else {
+ logger.info(
+ "Scan finished, tablets: {}, offset: {}",
+ Arrays.toString(partition.getTablets()),
+ offset);
}
}
hasNext = !endOfStream.get();
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
index 26b45f4..6bdd256 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
@@ -33,7 +33,7 @@ class DorisScanBuilder(config: DorisConfig, schema:
StructType) extends DorisSca
private var limitSize: Int = -1
- override def build(): Scan = new DorisScanV2(config, schema,
pushDownPredicates, limitSize)
+ override def build(): Scan = new DorisScanV2(config, readSchema,
pushDownPredicates, limitSize)
override def pushPredicates(predicates: Array[Predicate]): Array[Predicate]
= {
val (pushed, unsupported) = predicates.partition(predicate => {
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
index 26b45f4..6bdd256 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
@@ -33,7 +33,7 @@ class DorisScanBuilder(config: DorisConfig, schema:
StructType) extends DorisSca
private var limitSize: Int = -1
- override def build(): Scan = new DorisScanV2(config, schema,
pushDownPredicates, limitSize)
+ override def build(): Scan = new DorisScanV2(config, readSchema,
pushDownPredicates, limitSize)
override def pushPredicates(predicates: Array[Predicate]): Array[Predicate]
= {
val (pushed, unsupported) = predicates.partition(predicate => {
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
index 26b45f4..6bdd256 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala
@@ -33,7 +33,7 @@ class DorisScanBuilder(config: DorisConfig, schema:
StructType) extends DorisSca
private var limitSize: Int = -1
- override def build(): Scan = new DorisScanV2(config, schema,
pushDownPredicates, limitSize)
+ override def build(): Scan = new DorisScanV2(config, readSchema,
pushDownPredicates, limitSize)
override def pushPredicates(predicates: Array[Predicate]): Array[Predicate]
= {
val (pushed, unsupported) = predicates.partition(predicate => {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]