This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new feda66f Spark return error to users when spark on doris query failed (#2531)
feda66f is described below
commit feda66f99f9682ae050a8b0216b2e3578da45cd8
Author: Youngwb <[email protected]>
AuthorDate: Mon Dec 30 21:58:13 2019 +0800
Spark return error to users when spark on doris query failed (#2531)
---
be/src/common/config.h | 2 +-
be/src/runtime/CMakeLists.txt | 1 +
be/src/runtime/fragment_mgr.cpp | 1 +
be/src/runtime/memory_scratch_sink.h | 2 +-
be/src/runtime/plan_fragment_executor.cpp | 5 ++
.../src/runtime/record_batch_queue.cpp | 25 ++++++--
be/src/runtime/record_batch_queue.h | 66 ++++++++++++++++++++++
be/src/runtime/result_queue_mgr.cpp | 24 ++++++--
be/src/runtime/result_queue_mgr.h | 12 ++--
be/src/service/backend_service.cpp | 2 +
be/test/runtime/result_queue_mgr_test.cpp | 16 +++---
.../apache/doris/spark/backend/BackendClient.java | 13 ++++-
.../DorisInternalException.java} | 16 ++++--
.../org/apache/doris/spark/util/ErrorMessages.java | 1 +
gensrc/thrift/PaloInternalService.thrift | 1 +
15 files changed, 154 insertions(+), 33 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 51cad63..1937287 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -443,7 +443,7 @@ namespace config {
CONF_Bool(auto_recover_index_loading_failure, "false");
// max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row
- // default is 10, batch_size's defualt value is 1024 means 10 * 1024 rows will be cached
+ // default is 20, batch_size's defualt value is 1024 means 20 * 1024 rows will be cached
CONF_Int32(max_memory_sink_batch_count, "20");
// This configuration is used for the context gc thread schedule period
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index b99183a..40fa551 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -101,6 +101,7 @@ set(RUNTIME_FILES
routine_load/data_consumer_pool.cpp
routine_load/routine_load_task_executor.cpp
small_file_mgr.cpp
+ record_batch_queue.cpp
result_queue_mgr.cpp
memory_scratch_sink.cpp
external_scan_context_mgr.cpp
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 21159a8..7deb819 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -665,6 +665,7 @@ Status FragmentMgr::exec_external_plan_fragment(const
TScanOpenParams& params, c
TQueryOptions query_options;
query_options.batch_size = params.batch_size;
query_options.query_timeout = params.query_timeout;
+ query_options.query_type = TQueryType::EXTERNAL;
exec_fragment_params.__set_query_options(query_options);
VLOG_ROW << "external exec_plan_fragment params is "
<<
apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
diff --git a/be/src/runtime/memory_scratch_sink.h
b/be/src/runtime/memory_scratch_sink.h
index 510a7e0..6b88ce9 100644
--- a/be/src/runtime/memory_scratch_sink.h
+++ b/be/src/runtime/memory_scratch_sink.h
@@ -80,7 +80,7 @@ private:
const RowDescriptor& _row_desc;
std::shared_ptr<arrow::Schema> _arrow_schema;
- shared_block_queue_t _queue;
+ BlockQueueSharedPtr _queue;
RuntimeProfile* _profile; // Allocated from _pool
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 4a9eb0e..ff703eb 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -34,6 +34,7 @@
#include "runtime/descriptors.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/result_buffer_mgr.h"
+#include "runtime/result_queue_mgr.h"
#include "runtime/row_batch.h"
#include "runtime/mem_tracker.h"
#include "util/cpu_info.h"
@@ -519,6 +520,10 @@ void PlanFragmentExecutor::update_status(const Status&
status) {
_runtime_state->set_mem_limit_exceeded(status.get_error_msg());
}
_status = status;
+ if (_runtime_state->query_options().query_type ==
TQueryType::EXTERNAL) {
+ TUniqueId fragment_instance_id =
_runtime_state->fragment_instance_id();
+
_exec_env->result_queue_mgr()->update_queue_status(fragment_instance_id,
status);
+ }
}
}
diff --git
a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
b/be/src/runtime/record_batch_queue.cpp
similarity index 63%
copy from
extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
copy to be/src/runtime/record_batch_queue.cpp
index aff289d..ffd9067 100644
---
a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
+++ b/be/src/runtime/record_batch_queue.cpp
@@ -15,11 +15,24 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.spark.util;
+#include "runtime/record_batch_queue.h"
+
+namespace doris {
+
+void RecordBatchQueue::update_status(const Status& status) {
+ if (status.ok()) {
+ return;
+ }
+ {
+ std::lock_guard<SpinLock> l(_status_lock);
+ if (_status.ok()) {
+ _status = status;
+ }
+ }
+}
+
+void RecordBatchQueue::shutdown() {
+ _queue.shutdown();
+}
-public abstract class ErrorMessages {
- public static final String PARSE_NUMBER_FAILED_MESSAGE = "Parse '{}' to
number failed. Original string is '{}'.";
- public static final String CONNECT_FAILED_MESSAGE = "Connect to doris {}
failed.";
- public static final String ILLEGAL_ARGUMENT_MESSAGE = "argument '{}' is
illegal, value is '{}'.";
- public static final String SHOULD_NOT_HAPPEN_MESSAGE = "Should not come
here.";
}
diff --git a/be/src/runtime/record_batch_queue.h
b/be/src/runtime/record_batch_queue.h
new file mode 100644
index 0000000..ee5cfb7
--- /dev/null
+++ b/be/src/runtime/record_batch_queue.h
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_RECORD_BATCH_QUEUE_H
+#define DORIS_RECORD_BATCH_QUEUE_H
+
+#include <util/spinlock.h>
+#include "common/status.h"
+#include "util/blocking_queue.hpp"
+
+namespace arrow {
+
+class RecordBatch;
+}
+
+namespace doris {
+
+// The RecordBatchQueue is created and managed by the ResultQueueMgr to
+// cache external query results, as well as query status. Where both
+// BlockingGet and BlockingPut operations block if the queue is empty or
+// full, respectively.
+class RecordBatchQueue {
+public:
+ RecordBatchQueue(u_int32_t max_elements) : _queue(max_elements) {}
+
+ Status status() {
+ std::lock_guard<SpinLock> l(_status_lock);
+ return _status;
+ }
+
+ void update_status(const Status& status);
+
+ bool blocking_get(std::shared_ptr<arrow::RecordBatch>* result) {
+ return _queue.blocking_get(result);
+ }
+
+ bool blocking_put(const std::shared_ptr<arrow::RecordBatch>& val) {
+ return _queue.blocking_put(val);
+ }
+
+ // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put.
+ void shutdown();
+
+private:
+ BlockingQueue<std::shared_ptr<arrow::RecordBatch>> _queue;
+ SpinLock _status_lock;
+ Status _status;
+};
+
+}
+
+#endif //DORIS_RECORD_BATCH_QUEUE_H
diff --git a/be/src/runtime/result_queue_mgr.cpp
b/be/src/runtime/result_queue_mgr.cpp
index a4c7749..f674826 100644
--- a/be/src/runtime/result_queue_mgr.cpp
+++ b/be/src/runtime/result_queue_mgr.cpp
@@ -27,13 +27,13 @@
namespace doris {
-ResultQueueMgr::ResultQueueMgr() :
_max_sink_batch_count(config::max_memory_sink_batch_count) {
+ResultQueueMgr::ResultQueueMgr() {
}
ResultQueueMgr::~ResultQueueMgr() {
}
Status ResultQueueMgr::fetch_result(const TUniqueId& fragment_instance_id,
std::shared_ptr<arrow::RecordBatch>* result, bool *eos) {
- shared_block_queue_t queue;
+ BlockQueueSharedPtr queue;
{
std::lock_guard<std::mutex> l(_lock);
auto iter = _fragment_queue_map.find(fragment_instance_id);
@@ -43,6 +43,8 @@ Status ResultQueueMgr::fetch_result(const TUniqueId&
fragment_instance_id, std::
return Status::InternalError("fragment_instance_id does not
exists");
}
}
+ // check queue status before get result
+ RETURN_IF_ERROR(queue->status());
bool sucess = queue->blocking_get(result);
if (sucess) {
// sentinel nullptr indicates scan end
@@ -61,14 +63,14 @@ Status ResultQueueMgr::fetch_result(const TUniqueId&
fragment_instance_id, std::
return Status::OK();
}
-void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id,
shared_block_queue_t* queue) {
+void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id,
BlockQueueSharedPtr* queue) {
std::lock_guard<std::mutex> l(_lock);
auto iter = _fragment_queue_map.find(fragment_instance_id);
if (iter != _fragment_queue_map.end()) {
*queue = iter->second;
} else {
// the blocking queue size = 20 (default), in this way, one queue have 20 * 1024 rows at most
- shared_block_queue_t tmp(new
BlockingQueue<std::shared_ptr<arrow::RecordBatch>>(_max_sink_batch_count));
+ BlockQueueSharedPtr tmp(new
RecordBatchQueue(config::max_memory_sink_batch_count));
_fragment_queue_map.insert(std::make_pair(fragment_instance_id, tmp));
*queue = tmp;
}
@@ -78,10 +80,24 @@ Status ResultQueueMgr::cancel(const TUniqueId&
fragment_instance_id) {
std::lock_guard<std::mutex> l(_lock);
auto iter = _fragment_queue_map.find(fragment_instance_id);
if (iter != _fragment_queue_map.end()) {
+ // first remove RecordBatch from queue
+ // avoid MemoryScratchSink block on send or close operation
+ iter->second->shutdown();
// remove this queue from map
_fragment_queue_map.erase(fragment_instance_id);
}
return Status::OK();
}
+void ResultQueueMgr::update_queue_status(const TUniqueId&
fragment_instance_id, const Status& status) {
+ if (status.ok()) {
+ return;
+ }
+ std::lock_guard<std::mutex> l(_lock);
+ auto iter = _fragment_queue_map.find(fragment_instance_id);
+ if (iter != _fragment_queue_map.end()) {
+ iter->second->update_status(status);
+ }
+}
+
}
diff --git a/be/src/runtime/result_queue_mgr.h
b/be/src/runtime/result_queue_mgr.h
index 81c1464..9ceba79 100644
--- a/be/src/runtime/result_queue_mgr.h
+++ b/be/src/runtime/result_queue_mgr.h
@@ -23,10 +23,10 @@
#include <unordered_map>
#include "common/status.h"
-#include "util/blocking_queue.hpp"
#include "util/hash_util.hpp"
#include "runtime/primitive_type.h"
#include "runtime/raw_value.h"
+#include "runtime/record_batch_queue.h"
namespace arrow {
@@ -38,7 +38,8 @@ namespace doris {
class TUniqueId;
class TScanRowBatch;
-typedef std::shared_ptr<BlockingQueue< std::shared_ptr<arrow::RecordBatch>>>
shared_block_queue_t;
+class RecordBatchQueue;
+typedef std::shared_ptr<RecordBatchQueue> BlockQueueSharedPtr;
class ResultQueueMgr {
@@ -48,14 +49,15 @@ public:
Status fetch_result(const TUniqueId& fragment_instance_id,
std::shared_ptr<arrow::RecordBatch>* result, bool *eos);
- void create_queue(const TUniqueId& fragment_instance_id,
shared_block_queue_t* queue);
+ void create_queue(const TUniqueId& fragment_instance_id,
BlockQueueSharedPtr* queue);
Status cancel(const TUniqueId& fragment_id);
+ void update_queue_status(const TUniqueId& fragment_id, const Status&
status);
+
private:
std::mutex _lock;
- u_int32_t _max_sink_batch_count;
- std::unordered_map<TUniqueId, shared_block_queue_t> _fragment_queue_map;
+ std::unordered_map<TUniqueId, BlockQueueSharedPtr> _fragment_queue_map;
};
}
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 6d9dbfa..8ef958e 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -291,6 +291,7 @@ void BackendService::get_next(TScanBatchResult& result_,
const TScanNextBatchPar
TUniqueId fragment_instance_id = context->fragment_instance_id;
std::shared_ptr<arrow::RecordBatch> record_batch;
bool eos;
+
st = _exec_env->result_queue_mgr()->fetch_result(fragment_instance_id,
&record_batch, &eos);
if (st.ok()) {
result_.__set_eos(eos);
@@ -307,6 +308,7 @@ void BackendService::get_next(TScanBatchResult& result_,
const TScanNextBatchPar
}
}
} else {
+ LOG(WARNING) << "fragment_instance_id [" <<
print_id(fragment_instance_id) << "] fetch result status [" << st.to_string() +
"]";
st.to_thrift(&t_status);
result_.status = t_status;
}
diff --git a/be/test/runtime/result_queue_mgr_test.cpp
b/be/test/runtime/result_queue_mgr_test.cpp
index 85f1737..f8c0a3e 100644
--- a/be/test/runtime/result_queue_mgr_test.cpp
+++ b/be/test/runtime/result_queue_mgr_test.cpp
@@ -42,7 +42,7 @@ protected:
};
TEST_F(ResultQueueMgrTest, create_normal) {
- shared_block_queue_t block_queue_t;
+ BlockQueueSharedPtr block_queue_t;
TUniqueId query_id;
query_id.lo = 10;
query_id.hi = 100;
@@ -57,11 +57,11 @@ TEST_F(ResultQueueMgrTest, create_same_queue) {
query_id.lo = 10;
query_id.hi = 100;
- shared_block_queue_t block_queue_t_1;
+ BlockQueueSharedPtr block_queue_t_1;
queue_mgr.create_queue(query_id, &block_queue_t_1);
ASSERT_TRUE(block_queue_t_1 != nullptr);
- shared_block_queue_t block_queue_t_2;
+ BlockQueueSharedPtr block_queue_t_2;
queue_mgr.create_queue(query_id, &block_queue_t_2);
ASSERT_TRUE(block_queue_t_2 != nullptr);
@@ -74,7 +74,7 @@ TEST_F(ResultQueueMgrTest, fetch_result_normal) {
query_id.hi = 100;
ResultQueueMgr queue_mgr;
- shared_block_queue_t block_queue_t;
+ BlockQueueSharedPtr block_queue_t;
queue_mgr.create_queue(query_id, &block_queue_t);
ASSERT_TRUE(block_queue_t != nullptr);
@@ -110,7 +110,7 @@ TEST_F(ResultQueueMgrTest, fetch_result_end) {
query_id.lo = 10;
query_id.hi = 100;
- shared_block_queue_t block_queue_t;
+ BlockQueueSharedPtr block_queue_t;
queue_mgr.create_queue(query_id, &block_queue_t);
ASSERT_TRUE(block_queue_t != nullptr);
block_queue_t->blocking_put(nullptr);
@@ -127,7 +127,7 @@ TEST_F(ResultQueueMgrTest, normal_cancel) {
query_id.lo = 10;
query_id.hi = 100;
ResultQueueMgr queue_mgr;
- shared_block_queue_t block_queue_t;
+ BlockQueueSharedPtr block_queue_t;
queue_mgr.create_queue(query_id, &block_queue_t);
ASSERT_TRUE(block_queue_t != nullptr);
ASSERT_TRUE(queue_mgr.cancel(query_id).ok());
@@ -138,7 +138,7 @@ TEST_F(ResultQueueMgrTest, cancel_no_block) {
query_id.lo = 10;
query_id.hi = 100;
ResultQueueMgr queue_mgr;
- shared_block_queue_t block_queue_t;
+ BlockQueueSharedPtr block_queue_t;
queue_mgr.create_queue(query_id, &block_queue_t);
ASSERT_TRUE(block_queue_t != nullptr);
ASSERT_TRUE(queue_mgr.cancel(query_id).ok());
@@ -155,4 +155,4 @@ int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
doris::CpuInfo::init();
return RUN_ALL_TESTS();
-}
\ No newline at end of file
+}
diff --git
a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java
b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java
index 62d6398..90baf79 100644
---
a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java
+++
b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java
@@ -19,6 +19,8 @@ package org.apache.doris.spark.backend;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.exception.ConnectedFailedException;
+import org.apache.doris.spark.exception.DorisException;
+import org.apache.doris.spark.exception.DorisInternalException;
import org.apache.doris.spark.util.ErrorMessages;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.serialization.Routing;
@@ -151,16 +153,17 @@ public class BackendClient {
* @return scan batch result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
*/
- public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams)
throws ConnectedFailedException {
+ public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams)
throws DorisException {
logger.debug("GetNext to '{}', parameter is '{}'.", routing,
nextBatchParams);
if (!isConnected) {
open();
}
TException ex = null;
+ TScanBatchResult result = null;
for (int attempt = 0; attempt < retries; ++attempt) {
logger.debug("Attempt {} to getNext {}.", attempt, routing);
try {
- TScanBatchResult result = client.get_next(nextBatchParams);
+ result = client.get_next(nextBatchParams);
if (result == null) {
logger.warn("GetNext result from {} is null.", routing);
continue;
@@ -176,6 +179,12 @@ public class BackendClient {
ex = e;
}
}
+ if (result != null && (TStatusCode.OK !=
(result.getStatus().getStatus_code()))) {
+ logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing,
result.getStatus().getStatus_code(),
+ result.getStatus().getError_msgs());
+ throw new DorisInternalException(routing.toString(),
result.getStatus().getStatus_code(),
+ result.getStatus().getError_msgs());
+ }
logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
throw new ConnectedFailedException(routing.toString(), ex);
}
diff --git
a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java
similarity index 63%
copy from
extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
copy to
extension/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java
index aff289d..f42acee 100644
---
a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
+++
b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java
@@ -15,11 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.spark.util;
+package org.apache.doris.spark.exception;
+
+import org.apache.doris.thrift.TStatusCode;
+
+import java.util.List;
+
+public class DorisInternalException extends DorisException {
+ public DorisInternalException(String server, TStatusCode statusCode,
List<String> errorMsgs) {
+ super("Doris server " + server + " internal failed, status code [" +
statusCode + "] error message is " + errorMsgs);
+ }
-public abstract class ErrorMessages {
- public static final String PARSE_NUMBER_FAILED_MESSAGE = "Parse '{}' to
number failed. Original string is '{}'.";
- public static final String CONNECT_FAILED_MESSAGE = "Connect to doris {}
failed.";
- public static final String ILLEGAL_ARGUMENT_MESSAGE = "argument '{}' is
illegal, value is '{}'.";
- public static final String SHOULD_NOT_HAPPEN_MESSAGE = "Should not come
here.";
}
diff --git
a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
index aff289d..92a04e9 100644
---
a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
+++
b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
@@ -22,4 +22,5 @@ public abstract class ErrorMessages {
public static final String CONNECT_FAILED_MESSAGE = "Connect to doris {}
failed.";
public static final String ILLEGAL_ARGUMENT_MESSAGE = "argument '{}' is
illegal, value is '{}'.";
public static final String SHOULD_NOT_HAPPEN_MESSAGE = "Should not come
here.";
+ public static final String DORIS_INTERNAL_FAIL_MESSAGE = "Doris server
'{}' internal failed, status is '{}', error message is '{}'";
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 4f7ac35..fba6012 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -42,6 +42,7 @@ const i64 DEFAULT_PARTITION_ID = -1;
enum TQueryType {
SELECT,
LOAD,
+ EXTERNAL
}
enum TErrorHubType {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]