github-actions[bot] commented on code in PR #26359:
URL: https://github.com/apache/doris/pull/26359#discussion_r1396149295
##########
be/src/olap/wal_manager.cpp:
##########
@@ -76,6 +79,78 @@
&_replay_thread);
}
+void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id,
WAL_STATUS wal_status) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
+ LOG(INFO) << "add wal queue "
+ << ",table_id:" << table_id << ",wal_id:" << wal_id <<
",status:" << wal_status;
+ auto it = _wal_status_queues.find(table_id);
+ if (it == _wal_status_queues.end()) {
+ std::unordered_map<int64_t, WAL_STATUS> tmp;
+ tmp.emplace(wal_id, wal_status);
+ _wal_status_queues.emplace(table_id, tmp);
+ } else {
+ it->second.emplace(wal_id, wal_status);
+ }
+}
+
+Status WalManager::erase_wal_status_queue(int64_t table_id, int64_t wal_id) {
Review Comment:
warning: method 'erase_wal_status_queue' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:63:
```diff
- Status erase_wal_status_queue(int64_t table_id, int64_t wal_id);
+ static Status erase_wal_status_queue(int64_t table_id, int64_t wal_id);
```
##########
be/src/olap/wal_reader.h:
##########
@@ -32,11 +32,13 @@ class WalReader {
Status finalize();
Status read_block(PBlock& block);
+ Status read_header(uint32_t& version, std::string& col_ids);
private:
Status _deserialize(PBlock& block, std::string& buf);
Status _check_checksum(const char* binary, size_t size, uint32_t checksum);
+private:
Review Comment:
warning: redundant access specifier has the same accessibility as the
previous access specifier [readability-redundant-access-specifiers]
```suggestion
```
<details>
<summary>Additional context</summary>
**be/src/olap/wal_reader.h:36:** previously declared here
```cpp
private:
^
```
</details>
##########
be/src/vec/sink/writer/vwal_writer.cpp:
##########
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vwal_writer.h"
+
+#include <gen_cpp/data.pb.h>
+
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "common/compiler_util.h"
+#include "common/status.h"
+#include "olap/wal_manager.h"
+#include "runtime/client_cache.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "util/doris_metrics.h"
+#include "util/network_util.h"
+#include "util/proto_util.h"
+#include "util/thrift_util.h"
+#include "vec/common/assert_cast.h"
+#include "vec/core/block.h"
+#include "vec/core/future_block.h"
+#include "vec/sink/vtablet_block_convertor.h"
+#include "vec/sink/vtablet_finder.h"
+
+namespace doris {
+namespace vectorized {
Review Comment:
warning: nested namespaces can be concatenated
[modernize-concat-nested-namespaces]
```suggestion
namespace doris::vectorized {
```
be/src/vec/sink/writer/vwal_writer.cpp:118:
```diff
- } // namespace vectorized
- } // namespace doris
+ } // namespace doris::vectorized
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -76,6 +79,78 @@
&_replay_thread);
}
+void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id,
WAL_STATUS wal_status) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
+ LOG(INFO) << "add wal queue "
+ << ",table_id:" << table_id << ",wal_id:" << wal_id <<
",status:" << wal_status;
+ auto it = _wal_status_queues.find(table_id);
+ if (it == _wal_status_queues.end()) {
+ std::unordered_map<int64_t, WAL_STATUS> tmp;
+ tmp.emplace(wal_id, wal_status);
+ _wal_status_queues.emplace(table_id, tmp);
+ } else {
+ it->second.emplace(wal_id, wal_status);
+ }
+}
+
+Status WalManager::erase_wal_status_queue(int64_t table_id, int64_t wal_id) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
+ auto it = _wal_status_queues.find(table_id);
+ LOG(INFO) << "remove wal queue "
+ << ",table_id:" << table_id << ",wal_id:" << wal_id;
+ if (it == _wal_status_queues.end()) {
+ return Status::InternalError("table_id " + std::to_string(table_id) +
+ " not found in wal status queue");
+ } else {
+ it->second.erase(wal_id);
+ if (it->second.empty()) {
+ _wal_status_queues.erase(table_id);
+ }
+ }
+ return Status::OK();
+}
+
+Status WalManager::get_wal_status_queue_size(const PGetWalQueueSizeRequest*
request,
+ PGetWalQueueSizeResponse*
response) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
+ size_t count = 0;
+ auto table_id = request->table_id();
+ auto txn_id = request->txn_id();
+ if (table_id > 0 && txn_id > 0) {
+ auto it = _wal_status_queues.find(table_id);
+ if (it == _wal_status_queues.end()) {
+ LOG(INFO) << ("table_id " + std::to_string(table_id) +
+ " not found in wal status queue");
+ } else {
+ for (auto wal_it = it->second.begin(); wal_it != it->second.end();
++wal_it) {
+ if (wal_it->first <= txn_id) {
+ count += 1;
+ }
+ }
+ }
+ } else {
+ for (auto it = _wal_status_queues.begin(); it !=
_wal_status_queues.end(); it++) {
+ count += it->second.size();
+ }
+ }
+ response->set_size(count);
+ if (count > 0) {
+ print_wal_status_queue();
+ }
+ return Status::OK();
+}
+
+void WalManager::print_wal_status_queue() {
Review Comment:
warning: method 'print_wal_status_queue' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:64:
```diff
- void print_wal_status_queue();
+ static void print_wal_status_queue();
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -76,6 +79,78 @@ Status WalManager::init() {
&_replay_thread);
}
+void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id,
WAL_STATUS wal_status) {
Review Comment:
warning: method 'add_wal_status_queue' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:62:
```diff
- void add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS
wal_status);
+ static void add_wal_status_queue(int64_t table_id, int64_t wal_id,
WAL_STATUS wal_status);
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -276,4 +359,37 @@
return Status::OK();
}
+bool WalManager::is_running() {
+ return !_stop.load();
+}
+
+void WalManager::stop_relay_wal() {
Review Comment:
warning: method 'stop_relay_wal' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:67:
```diff
- void stop_relay_wal();
+ static void stop_relay_wal();
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -276,4 +359,37 @@
return Status::OK();
}
+bool WalManager::is_running() {
+ return !_stop.load();
+}
+
+void WalManager::stop_relay_wal() {
+ std::lock_guard<std::shared_mutex> wrlock(_lock);
+ for (auto it = _table_map.begin(); it != _table_map.end(); it++) {
+ it->second->stop();
+ }
+}
+
+void WalManager::add_wal_column_index(int64_t wal_id, std::vector<size_t>&
column_index) {
+ _wal_column_id_map.emplace(wal_id, column_index);
+}
+
+void WalManager::erase_wal_column_index(int64_t wal_id) {
Review Comment:
warning: method 'erase_wal_column_index' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:69:
```diff
- void erase_wal_column_index(int64_t wal_id);
+ static void erase_wal_column_index(int64_t wal_id);
```
##########
be/src/olap/wal_manager.h:
##########
@@ -32,6 +34,13 @@ namespace doris {
class WalManager {
ENABLE_FACTORY_CREATOR(WalManager);
+public:
+ enum WAL_STATUS {
+ PREPARE = 0,
+ REPLAY,
+ CREATE,
+ };
+
public:
Review Comment:
warning: redundant access specifier has the same accessibility as the
previous access specifier [readability-redundant-access-specifiers]
```suggestion
```
<details>
<summary>Additional context</summary>
**be/src/olap/wal_manager.h:36:** previously declared here
```cpp
public:
^
```
</details>
##########
be/src/olap/wal_table.h:
##########
@@ -31,17 +31,25 @@ class WalTable {
public:
WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id);
~WalTable();
- // <retry_num, start_time_ms, is_doing_replay>
- using replay_wal_info = std::tuple<int64_t, int64_t, bool>;
// used when be start and there are wals need to do recovery
void add_wals(std::vector<std::string> wals);
Status replay_wals();
size_t size();
+ void stop();
+
+public:
Review Comment:
warning: redundant access specifier has the same accessibility as the
previous access specifier [readability-redundant-access-specifiers]
```suggestion
```
<details>
<summary>Additional context</summary>
**be/src/olap/wal_table.h:30:** previously declared here
```cpp
public:
^
```
</details>
##########
be/src/olap/wal_writer.cpp:
##########
@@ -62,31 +61,60 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) {
for (const auto& block : blocks) {
total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE;
}
- std::string binary(total_size, '\0');
- char* row_binary = binary.data();
size_t offset = 0;
for (const auto& block : blocks) {
- unsigned long row_length = block->GetCachedSize();
- memcpy(row_binary + offset, &row_length, LENGTH_SIZE);
+ uint8_t len_buf[sizeof(uint64_t)];
+ uint64_t block_length = block->ByteSizeLong();
+ encode_fixed64_le(len_buf, block_length);
+ RETURN_IF_ERROR(_file_writer->append({len_buf, sizeof(uint64_t)}));
offset += LENGTH_SIZE;
- memcpy(row_binary + offset, block->SerializeAsString().data(),
row_length);
- offset += row_length;
- uint32_t checksum = crc32c::Value(block->SerializeAsString().data(),
row_length);
- memcpy(row_binary + offset, &checksum, CHECKSUM_SIZE);
+ std::string content = block->SerializeAsString();
+ RETURN_IF_ERROR(_file_writer->append(content));
+ offset += block_length;
+ uint8_t checksum_buf[sizeof(uint32_t)];
+ uint32_t checksum = crc32c::Value(content.data(), block_length);
+ encode_fixed32_le(checksum_buf, checksum);
+ RETURN_IF_ERROR(_file_writer->append({checksum_buf,
sizeof(uint32_t)}));
offset += CHECKSUM_SIZE;
}
- DCHECK(offset == total_size);
+ if (offset != total_size) {
+ return Status::InternalError(
+ "failed to write block to wal expected= " +
std::to_string(total_size) +
+ ",actually=" + std::to_string(offset));
+ }
_disk_bytes.store(_disk_bytes.fetch_add(total_size,
std::memory_order_relaxed),
std::memory_order_relaxed);
_all_wal_disk_bytes->store(
_all_wal_disk_bytes->fetch_add(total_size,
std::memory_order_relaxed),
std::memory_order_relaxed);
- // write rows
- RETURN_IF_ERROR(_file_writer->append({row_binary, offset}));
- _count++;
- if (_count % _batch == 0) {
- //todo sync data
- //LOG(INFO) << "count=" << count << ",do sync";
+ return Status::OK();
+}
+
+Status WalWriter::append_header(uint32_t version, std::string col_ids) {
Review Comment:
warning: method 'append_header' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_writer.h:44:
```diff
- Status append_header(uint32_t version, std::string col_ids);
+ static Status append_header(uint32_t version, std::string col_ids);
```
##########
be/src/vec/sink/writer/vwal_writer.cpp:
##########
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vwal_writer.h"
+
+#include <gen_cpp/data.pb.h>
+
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "common/compiler_util.h"
+#include "common/status.h"
+#include "olap/wal_manager.h"
+#include "runtime/client_cache.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "util/doris_metrics.h"
+#include "util/network_util.h"
+#include "util/proto_util.h"
+#include "util/thrift_util.h"
+#include "vec/common/assert_cast.h"
+#include "vec/core/block.h"
+#include "vec/core/future_block.h"
+#include "vec/sink/vtablet_block_convertor.h"
+#include "vec/sink/vtablet_finder.h"
+
+namespace doris {
+namespace vectorized {
+
+VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id,
RuntimeState* state,
+ TupleDescriptor* output_tuple_desc)
+ : _db_id(db_id),
+ _tb_id(tb_id),
+ _wal_id(wal_id),
+ _state(state),
+ _output_tuple_desc(output_tuple_desc) {}
+
+VWalWriter::~VWalWriter() {}
+
+Status VWalWriter::init() {
+ RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id,
_tb_id, _wal_id,
+
_state->import_label()));
+ RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id,
_wal_writer));
+ _state->exec_env()->wal_mgr()->add_wal_status_queue(_tb_id, _wal_id,
+
WalManager::WAL_STATUS::CREATE);
+ std::stringstream ss;
+ for (auto slot_desc : _output_tuple_desc->slots()) {
+ ss << std::to_string(slot_desc->col_unique_id()) << ",";
+ }
+ std::string col_ids = ss.str().substr(0, ss.str().size() - 1);
+ RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids));
+ return Status::OK();
+}
+Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor,
+ OlapTabletFinder* tablet_finder,
vectorized::Block* block,
+ RuntimeState* state, int64_t num_rows, int64_t
filtered_rows) {
+ PBlock pblock;
+ size_t uncompressed_bytes = 0, compressed_bytes = 0;
+ if (filtered_rows == 0) {
+ RETURN_IF_ERROR(block->serialize(state->be_exec_version(), &pblock,
&uncompressed_bytes,
+ &compressed_bytes,
segment_v2::CompressionTypePB::SNAPPY));
+ RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*>
{&pblock}));
+ } else {
+ auto cloneBlock = block->clone_without_columns();
+ auto res_block =
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
+ for (int i = 0; i < num_rows; ++i) {
+ if (block_convertor->num_filtered_rows() > 0 &&
block_convertor->filter_map()[i]) {
+ continue;
+ }
+ if (tablet_finder->num_filtered_rows() > 0 &&
tablet_finder->filter_bitmap().Get(i)) {
+ continue;
+ }
+ res_block.add_row(block, i);
+ }
+
RETURN_IF_ERROR(res_block.to_block().serialize(state->be_exec_version(),
&pblock,
+ &uncompressed_bytes,
&compressed_bytes,
+
segment_v2::CompressionTypePB::SNAPPY));
+ RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*>
{&pblock}));
+ }
+ return Status::OK();
+}
+Status VWalWriter::append_block(vectorized::Block* input_block, int64_t
num_rows,
+ int64_t filter_rows, vectorized::Block* block,
+ OlapTableBlockConvertor* block_convertor,
+ OlapTabletFinder* tablet_finder) {
+ RETURN_IF_ERROR(
+ write_wal(block_convertor, tablet_finder, block, _state, num_rows,
filter_rows));
+#ifndef BE_TEST
+ auto* future_block = assert_cast<FutureBlock*>(input_block);
+ std::unique_lock<doris::Mutex> l(*(future_block->lock));
+ future_block->set_result(Status::OK(), num_rows, num_rows - filter_rows);
+ future_block->cv->notify_all();
+#endif
+ return Status::OK();
+}
+Status VWalWriter::close() {
Review Comment:
warning: method 'close' can be made static
[readability-convert-member-functions-to-static]
be/src/vec/sink/writer/vwal_writer.h:94:
```diff
- Status close();
+ static Status close();
```
##########
be/src/olap/wal_writer.h:
##########
@@ -40,11 +42,15 @@ class WalWriter {
Status append_blocks(const PBlockArray& blocks);
size_t disk_bytes() const { return
_disk_bytes.load(std::memory_order_relaxed); };
+ Status append_header(uint32_t version, std::string col_ids);
std::string file_name() { return _file_name; };
+
+public:
Review Comment:
warning: redundant access specifier has the same accessibility as the
previous access specifier [readability-redundant-access-specifiers]
```suggestion
```
<details>
<summary>Additional context</summary>
**be/src/olap/wal_writer.h:34:** previously declared here
```cpp
public:
^
```
</details>
##########
be/src/vec/sink/writer/vwal_writer.h:
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <brpc/controller.h>
+#include <bthread/types.h>
+#include <butil/errno.h>
+#include <fmt/format.h>
+#include <gen_cpp/Exprs_types.h>
+#include <gen_cpp/FrontendService.h>
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/PaloInternalService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
+#include <google/protobuf/stubs/callback.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+#include <functional>
+#include <initializer_list>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <queue>
+#include <set>
+#include <sstream>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "exec/data_sink.h"
+#include "exec/tablet_info.h"
+#include "gutil/ref_counted.h"
+#include "olap/wal_writer.h"
+#include "runtime/decimalv2_value.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker.h"
+#include "runtime/thread_context.h"
+#include "runtime/types.h"
+#include "util/countdown_latch.h"
+#include "util/ref_count_closure.h"
+#include "util/runtime_profile.h"
+#include "util/spinlock.h"
+#include "util/stopwatch.hpp"
+#include "vec/columns/column.h"
+#include "vec/common/allocator.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/runtime/vfile_format_transformer.h"
+#include "vec/sink/vrow_distribution.h"
+#include "vec/sink/vtablet_block_convertor.h"
+#include "vec/sink/vtablet_finder.h"
+#include "vec/sink/writer/async_result_writer.h"
+namespace doris {
+namespace vectorized {
Review Comment:
warning: nested namespaces can be concatenated
[modernize-concat-nested-namespaces]
```suggestion
namespace doris::vectorized {
```
be/src/vec/sink/writer/vwal_writer.h:106:
```diff
- } // namespace vectorized
- } // namespace doris
+ } // namespace doris::vectorized
```
##########
be/src/vec/sink/writer/vwal_writer.cpp:
##########
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vwal_writer.h"
+
+#include <gen_cpp/data.pb.h>
+
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "common/compiler_util.h"
+#include "common/status.h"
+#include "olap/wal_manager.h"
+#include "runtime/client_cache.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "util/doris_metrics.h"
+#include "util/network_util.h"
+#include "util/proto_util.h"
+#include "util/thrift_util.h"
+#include "vec/common/assert_cast.h"
+#include "vec/core/block.h"
+#include "vec/core/future_block.h"
+#include "vec/sink/vtablet_block_convertor.h"
+#include "vec/sink/vtablet_finder.h"
+
+namespace doris {
+namespace vectorized {
+
+VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id,
RuntimeState* state,
+ TupleDescriptor* output_tuple_desc)
+ : _db_id(db_id),
+ _tb_id(tb_id),
+ _wal_id(wal_id),
+ _state(state),
+ _output_tuple_desc(output_tuple_desc) {}
+
+VWalWriter::~VWalWriter() {}
+
+Status VWalWriter::init() {
+ RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id,
_tb_id, _wal_id,
+
_state->import_label()));
+ RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id,
_wal_writer));
+ _state->exec_env()->wal_mgr()->add_wal_status_queue(_tb_id, _wal_id,
+
WalManager::WAL_STATUS::CREATE);
+ std::stringstream ss;
+ for (auto slot_desc : _output_tuple_desc->slots()) {
+ ss << std::to_string(slot_desc->col_unique_id()) << ",";
+ }
+ std::string col_ids = ss.str().substr(0, ss.str().size() - 1);
+ RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids));
+ return Status::OK();
+}
+Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor,
Review Comment:
warning: method 'write_wal' can be made static
[readability-convert-member-functions-to-static]
be/src/vec/sink/writer/vwal_writer.h:88:
```diff
- Status write_wal(OlapTableBlockConvertor* block_convertor,
OlapTabletFinder* tablet_finder,
+ static Status write_wal(OlapTableBlockConvertor* block_convertor,
OlapTabletFinder* tablet_finder,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]