This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new d3e1baddeb [feature](branch-2.0) add failure tolerance for strict mode partial update stream load (#22100)
d3e1baddeb is described below
commit d3e1baddebf9be68367af794ab8589945a31bfef
Author: bobhan1 <[email protected]>
AuthorDate: Mon Jul 24 19:35:12 2023 +0800
[feature](branch-2.0) add failure tolerance for strict mode partial update stream load (#22100)
---
be/src/olap/delta_writer.cpp | 4 ++++
be/src/olap/delta_writer.h | 2 ++
be/src/olap/rowset/beta_rowset_writer.cpp | 2 ++
be/src/olap/rowset/beta_rowset_writer.h | 5 ++++
be/src/olap/rowset/rowset_writer.h | 2 ++
be/src/olap/rowset/segment_v2/segment_writer.cpp | 11 ++++++---
be/src/olap/rowset/segment_v2/segment_writer.h | 3 +++
be/src/runtime/runtime_state.cpp | 1 +
be/src/runtime/runtime_state.h | 9 +++++++
be/src/runtime/tablets_channel.cpp | 1 +
be/src/vec/sink/vtablet_sink.cpp | 12 ++++++++--
gensrc/proto/internal_service.proto | 1 +
.../test_partial_update_strict_mode.out | 13 ++++++++++
...oovy => test_partial_update_strict_mode.groovy} | 28 +++++++++++++++++-----
.../test_partial_update_upsert.groovy | 2 +-
15 files changed, 84 insertions(+), 12 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index da2cc7cda7..c5c0797657 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -725,4 +725,8 @@ void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succe
_unfinished_slave_node.erase(node_id);
}
+int64_t DeltaWriter::num_rows_filtered() const {
+ return _rowset_writer == nullptr ? 0 : _rowset_writer->num_rows_filtered();
+}
+
} // namespace doris
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 52b407876f..daf091bf8e 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -129,6 +129,8 @@ public:
int64_t total_received_rows() const { return _total_received_rows; }
+ int64_t num_rows_filtered() const;
+
private:
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
RuntimeProfile* profile,
const UniqueId& load_id);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 03de7cf84d..a12e81d625 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -70,6 +70,7 @@ BetaRowsetWriter::BetaRowsetWriter()
_total_data_size(0),
_total_index_size(0),
_raw_num_rows_written(0),
+ _num_rows_filtered(0),
_segcompaction_worker(this),
_is_doing_segcompaction(false) {
_segcompaction_status.store(OK);
@@ -829,6 +830,7 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << "
row_num:" << row_num
<< " data_size:" << segment_size << " index_size:" <<
index_size;
+ _num_rows_filtered += (*writer)->num_rows_filtered();
writer->reset();
if (flush_size) {
*flush_size = segment_size + index_size;
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index b646f2a681..c7554cee72 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -36,6 +36,7 @@
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
+#include "olap/delta_writer.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
@@ -96,6 +97,8 @@ public:
int64_t num_rows() const override { return _raw_num_rows_written; }
+ int64_t num_rows_filtered() const override { return _num_rows_filtered; }
+
RowsetId rowset_id() override { return _context.rowset_id; }
RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
@@ -201,6 +204,8 @@ protected:
KeyBoundsPB key_bounds;
};
std::map<uint32_t, Statistics> _segid_statistics_map;
+ std::atomic<int64_t> _num_rows_filtered;
+
std::mutex _segid_statistics_map_mutex;
bool _is_pending = false;
diff --git a/be/src/olap/rowset/rowset_writer.h
b/be/src/olap/rowset/rowset_writer.h
index a614a48470..60a6fdc570 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -101,6 +101,8 @@ public:
virtual int64_t num_rows() const = 0;
+ virtual int64_t num_rows_filtered() const = 0;
+
virtual RowsetId rowset_id() = 0;
virtual RowsetTypePB type() const = 0;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 16be990554..226e1062db 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -46,6 +46,7 @@
#include "olap/segment_loader.h"
#include "olap/short_key_index.h"
#include "olap/tablet_schema.h"
+#include "olap/utils.h"
#include "runtime/memory/mem_tracker.h"
#include "service/point_query_executor.h"
#include "util/coding.h"
@@ -376,6 +377,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
std::vector<std::unique_ptr<SegmentCacheHandle>>
segment_caches(specified_rowsets.size());
// locate rows in base data
+
+ int64_t num_rows_filtered = 0;
{
for (size_t pos = row_pos; pos < num_rows; pos++) {
std::string key = _full_encode_keys(key_columns, pos);
@@ -389,9 +392,10 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
_mow_context->max_version,
segment_caches, &rowset);
if (st.is<NOT_FOUND>()) {
if (_tablet_schema->is_strict_mode()) {
- return Status::InternalError(
- "partial update in strict mode only support
updating rows with an "
- "existing key!");
+ ++num_rows_filtered;
+ // delete the invalid newly inserted row
+
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id, 0},
+ pos);
}
if (!_tablet_schema->can_insert_new_rows_in_partial_update()) {
@@ -440,6 +444,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
num_rows));
}
+ _num_rows_filtered += num_rows_filtered;
_num_rows_written += num_rows;
_olap_data_convertor->clear_source_content();
return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h
b/be/src/olap/rowset/segment_v2/segment_writer.h
index 742fa63a4f..0b17ed4faa 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -109,6 +109,7 @@ public:
size_t get_inverted_index_file_size() const { return
_inverted_index_file_size; }
uint32_t num_rows_written() const { return _num_rows_written; }
+ int64_t num_rows_filtered() const { return _num_rows_filtered; }
uint32_t row_count() const { return _row_count; }
Status finalize(uint64_t* segment_file_size, uint64_t* index_size);
@@ -196,6 +197,8 @@ private:
bool _has_key = true;
// _num_rows_written means row count already written in this current
column group
uint32_t _num_rows_written = 0;
+ // number of rows filtered in strict mode partial update
+ int64_t _num_rows_filtered = 0;
// _row_count means total row count of this segment
// In vertical compaction row count is recorded when key columns group
finish
// and _num_rows_written will be updated in value column group
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 28de6c752e..0a6ebca77d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -115,6 +115,7 @@ RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params,
_num_rows_load_total(0),
_num_rows_load_filtered(0),
_num_rows_load_unselected(0),
+ _num_rows_filtered_in_strict_mode_partial_update(0),
_num_print_error_rows(0),
_num_bytes_load_total(0),
_num_finished_scan_range(0),
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 89543e492b..6ef4ee1081 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -241,6 +241,10 @@ public:
int64_t num_rows_load_unselected() { return
_num_rows_load_unselected.load(); }
+ int64_t num_rows_filtered_in_strict_mode_partial_update() {
+ return _num_rows_filtered_in_strict_mode_partial_update;
+ }
+
int64_t num_rows_load_success() {
return num_rows_load_total() - num_rows_load_filtered() -
num_rows_load_unselected();
}
@@ -265,6 +269,10 @@ public:
_num_rows_load_unselected.fetch_add(num_rows);
}
+ void update_num_rows_filtered_in_strict_mode_partial_update(int64_t
num_rows) {
+ _num_rows_filtered_in_strict_mode_partial_update += num_rows;
+ }
+
void set_per_fragment_instance_idx(int idx) { _per_fragment_instance_idx =
idx; }
int per_fragment_instance_idx() const { return _per_fragment_instance_idx;
}
@@ -492,6 +500,7 @@ private:
std::atomic<int64_t> _num_rows_load_total; // total rows read from
source
std::atomic<int64_t> _num_rows_load_filtered; // unqualified rows
std::atomic<int64_t> _num_rows_load_unselected; // rows filtered by
predicates
+ std::atomic<int64_t> _num_rows_filtered_in_strict_mode_partial_update;
std::atomic<int64_t> _num_print_error_rows;
std::atomic<int64_t> _num_bytes_load_total; // total bytes read from source
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index ad8b724ebc..3862fd533e 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -240,6 +240,7 @@ void TabletsChannel::_close_wait(DeltaWriter* writer,
tablet_info->set_tablet_id(writer->tablet_id());
tablet_info->set_schema_hash(writer->schema_hash());
tablet_info->set_received_rows(writer->total_received_rows());
+ tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
} else {
PTabletError* tablet_error = tablet_errors->Add();
tablet_error->set_tablet_id(writer->tablet_id());
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 47f34ab7d0..37d918b18b 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -424,6 +424,10 @@ Status VNodeChannel::open_wait() {
_tablets_received_rows.emplace_back(tablet.tablet_id(),
tablet.received_rows());
}
+ if (tablet.has_num_rows_filtered()) {
+
_state->update_num_rows_filtered_in_strict_mode_partial_update(
+ tablet.num_rows_filtered());
+ }
VLOG_CRITICAL << "master replica commit info: tabletId="
<< tablet.tablet_id()
<< ", backendId=" << _node_id
<< ", master node id: " << this->node_id()
@@ -1507,7 +1511,9 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
COUNTER_SET(_input_rows_counter, _number_input_rows);
COUNTER_SET(_output_rows_counter, _number_output_rows);
- COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
+ COUNTER_SET(_filtered_rows_counter,
+ _number_filtered_rows +
+
state->num_rows_filtered_in_strict_mode_partial_update());
COUNTER_SET(_send_data_timer, _send_data_ns);
COUNTER_SET(_row_distribution_timer,
(int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_filter_timer, _filter_ns);
@@ -1527,7 +1533,9 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
int64_t num_rows_load_total = _number_input_rows +
state->num_rows_load_filtered() +
state->num_rows_load_unselected();
state->set_num_rows_load_total(num_rows_load_total);
- state->update_num_rows_load_filtered(_number_filtered_rows);
+ state->update_num_rows_load_filtered(
+ _number_filtered_rows +
+ state->num_rows_filtered_in_strict_mode_partial_update());
state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows);
// print log of add batch time of all node, for tracing load
performance easily
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 148efec1a2..08a3240e00 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -69,6 +69,7 @@ message PTabletInfo {
repeated string invalid_dict_cols = 3;
// total rows num received by DeltaWriter
optional int64 received_rows = 4;
+ optional int64 num_rows_filtered = 5 [default = 0];
}
// open a tablet writer
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out
new file mode 100644
index 0000000000..ab2a296079
--- /dev/null
+++
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 kevin 18 shenzhen 400 2023-07-01T12:00
+
+-- !sql --
+1 kevin 18 shenzhen 500 2023-07-03T12:00:01
+
+-- !sql --
+1 kevin 18 shenzhen 400 2023-07-01T12:00
+
+-- !sql --
+1 kevin 18 shenzhen 400 2023-07-01T12:00
+
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
similarity index 78%
copy from
regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy
copy to
regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
index 4da4159790..df17d1cd5d 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_strict_mode.groovy
@@ -16,9 +16,9 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_partial_update_upsert", "p0") {
+suite("test_partial_update_strict_mode", "p0") {
- def tableName = "test_partial_update_upsert1"
+ def tableName = "test_partial_update_strict_mode"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
@@ -48,17 +48,27 @@ suite("test_partial_update_upsert", "p0") {
set 'format', 'csv'
set 'partial_columns', 'true'
set 'columns', 'id,balance,last_access_time'
- set 'strict_mode', 'false'
+ set 'strict_mode', 'true'
+ set 'max_filter_ratio', '1'
file 'upsert.csv'
time 10000 // limit inflight 10s
+
+ check {result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(3, json.NumberTotalRows)
+ assertEquals(1, json.NumberLoadedRows)
+ assertEquals(2, json.NumberFilteredRows)
+ }
}
sql "sync"
qt_sql """select * from ${tableName} order by id;"""
sql """ DROP TABLE IF EXISTS ${tableName} """
- def tableName2 = "test_partial_update_upsert2"
+ def tableName2 = "test_partial_update_strict_mode2"
sql """ DROP TABLE IF EXISTS ${tableName2} """
sql """
CREATE TABLE ${tableName2} (
@@ -89,6 +99,7 @@ suite("test_partial_update_upsert", "p0") {
set 'partial_columns', 'true'
set 'columns', 'id,balance,last_access_time'
set 'strict_mode', 'true'
+ set 'max_filter_ratio', '0.5'
file 'upsert.csv'
time 10000 // limit inflight 10s
@@ -96,8 +107,13 @@ suite("test_partial_update_upsert", "p0") {
check {result, exception, startTime, endTime ->
assertTrue(exception == null)
def json = parseJson(result)
- assertEquals("fail", json.Status.toLowerCase())
+ assertEquals("Fail", json.Status)
+ assertEquals("[INTERNAL_ERROR]too many filtered rows", json.Message)
+ assertEquals(3, json.NumberTotalRows)
+ assertEquals(1, json.NumberLoadedRows)
+ assertEquals(2, json.NumberFilteredRows)
}
}
- sql """ DROP TABLE IF EXISTS ${tableName} """
+ qt_sql """select * from ${tableName2} order by id;"""
+ sql """ DROP TABLE IF EXISTS ${tableName2} """
}
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy
index 4da4159790..34d3c82d72 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_upsert.groovy
@@ -99,5 +99,5 @@ suite("test_partial_update_upsert", "p0") {
assertEquals("fail", json.Status.toLowerCase())
}
}
- sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """ DROP TABLE IF EXISTS ${tableName2} """
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]