This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7fddf0a throw exception when commit transaction failed (#206)
7fddf0a is described below
commit 7fddf0aea9ef70caa42e8b4b993395a6383616f5
Author: zhaorongsheng <[email protected]>
AuthorDate: Wed Jun 5 15:44:17 2024 +0800
throw exception when commit transaction failed (#206)
---
.../main/java/org/apache/doris/spark/load/DorisStreamLoad.java | 7 ++++---
.../src/main/scala/org/apache/doris/spark/load/StreamLoader.scala | 8 +++++---
2 files changed, 9 insertions(+), 6 deletions(-)
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 6b1708d..7f97516 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -328,10 +328,11 @@ public class DorisStreamLoad implements Serializable {
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res = MAPPER.readValue(loadResult, new
TypeReference<HashMap<String, String>>() {
});
- if (res.get("status").equals("Fail") &&
!ResponseUtil.isCommitted(res.get("msg"))) {
- throw new StreamLoadException("Commit failed " +
loadResult);
+ if (res.get("status").equals("Success") ||
ResponseUtil.isCommitted(res.get("msg"))) {
+ LOG.info("commit transaction {} succeed, load result:
{}.", txnId, loadResult);
} else {
- LOG.info("load result {}", loadResult);
+ LOG.error("commit transaction {} failed. load result: {}",
txnId, loadResult);
+ throw new StreamLoadException("Commit failed " +
loadResult);
}
}
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
index 5986c08..9481b6f 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala
@@ -151,10 +151,12 @@ class StreamLoader(settings: SparkSettings, isStreaming:
Boolean) extends Loader
if (response.getEntity != null) {
val loadResult = EntityUtils.toString(response.getEntity)
val res = MAPPER.readValue(loadResult, new
TypeReference[util.HashMap[String, String]]() {})
- if (res.get("status") == "Fail" &&
!ResponseUtil.isCommitted(res.get("msg"))) throw new
StreamLoadException("Commit failed " + loadResult)
- else LOG.info("load result {}", loadResult)
+ if (res.get("status") == "Success" ||
ResponseUtil.isCommitted(res.get("msg"))) LOG.info("commit transaction {}
succeed, load result: {}.", msg.value, loadResult)
+ else {
+ LOG.error("commit transaction {} failed. load result: {}",
msg.value, loadResult)
+ throw new StreamLoadException("Commit failed " + loadResult)
+ }
}
-
} match {
case Success(_) => client.close()
case Failure(e) =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]