This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new b1fdbb3ab51 branch-4.1: [fix](delta writer) Fix shared delta writer state lifetime (#64504)
b1fdbb3ab51 is described below

commit b1fdbb3ab5146764045fcfddf33d2059b3d2cbbc
Author: bobhan1 <[email protected]>
AuthorDate: Tue Jun 16 13:27:21 2026 +0800

    branch-4.1: [fix](delta writer) Fix shared delta writer state lifetime (#64504)
    
    ### What problem does this PR solve?
    
    Backport #64349 to branch-4.1.
    
    Shared `DeltaWriterV2` instances can be reused by multiple local sinks
    from the same load. Before this change, the shared writer stored the
    `RuntimeState*` from the sink that first created it. If that creator
    sink finished and its `RuntimeState` was destroyed while another local
    sink continued to reuse the shared writer, `DeltaWriterV2::write()`
    could access the destroyed state in the memtable flush-limit
    cancellation path, causing a BE crash or ASAN use-after-free.
    
    This PR removes the stored `RuntimeState*` from `DeltaWriterV2`. The
    shared writer keeps only the stable `WorkloadGroup` pointer needed by
    `MemTableWriter` initialization, and `VTabletWriterV2` passes a per-call
    cancel checker into `DeltaWriterV2::write()` so cancellation is
    evaluated against the current sink.
    
    Branch-4.1 adaptation:
    
    - keep the detailed delta-writer profile gate at the `VTabletWriterV2`
    call site before passing a profile into the shared writer;
    - construct the new BE test schema with branch-local test helpers and
    protobuf fields.
    
    ### Check List
    
    - [x] `git diff HEAD^ HEAD --check`
    - [x] `./run-be-ut.sh --run --filter=TestVTabletWriterV2.shared_delta_writer_should_not_access_destroyed_creator_runtime_state:DeltaWriterV2PoolTest.* -j100`
    
    ### Release note
    
    Fix a possible BE crash when shared delta writers are reused by multiple
    local sinks.
---
 be/src/exec/sink/writer/vtablet_writer_v2.cpp      |  17 +-
 be/src/load/delta_writer/delta_writer_v2.cpp       |  29 +--
 be/src/load/delta_writer/delta_writer_v2.h         |  10 +-
 be/test/exec/sink/vtablet_writer_v2_test.cpp       | 231 +++++++++++++++++++++
 .../delta_writer/delta_writer_v2_pool_test.cpp     |  13 +-
 5 files changed, 268 insertions(+), 32 deletions(-)

diff --git a/be/src/exec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/exec/sink/writer/vtablet_writer_v2.cpp
index 2494f238a4f..f85ba027fe4 100644
--- a/be/src/exec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer_v2.cpp
@@ -578,7 +578,11 @@ Status 
VTabletWriterV2::_write_memtable(std::shared_ptr<Block> block, int64_t ta
                          << " not found in schema, load_id=" << 
print_id(_load_id);
             return std::unique_ptr<DeltaWriterV2>(nullptr);
         }
-        return DeltaWriterV2::create_unique(&req, streams, _state);
+        std::shared_ptr<WorkloadGroup> workload_group = nullptr;
+        if (_state->get_query_ctx()) {
+            workload_group = _state->workload_group();
+        }
+        return DeltaWriterV2::create_unique(&req, streams, workload_group);
     });
     if (delta_writer == nullptr) {
         LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id
@@ -594,7 +598,12 @@ Status 
VTabletWriterV2::_write_memtable(std::shared_ptr<Block> block, int64_t ta
         }
     }
     SCOPED_TIMER(_write_memtable_timer);
-    st = delta_writer->write(block.get(), rows.row_idxes);
+    st = delta_writer->write(block.get(), rows.row_idxes, [state = _state]() {
+        if (state->is_cancelled()) {
+            return state->cancel_reason();
+        }
+        return Status::OK();
+    });
     return st;
 }
 
@@ -677,7 +686,9 @@ Status VTabletWriterV2::close(Status exec_status) {
             std::unordered_map<int64_t, int32_t> segments_for_tablet;
             SCOPED_TIMER(_close_writer_timer);
             // close all delta writers if this is the last user
-            auto st = _delta_writer_for_tablet->close(segments_for_tablet, 
_operator_profile);
+            RuntimeProfile* delta_writer_profile =
+                    _state->profile_level() >= 2 ? _operator_profile : nullptr;
+            auto st = _delta_writer_for_tablet->close(segments_for_tablet, 
delta_writer_profile);
             _delta_writer_for_tablet.reset();
             if (!st.ok()) {
                 _cancel(st);
diff --git a/be/src/load/delta_writer/delta_writer_v2.cpp 
b/be/src/load/delta_writer/delta_writer_v2.cpp
index 7ffef693c58..5e7f91db103 100644
--- a/be/src/load/delta_writer/delta_writer_v2.cpp
+++ b/be/src/load/delta_writer/delta_writer_v2.cpp
@@ -66,9 +66,9 @@ using namespace ErrorCode;
 
 DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
                              const 
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
-                             RuntimeState* state)
-        : _state(state),
-          _req(*req),
+                             std::shared_ptr<WorkloadGroup> workload_group)
+        : _req(*req),
+          _workload_group(std::move(workload_group)),
           _tablet_schema(new TabletSchema),
           _memtable_writer(new MemTableWriter(*req)),
           _streams(streams) {}
@@ -127,19 +127,17 @@ Status DeltaWriterV2::init() {
 
     _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
     RETURN_IF_ERROR(_rowset_writer->init(context));
-    std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
-    if (_state->get_query_ctx()) {
-        wg_sptr = _state->get_query_ctx()->workload_group();
-    }
     RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, 
_partial_update_info,
-                                           wg_sptr, 
_streams[0]->enable_unique_mow(_req.index_id)));
+                                           _workload_group,
+                                           
_streams[0]->enable_unique_mow(_req.index_id)));
     
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
     _is_init = true;
     _streams.clear();
     return Status::OK();
 }
 
-Status DeltaWriterV2::write(const Block* block, const DorisVector<uint32_t>& 
row_idxs) {
+Status DeltaWriterV2::write(const Block* block, const DorisVector<uint32_t>& 
row_idxs,
+                            const std::function<Status()>& cancel_check) {
     if (UNLIKELY(row_idxs.empty())) {
         return Status::OK();
     }
@@ -155,9 +153,8 @@ Status DeltaWriterV2::write(const Block* block, const 
DorisVector<uint32_t>& row
         DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure",
                         { 
std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
         while (_memtable_writer->flush_running_count() >= 
memtable_flush_running_count_limit) {
-            if (_state->is_cancelled()) {
-                return _state->cancel_reason();
-            }
+            DBUG_EXECUTE_IF("DeltaWriterV2.write.flush_limit_wait", 
DBUG_RUN_CALLBACK());
+            RETURN_IF_ERROR(cancel_check());
             std::this_thread::sleep_for(std::chrono::milliseconds(10));
         }
     }
@@ -186,14 +183,10 @@ Status DeltaWriterV2::close_wait(int32_t& num_segments, 
RuntimeProfile* profile)
     DCHECK(_is_init)
             << "delta writer is supposed be to initialized before close_wait() 
being called";
 
-    if (_state->profile_level() >= 2 && profile != nullptr) {
+    if (profile != nullptr) {
         _update_profile(profile);
     }
-    if (_state->profile_level() >= 2) {
-        RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
-    } else {
-        RETURN_IF_ERROR(_memtable_writer->close_wait());
-    }
+    RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
     num_segments = _rowset_writer->next_segment_id();
 
     _delta_written_success = true;
diff --git a/be/src/load/delta_writer/delta_writer_v2.h 
b/be/src/load/delta_writer/delta_writer_v2.h
index d637c6afcbc..bfdf0fd3a25 100644
--- a/be/src/load/delta_writer/delta_writer_v2.h
+++ b/be/src/load/delta_writer/delta_writer_v2.h
@@ -24,6 +24,7 @@
 #include <stdint.h>
 
 #include <atomic>
+#include <functional>
 #include <memory>
 #include <mutex>
 #include <shared_mutex>
@@ -52,6 +53,7 @@ class SlotDescriptor;
 class OlapTableSchemaParam;
 class BetaRowsetWriterV2;
 class LoadStreamStub;
+class WorkloadGroup;
 
 class Block;
 
@@ -62,13 +64,14 @@ class DeltaWriterV2 {
 
 public:
     DeltaWriterV2(WriteRequest* req, const 
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
-                  RuntimeState* state);
+                  std::shared_ptr<WorkloadGroup> workload_group);
 
     ~DeltaWriterV2();
 
     Status init();
 
-    Status write(const Block* block, const DorisVector<uint32_t>& row_idxs);
+    Status write(const Block* block, const DorisVector<uint32_t>& row_idxs,
+                 const std::function<Status()>& cancel_check);
 
     // flush the last memtable to flush queue, must call it before close_wait()
     Status close();
@@ -88,11 +91,10 @@ private:
 
     void _update_profile(RuntimeProfile* profile);
 
-    RuntimeState* _state = nullptr;
-
     bool _is_init = false;
     bool _is_cancelled = false;
     WriteRequest _req;
+    std::shared_ptr<WorkloadGroup> _workload_group;
     std::shared_ptr<BetaRowsetWriterV2> _rowset_writer;
     TabletSchemaSPtr _tablet_schema;
     bool _delta_written_success = false;
diff --git a/be/test/exec/sink/vtablet_writer_v2_test.cpp 
b/be/test/exec/sink/vtablet_writer_v2_test.cpp
index d62885d1395..2ddde5bc4b3 100644
--- a/be/test/exec/sink/vtablet_writer_v2_test.cpp
+++ b/be/test/exec/sink/vtablet_writer_v2_test.cpp
@@ -18,9 +18,28 @@
 #include "exec/sink/writer/vtablet_writer_v2.h"
 
 #include <gtest/gtest.h>
+#include <unistd.h>
 
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/config.h"
+#include "core/data_type/data_type_number.h"
+#include "exec/operator/operator_helper.h"
+#include "exec/sink/delta_writer_v2_pool.h"
 #include "exec/sink/load_stream_map_pool.h"
 #include "exec/sink/load_stream_stub.h"
+#include "exec/sink/sink_test_utils.h"
+#include "io/fs/local_file_system.h"
+#include "load/memtable/memtable_memory_limiter.h"
+#include "runtime/exec_env.h"
+#include "storage/storage_engine.h"
+#include "storage/tablet/tablet_schema.h"
+#include "testutil/column_helper.h"
+#include "testutil/creators.h"
+#include "util/debug_points.h"
+#include "util/defer_op.h"
 
 namespace doris {
 
@@ -63,6 +82,218 @@ static std::unique_ptr<VTabletWriterV2> 
create_vtablet_writer(int num_replicas =
     return writer;
 }
 
+static TColumn create_int_column_desc(bool is_nullable) {
+    TColumn column;
+    column.__set_column_name("c1");
+    column.__set_column_type(TColumnType());
+    column.column_type.__set_type(TPrimitiveType::INT);
+    column.__set_is_key(true);
+    column.__set_aggregation_type(TAggregationType::NONE);
+    column.__set_is_allow_null(is_nullable);
+    column.__set_col_unique_id(1);
+    return column;
+}
+
+static void prepare_load_runtime_state(MockRuntimeState& state, int sender_id) 
{
+    state.set_backend_id(1);
+    state.set_per_fragment_instance_idx(sender_id);
+    state.set_num_per_fragment_instances(2);
+    state.set_load_stream_per_node(1);
+    state.set_total_load_streams(2);
+    state.set_num_local_sink(2);
+}
+
+static void prepare_open_streams(std::shared_ptr<LoadStreamMap> 
load_stream_map, int64_t node_id,
+                                 int64_t index_id, const TabletSchemaSPtr& 
tablet_schema) {
+    auto streams = load_stream_map->get_or_create(node_id);
+    streams->mark_open();
+    for (auto& stream : streams->streams()) {
+        stream->_is_open.store(true);
+        stream->_status = Status::OK();
+        stream->_tablet_schema_for_index->emplace(index_id, tablet_schema);
+        stream->_enable_unique_mow_for_index->emplace(index_id, false);
+    }
+}
+
+static TabletSchemaSPtr create_int_tablet_schema() {
+    TabletSchemaPB tablet_schema_pb;
+    tablet_schema_pb.set_keys_type(DUP_KEYS);
+    tablet_schema_pb.set_num_short_key_columns(1);
+    tablet_schema_pb.set_num_rows_per_row_block(1024);
+    tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
+    tablet_schema_pb.set_next_column_unique_id(2);
+    auto* column = tablet_schema_pb.add_column();
+    column->set_unique_id(1);
+    column->set_name("c1");
+    column->set_type("INT");
+    column->set_is_key(true);
+    column->set_length(4);
+    column->set_index_length(4);
+    column->set_is_nullable(true);
+    column->set_is_bf_column(false);
+
+    auto tablet_schema = std::make_shared<TabletSchema>();
+    tablet_schema->init_from_pb(tablet_schema_pb);
+    return tablet_schema;
+}
+
+static TDataSink create_vtablet_writer_sink(const TOlapTableSchemaParam& 
schema,
+                                            const TOlapTablePartitionParam& 
partition,
+                                            const TOlapTableLocationParam& 
location,
+                                            TTupleId tuple_id, const 
TUniqueId& load_id) {
+    TDataSink t_sink;
+    t_sink.__isset.olap_table_sink = true;
+    auto& olap_sink = t_sink.olap_table_sink;
+    olap_sink.__set_load_id(load_id);
+    olap_sink.__set_txn_id(1);
+    olap_sink.__set_db_id(schema.db_id);
+    olap_sink.__set_table_id(schema.table_id);
+    olap_sink.__set_tuple_id(tuple_id);
+    olap_sink.__set_num_replicas(1);
+    olap_sink.__set_need_gen_rollup(false);
+    olap_sink.__set_schema(schema);
+    olap_sink.__set_partition(partition);
+    olap_sink.__set_location(location);
+
+    TNodeInfo node;
+    node.__set_id(1);
+    node.__set_option(0);
+    node.__set_host("127.0.0.1");
+    node.__set_async_internal_port(8060);
+    TPaloNodesInfo nodes;
+    nodes.nodes.push_back(node);
+    olap_sink.__set_nodes_info(nodes);
+    return t_sink;
+}
+
+TEST_F(TestVTabletWriterV2, 
shared_delta_writer_should_not_access_destroyed_creator_runtime_state) {
+    const bool old_share_delta_writers = config::share_delta_writers;
+    const int32_t old_flush_running_count_limit = 
config::memtable_flush_running_count_limit;
+    const bool old_enable_debug_points = config::enable_debug_points;
+    config::share_delta_writers = true;
+    Defer restore_configs([&] {
+        config::share_delta_writers = old_share_delta_writers;
+        config::memtable_flush_running_count_limit = 
old_flush_running_count_limit;
+        config::enable_debug_points = old_enable_debug_points;
+        
DebugPoints::instance()->remove("DeltaWriterV2.write.flush_limit_wait");
+    });
+
+    ExecEnv* exec_env = ExecEnv::GetInstance();
+    auto old_load_stream_map_pool = std::move(exec_env->_load_stream_map_pool);
+    auto old_delta_writer_v2_pool = std::move(exec_env->_delta_writer_v2_pool);
+    auto old_memtable_memory_limiter = 
std::move(exec_env->_memtable_memory_limiter);
+    auto old_storage_engine = std::move(exec_env->_storage_engine);
+    const std::string old_storage_root_path = config::storage_root_path;
+    char cwd_buffer[1024];
+    ASSERT_NE(nullptr, getcwd(cwd_buffer, sizeof(cwd_buffer)));
+    const std::string test_data_dir =
+            std::string(cwd_buffer) + 
"/vtablet_writer_v2_shared_delta_writer_test";
+    Defer restore_exec_env([&]() mutable {
+        exec_env->_delta_writer_v2_pool.reset();
+        exec_env->_load_stream_map_pool.reset();
+        exec_env->_storage_engine.reset();
+        exec_env->_memtable_memory_limiter.reset();
+        exec_env->_storage_engine = std::move(old_storage_engine);
+        exec_env->_memtable_memory_limiter = 
std::move(old_memtable_memory_limiter);
+        exec_env->_delta_writer_v2_pool = std::move(old_delta_writer_v2_pool);
+        exec_env->_load_stream_map_pool = std::move(old_load_stream_map_pool);
+        config::storage_root_path = old_storage_root_path;
+        
static_cast<void>(io::global_local_filesystem()->delete_directory(test_data_dir));
+    });
+
+    config::storage_root_path = test_data_dir;
+    
ASSERT_TRUE(io::global_local_filesystem()->delete_directory(test_data_dir).ok());
+    
ASSERT_TRUE(io::global_local_filesystem()->create_directory(test_data_dir).ok());
+    EngineOptions options;
+    options.store_paths.emplace_back(test_data_dir, -1);
+    auto engine = std::make_unique<StorageEngine>(options);
+    ASSERT_TRUE(engine->open().ok());
+    exec_env->_storage_engine = std::move(engine);
+    auto memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
+    ASSERT_TRUE(memtable_memory_limiter->init(1024 * 1024 * 1024).ok());
+    exec_env->_memtable_memory_limiter = std::move(memtable_memory_limiter);
+    exec_env->_load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
+    exec_env->_delta_writer_v2_pool = std::make_unique<DeltaWriterV2Pool>();
+
+    auto creator_ctx = std::make_unique<OperatorContext>();
+    OperatorContext current_ctx;
+    prepare_load_runtime_state(creator_ctx->state, 0);
+    prepare_load_runtime_state(current_ctx.state, 1);
+
+    TOlapTableSchemaParam schema;
+    TTupleId tuple_id = 0;
+    int64_t index_id = 0;
+    sink_test_utils::build_desc_tbl_and_schema(*creator_ctx, schema, tuple_id, 
index_id, false);
+    sink_test_utils::build_desc_tbl_and_schema(current_ctx, schema, tuple_id, 
index_id, false);
+    schema.indexes[0].__set_columns_desc({create_int_column_desc(false)});
+
+    TUniqueId load_id;
+    load_id.hi = 380;
+    load_id.lo = 1;
+    const auto partition = sink_test_utils::build_partition_param(index_id);
+    const auto location = sink_test_utils::build_location_param();
+    const auto t_sink = create_vtablet_writer_sink(schema, partition, 
location, tuple_id, load_id);
+
+    VExprContextSPtrs output_exprs;
+    auto creator_writer = std::make_unique<VTabletWriterV2>(t_sink, 
output_exprs, nullptr, nullptr);
+    auto current_writer = std::make_unique<VTabletWriterV2>(t_sink, 
output_exprs, nullptr, nullptr);
+    ASSERT_TRUE(creator_writer->_init(&creator_ctx->state, 
&creator_ctx->profile).ok());
+    ASSERT_TRUE(current_writer->_init(&current_ctx.state, 
&current_ctx.profile).ok());
+    ASSERT_EQ(creator_writer->_delta_writer_for_tablet, 
current_writer->_delta_writer_for_tablet);
+
+    const auto tablet_schema = create_int_tablet_schema();
+    prepare_open_streams(creator_writer->_load_stream_map, 1, index_id, 
tablet_schema);
+
+    auto block = 
std::make_shared<Block>(ColumnHelper::create_block<DataTypeInt32>({1}));
+    Rows rows;
+    rows.partition_id = 1;
+    rows.index_id = index_id;
+    rows.row_idxes.push_back(0);
+
+    const auto first_write_status = creator_writer->_write_memtable(block, 
100, rows);
+    ASSERT_TRUE(first_write_status.ok()) << first_write_status;
+    ASSERT_EQ(1, creator_writer->_delta_writer_for_tablet->size());
+
+    // The first write above creates the shared DeltaWriterV2. Before this fix
+    // it also stored creator_ctx->state in DeltaWriterV2::_state. Destroy the
+    // creator sink and its RuntimeState to reproduce that lifetime boundary:
+    // another local sink can still reuse the shared writer after the creator
+    // state is gone. Do not call creator_writer->_cancel() here because it
+    // cancels the shared writer and would hide the dangling RuntimeState path.
+    creator_writer.reset();
+    creator_ctx.reset();
+
+    // Force the current sink into DeltaWriterV2::write()'s flush-limit wait
+    // path, then cancel the current RuntimeState. Fixed code observes the
+    // current sink's cancel state and exits cleanly; the pre-fix code read the
+    // destroyed creator RuntimeState from the shared DeltaWriterV2, which ASAN
+    // reported as heap-use-after-free in the child process.
+    auto debug_point = std::make_shared<DebugPoint>();
+    debug_point->execute_limit = 1;
+    debug_point->callback = std::function<void()>(
+            [&] { current_ctx.state.cancel(Status::Cancelled("current state 
cancelled")); });
+    config::enable_debug_points = true;
+    DebugPoints::instance()->add("DeltaWriterV2.write.flush_limit_wait", 
debug_point);
+    config::memtable_flush_running_count_limit = 0;
+
+    EXPECT_EXIT(
+            {
+                alarm(10);
+                auto status = current_writer->_write_memtable(block, 100, 
rows);
+                if (!status.ok() &&
+                    status.msg().find("current state cancelled") != 
std::string::npos) {
+                    _exit(0);
+                }
+                _exit(1);
+            },
+            ::testing::ExitedWithCode(0), "");
+
+    config::memtable_flush_running_count_limit = old_flush_running_count_limit;
+    DebugPoints::instance()->remove("DeltaWriterV2.write.flush_limit_wait");
+
+    current_writer->_cancel(Status::Cancelled("test cleanup"));
+}
+
 TEST_F(TestVTabletWriterV2, one_replica) {
     UniqueId load_id;
     std::vector<TTabletCommitInfo> tablet_commit_infos;
diff --git a/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp 
b/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp
index 2b6ce8091bf..159bc8b58c9 100644
--- a/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp
+++ b/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp
@@ -57,18 +57,17 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
     auto map = pool.get_or_create(load_id);
     EXPECT_EQ(1, pool.size());
     WriteRequest req;
-    RuntimeState state;
-    auto writer = map->get_or_create(100, [&req, &state]() {
+    auto writer = map->get_or_create(100, [&req]() {
         return std::make_unique<DeltaWriterV2>(
-                &req, std::vector<std::shared_ptr<LoadStreamStub>> {}, &state);
+                &req, std::vector<std::shared_ptr<LoadStreamStub>> {}, 
nullptr);
     });
-    auto writer2 = map->get_or_create(101, [&req, &state]() {
+    auto writer2 = map->get_or_create(101, [&req]() {
         return std::make_unique<DeltaWriterV2>(
-                &req, std::vector<std::shared_ptr<LoadStreamStub>> {}, &state);
+                &req, std::vector<std::shared_ptr<LoadStreamStub>> {}, 
nullptr);
     });
-    auto writer3 = map->get_or_create(100, [&req, &state]() {
+    auto writer3 = map->get_or_create(100, [&req]() {
         return std::make_unique<DeltaWriterV2>(
-                &req, std::vector<std::shared_ptr<LoadStreamStub>> {}, &state);
+                &req, std::vector<std::shared_ptr<LoadStreamStub>> {}, 
nullptr);
     });
     EXPECT_EQ(2, map->size());
     EXPECT_EQ(writer, writer3);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to