This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6fae4724d31 [enhancement](exception) catch exception for streamload and validate column (#40092)
6fae4724d31 is described below
commit 6fae4724d319fdf5f5dde1a3576ad287f23ebe90
Author: yiguolei <[email protected]>
AuthorDate: Thu Aug 29 20:20:32 2024 +0800
[enhancement](exception) catch exception for streamload and validate column (#40092)
---
be/src/http/action/http_stream.cpp | 58 ++++++++++++------------
be/src/http/action/stream_load.cpp | 35 +++++++-------
be/src/io/file_factory.cpp | 1 +
be/src/io/fs/stream_load_pipe.cpp | 4 +-
be/src/runtime/stream_load/stream_load_context.h | 8 ++--
be/src/util/byte_buffer.h | 9 +---
be/src/vec/sink/vtablet_block_convertor.cpp | 11 ++---
be/src/vec/sink/vtablet_block_convertor.h | 13 +++++-
be/test/util/byte_buffer2_test.cpp | 3 +-
9 files changed, 75 insertions(+), 67 deletions(-)
diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index 7dbae6df731..c6176c52815 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -237,37 +237,39 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
int64_t start_read_data_time = MonotonicNanos();
+ Status st = ctx->allocate_schema_buffer();
+ if (!st.ok()) {
+ ctx->status = st;
+ return;
+ }
while (evbuffer_get_length(evbuf) > 0) {
- try {
- auto bb = ByteBuffer::allocate(128 * 1024);
- auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
- bb->pos = remove_bytes;
- bb->flip();
- auto st = ctx->body_sink->append(bb);
- // schema_buffer stores 1M of data for parsing column information
- // need to determine whether to cache for the first time
- if (ctx->is_read_schema) {
- if (ctx->schema_buffer()->pos + remove_bytes <
config::stream_tvf_buffer_size) {
- ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
- } else {
- LOG(INFO) << "use a portion of data to request fe to
obtain column information";
- ctx->is_read_schema = false;
- ctx->status = process_put(req, ctx);
- }
- }
- if (!st.ok() && !ctx->status.ok()) {
- LOG(WARNING) << "append body content failed. errmsg=" << st <<
", " << ctx->brief();
- ctx->status = st;
- return;
- }
- ctx->receive_bytes += remove_bytes;
- } catch (const doris::Exception& e) {
- if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
- ctx->status = Status::MemoryLimitExceeded(
- fmt::format("PreCatch error code:{}, {}, ", e.code(),
e.to_string()));
+ ByteBufferPtr bb;
+ st = ByteBuffer::allocate(128 * 1024, &bb);
+ if (!st.ok()) {
+ ctx->status = st;
+ return;
+ }
+ auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
+ bb->pos = remove_bytes;
+ bb->flip();
+ st = ctx->body_sink->append(bb);
+ // schema_buffer stores 1M of data for parsing column information
+ // need to determine whether to cache for the first time
+ if (ctx->is_read_schema) {
+ if (ctx->schema_buffer()->pos + remove_bytes <
config::stream_tvf_buffer_size) {
+ ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
+ } else {
+ LOG(INFO) << "use a portion of data to request fe to obtain
column information";
+ ctx->is_read_schema = false;
+ ctx->status = process_put(req, ctx);
}
- ctx->status = Status::Error<false>(e.code(), e.to_string());
}
+ if (!st.ok()) {
+ LOG(WARNING) << "append body content failed. errmsg=" << st << ",
" << ctx->brief();
+ ctx->status = st;
+ return;
+ }
+ ctx->receive_bytes += remove_bytes;
}
// after all the data has been read and it has not reached 1M, it will
execute here
if (ctx->is_read_schema) {
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index d0c5dff2075..1a9420dea63 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -344,25 +344,22 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
int64_t start_read_data_time = MonotonicNanos();
while (evbuffer_get_length(evbuf) > 0) {
- try {
- auto bb = ByteBuffer::allocate(128 * 1024);
- auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
- bb->pos = remove_bytes;
- bb->flip();
- auto st = ctx->body_sink->append(bb);
- if (!st.ok()) {
- LOG(WARNING) << "append body content failed. errmsg=" << st <<
", " << ctx->brief();
- ctx->status = st;
- return;
- }
- ctx->receive_bytes += remove_bytes;
- } catch (const doris::Exception& e) {
- if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
- ctx->status = Status::MemoryLimitExceeded(
- fmt::format("PreCatch error code:{}, {}, ", e.code(),
e.to_string()));
- }
- ctx->status = Status::Error<false>(e.code(), e.to_string());
- }
+ ByteBufferPtr bb;
+ Status st = ByteBuffer::allocate(128 * 1024, &bb);
+ if (!st.ok()) {
+ ctx->status = st;
+ return;
+ }
+ auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
+ bb->pos = remove_bytes;
+ bb->flip();
+ st = ctx->body_sink->append(bb);
+ if (!st.ok()) {
+ LOG(WARNING) << "append body content failed. errmsg=" << st << ",
" << ctx->brief();
+ ctx->status = st;
+ return;
+ }
+ ctx->receive_bytes += remove_bytes;
}
int64_t read_data_time = MonotonicNanos() - start_read_data_time;
int64_t last_receive_and_read_data_cost_nanos =
ctx->receive_and_read_data_cost_nanos;
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index f4ce573c535..86907886f17 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -206,6 +206,7 @@ Status FileFactory::create_pipe_reader(const TUniqueId&
load_id, io::FileReaderS
return Status::InternalError("unknown stream load id: {}",
UniqueId(load_id).to_string());
}
if (need_schema) {
+ RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer());
// Here, a portion of the data is processed to parse column information
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024
/* min_chunk_size */,
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index ce91a2e8391..0dc27e009d0 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -113,7 +113,7 @@ Status
StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t
proto_byte_size) {
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
ByteBufferPtr buf;
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(buf,
128 * 1024));
+ RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf));
buf->put_bytes(data, size);
buf->flip();
return _append(buf, proto_byte_size);
@@ -148,7 +148,7 @@ Status StreamLoadPipe::append(const char* data, size_t
size) {
size_t chunk_size = std::max(_min_chunk_size, size - pos);
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
-
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(_write_buf,
chunk_size));
+ RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf));
_write_buf->put_bytes(data + pos, size - pos);
return Status::OK();
}
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 95e56e0b3fa..9d1601372f8 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -121,15 +121,17 @@ public:
bool is_mow_table() const;
- ByteBufferPtr schema_buffer() {
+ Status allocate_schema_buffer() {
if (_schema_buffer == nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->stream_load_pipe_tracker());
- _schema_buffer =
ByteBuffer::allocate(config::stream_tvf_buffer_size);
+ return ByteBuffer::allocate(config::stream_tvf_buffer_size,
&_schema_buffer);
}
- return _schema_buffer;
+ return Status::OK();
}
+ ByteBufferPtr schema_buffer() { return _schema_buffer; }
+
public:
static const int default_txn_id = -1;
// load type, eg: ROUTINE LOAD/MANUAL LOAD
diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h
index 1499f51c053..aafd4506087 100644
--- a/be/src/util/byte_buffer.h
+++ b/be/src/util/byte_buffer.h
@@ -34,13 +34,8 @@ struct ByteBuffer;
using ByteBufferPtr = std::shared_ptr<ByteBuffer>;
struct ByteBuffer : private Allocator<false> {
- static ByteBufferPtr allocate(size_t size) {
- ByteBufferPtr ptr(new ByteBuffer(size));
- return ptr;
- }
-
- static Status create_and_allocate(ByteBufferPtr& ptr, size_t size) {
- ptr = ByteBufferPtr(new ByteBuffer(size));
+ static Status allocate(const size_t size, ByteBufferPtr* ptr) {
+ RETURN_IF_CATCH_EXCEPTION({ *ptr = ByteBufferPtr(new
ByteBuffer(size)); });
return Status::OK();
}
diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp
index feb6633511e..617668c035a 100644
--- a/be/src/vec/sink/vtablet_block_convertor.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -182,12 +182,11 @@ DecimalType
OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip
return DecimalType(value);
}
-Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const
TypeDescriptor& type,
- bool is_nullable,
vectorized::ColumnPtr column,
- size_t slot_index, bool*
stop_processing,
- fmt::memory_buffer&
error_prefix,
- const uint32_t row_count,
-
vectorized::IColumn::Permutation* rows) {
+Status OlapTableBlockConvertor::_internal_validate_column(
+ RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
+ vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
+ fmt::memory_buffer& error_prefix, const uint32_t row_count,
+ vectorized::IColumn::Permutation* rows) {
DCHECK((rows == nullptr) || (rows->size() == row_count));
fmt::memory_buffer error_msg;
auto set_invalid_and_append_error_msg = [&](int row) {
diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h
index 0db340ce6c2..7f866c38032 100644
--- a/be/src/vec/sink/vtablet_block_convertor.h
+++ b/be/src/vec/sink/vtablet_block_convertor.h
@@ -69,7 +69,18 @@ private:
Status _validate_column(RuntimeState* state, const TypeDescriptor& type,
bool is_nullable,
vectorized::ColumnPtr column, size_t slot_index,
bool* stop_processing,
fmt::memory_buffer& error_prefix, const uint32_t
row_count,
- vectorized::IColumn::Permutation* rows = nullptr);
+ vectorized::IColumn::Permutation* rows = nullptr) {
+ RETURN_IF_CATCH_EXCEPTION({
+ return _internal_validate_column(state, type, is_nullable, column,
slot_index,
+ stop_processing, error_prefix,
row_count, rows);
+ });
+ }
+
+ Status _internal_validate_column(RuntimeState* state, const
TypeDescriptor& type,
+ bool is_nullable, vectorized::ColumnPtr
column,
+ size_t slot_index, bool* stop_processing,
+ fmt::memory_buffer& error_prefix, const
uint32_t row_count,
+ vectorized::IColumn::Permutation* rows =
nullptr);
// make input data valid for OLAP table
// return number of invalid/filtered rows.
diff --git a/be/test/util/byte_buffer2_test.cpp b/be/test/util/byte_buffer2_test.cpp
index 04b62cd5fe8..73c38c9e404 100644
--- a/be/test/util/byte_buffer2_test.cpp
+++ b/be/test/util/byte_buffer2_test.cpp
@@ -32,7 +32,8 @@ public:
};
TEST_F(ByteBufferTest, normal) {
- auto buf = ByteBuffer::allocate(4);
+ ByteBufferPtr buf;
+ Status st = ByteBuffer::allocate(4, &buf);
EXPECT_EQ(0, buf->pos);
EXPECT_EQ(4, buf->limit);
EXPECT_EQ(4, buf->capacity);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]