github-actions[bot] commented on code in PR #64674:
URL: https://github.com/apache/doris/pull/64674#discussion_r3464067083


##########
be/src/storage/transform/row_binlog_derive.cpp:
##########
@@ -0,0 +1,351 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/transform/row_binlog_derive.h"
+
+#include <algorithm>
+
+#include "cloud/config.h"
+#include "common/cast_set.h"
+#include "core/block/block.h"
+#include "core/block/column_with_type_and_name.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_vector.h"
+#include "storage/binlog.h"
+#include "storage/iterator/olap_data_convertor.h"
+#include "storage/key/row_key_encoder.h"
+#include "storage/mow/historical_row_fetcher.h"
+#include "storage/partial_update_info.h"
+#include "storage/rowset/rowset_writer_context.h"
+#include "storage/segment/historical_row_retriever.h"
+#include "storage/tablet/tablet_schema.h"
+#include "storage/transform/transform_util.h"
+#include "util/time.h"
+
+namespace doris::segment_v2 {
+
+// number of binlog prefix columns: LSN, op, timestamp
+static constexpr uint32_t BINLOG_COLNUM = 3;
+
+namespace {
+
+// Runs the primary-key historical lookup over `block`'s source key (+seq)
+// columns, leaving the planned reads and per-row ops in `retriever` for the
+// AFTER/BEFORE assembly. `seq_pos` is the seq column's position in the input
+// block (-1 if absent). `convertor` must outlive `retriever`: its accessors 
back
+// the lookup plan.
+Status setup_retriever_and_lookup(TransformExecContext& ctx, const 
SegmentWriteBinlogOptions& cfg,
+                                  const TabletSchemaSPtr& source_schema, const 
Block* block,
+                                  int32_t seq_pos, const Int8* delete_signs, 
size_t num_rows,
+                                  OlapBlockDataConvertor& convertor,
+                                  
std::unique_ptr<PrimaryKeyModelRowRetriever>& retriever) {
+    retriever = std::make_unique<PrimaryKeyModelRowRetriever>();
+    RETURN_IF_ERROR(retriever->init(HistoricalRowRetrieverContext {
+            .tablet = ctx.tablet,
+            .tablet_schema = source_schema,
+            .rowset_writer_ctx = ctx.rowset_ctx,
+            .partial_update_info = cfg.source.partial_update_info,
+            .is_transient_rowset_writer = 
cfg.source.is_transient_rowset_writer,
+            .write_type = cfg.source.source_write_type}));
+
+    // key (+seq) only conversion from the input block for the lookup
+    convertor.resize(source_schema->num_columns());
+    std::vector<IOlapColumnDataAccessor*> key_columns;
+    RETURN_IF_ERROR(convert_key_columns(convertor, *source_schema, *block, 
num_rows, key_columns));
+    IOlapColumnDataAccessor* seq_column = nullptr;
+    if (seq_pos != -1) {
+        RETURN_IF_ERROR(convert_seq_column(convertor, *source_schema, *block,
+                                           static_cast<size_t>(seq_pos), 
num_rows, seq_column));
+    }
+    
RETURN_IF_ERROR(retriever->prepare_lookup_plan_from_source_columns(key_columns, 
seq_column,
+                                                                       
cfg.source.mow_context));
+    RETURN_IF_ERROR(retriever->retrieve_historical_row(delete_signs, 0, 
num_rows));
+    return Status::OK();
+}
+
+// Fills the BEFORE value columns into `out` from the retriever's historical
+// reads. No-op when the source schema has no value columns.
+Status fill_before_columns(Block& out, const TabletSchema& binlog_schema,
+                           const TabletSchema& source_schema,
+                           PrimaryKeyModelRowRetriever* retriever, uint32_t 
before_col_start,
+                           size_t num_rows) {
+    size_t value_column_num = source_schema.num_visible_value_columns();
+    if (value_column_num == 0) {
+        return Status::OK();
+    }
+    std::vector<uint32_t> before_cids;
+    for (uint32_t cid = before_col_start;
+         cid < before_col_start + cast_set<uint32_t>(value_column_num); ++cid) 
{
+        before_cids.emplace_back(cid);
+    }
+    Block before_block = binlog_schema.create_block_by_cids(before_cids);
+    DCHECK(retriever != nullptr);
+    std::vector<uint32_t> value_cids;
+    uint32_t value_start = cast_set<uint32_t>(source_schema.num_key_columns());
+    uint32_t value_end = 
cast_set<uint32_t>(source_schema.num_visible_columns());
+    for (uint32_t cid = value_start; cid < value_end; ++cid) {
+        value_cids.emplace_back(cid);
+    }
+    DCHECK_EQ(before_cids.size(), value_cids.size());
+    RETURN_IF_ERROR(retriever->build_before_block(&before_block, value_cids, 
0, num_rows));
+    size_t col_pos_in_block = 0;
+    for (auto cid : before_cids) {
+        out.replace_by_position(cid, 
before_block.get_by_position(col_pos_in_block++).column);
+    }
+    return Status::OK();
+}
+
+// Fills the LSN / op / timestamp prefix columns into `out`. The LSN is the
+// per-row auto-inc value (the real commit tso is unknown until publish); op is
+// the only meaningful value; the timestamp is a NULL placeholder the reader 
fills.
+void fill_binlog_prefix_columns(Block& out, const TabletSchema& binlog_schema,
+                                uint32_t binlog_col_start, const 
std::vector<int64_t>& lsn_ids,
+                                const std::vector<int64_t>& operators, size_t 
num_rows) {
+    std::vector<uint32_t> binlog_cids = {binlog_col_start, binlog_col_start + 
1,
+                                         binlog_col_start + 2};
+    Block binlog_prefix_block = 
binlog_schema.create_block_by_cids(binlog_cids);
+    {
+        auto binlog_prefix_columns_guard = 
binlog_prefix_block.mutate_columns_scoped();
+        auto& binlog_prefix_columns = 
binlog_prefix_columns_guard.mutable_columns();
+        // auto-inc lsn keeps row order; the reader splices in the real commit 
tso
+        IColumn* lsn_col_ptr = binlog_prefix_columns[0].get();
+        for (size_t i = 0; i < num_rows; i++) {
+            assert_cast<ColumnInt128*>(lsn_col_ptr)
+                    ->insert_value(static_cast<int128_t>(lsn_ids.at(i)));
+        }
+
+        // wrong op only happens under partial update, it is fixed by the
+        // delete bitmap at publish
+        const FieldType op_col_type = 
binlog_schema.column(binlog_cids[1]).type();
+        IColumn* op_col_ptr = binlog_prefix_columns[1].get();
+        auto* op_nullable_column = 
check_and_get_column<ColumnNullable>(op_col_ptr);
+        IColumn* op_nested_column = op_nullable_column != nullptr
+                                            ? 
&op_nullable_column->get_nested_column()
+                                            : op_col_ptr;
+        CHECK(operators.size() >= num_rows) << operators.size() << " vs " << 
num_rows;
+        CHECK(op_col_type == FieldType::OLAP_FIELD_TYPE_BIGINT)
+                << "row binlog op column type must be BIGINT, actual="
+                << static_cast<int>(op_col_type);
+        auto* op_int64_column = assert_cast<ColumnInt64*>(op_nested_column);
+        for (size_t i = 0; i < num_rows; i++) {
+            op_int64_column->insert_value(operators[i]);
+        }
+
+        // NULL placeholder: the reader replaces it with the real commit_tso
+        // (SegmentIterator::_update_tso_col_if_needed), so the on-disk value 
is unused.
+        IColumn* ts_col_ptr = binlog_prefix_columns[2].get();
+        auto* ts_nullable_column = 
check_and_get_column<ColumnNullable>(ts_col_ptr);
+        if (ts_nullable_column != nullptr) {
+            ts_nullable_column->insert_many_defaults(num_rows);
+        } else {
+            
assert_cast<ColumnInt64*>(ts_col_ptr)->insert_many_defaults(num_rows);
+        }
+
+        // op's null map (the timestamp's is already set by 
insert_many_defaults)
+        for (size_t i = 0; i < num_rows; i++) {
+            if (op_nullable_column != nullptr) {
+                op_nullable_column->get_null_map_data().emplace_back(0);
+            }
+        }
+    }
+    size_t col_pos_in_block = 0;
+    for (auto cid : binlog_cids) {
+        out.replace_by_position(cid,
+                                
binlog_prefix_block.get_by_position(col_pos_in_block++).column);
+    }
+}
+
+// Shared prologue: resolve the binlog schema layout and consume this segment's
+// LSN range (exactly once).
+Status resolve_binlog_context(TransformExecContext& ctx, const Block* block,
+                              BinlogDeriveContext* c) {
+    if (config::is_cloud_mode()) {
+        return Status::NotSupported("append binlog");
+    }
+    c->binlog_schema = ctx.tablet_schema;
+    auto& cfg = const_cast<SegmentWriteBinlogOptions&>(
+            ctx.rowset_ctx->write_binlog_opt().write_binlog_config());
+    c->source_schema = cfg.source.tablet_schema;
+    if (UNLIKELY(c->source_schema == nullptr)) {
+        return Status::InternalError("binlog<row> derive missing 
source_tablet_schema");
+    }
+    c->num_rows = block->rows();
+
+    // the per-row LSN range was registered per segment id before this flush;
+    // consume it exactly once.
+    if (ctx.segment_id < 0) {
+        return Status::InternalError<false>(
+                "binlog<row> blocks must be flushed through 
flush_single_block");
+    }
+    c->lsn_ids = cfg.get_seg_lsn(ctx.segment_id);
+    cfg.remove_seg(ctx.segment_id);
+    CHECK(c->lsn_ids->size() >= c->num_rows) << c->lsn_ids->size() << " vs " 
<< c->num_rows;
+
+    // layout bookkeeping, derived from the LSN column position
+    int lsn_col_id = c->binlog_schema->binlog_lsn_col_idx();
+    CHECK(lsn_col_id >= 0) << "binlog<row> schema missing 
__DORIS_BINLOG_LSN__";
+    c->binlog_col_start = cast_set<uint32_t>(lsn_col_id);
+    c->normal_col_start = lsn_col_id == 0 ? BINLOG_COLNUM : 0;
+    c->normal_col_num = 
cast_set<uint32_t>(c->source_schema->num_visible_columns());
+    c->before_col_start = c->normal_col_start + c->normal_col_num;
+    c->write_before = cfg.write_before;
+    // No BEFORE image => the binlog schema must have no BEFORE columns, or 
`out`
+    // would carry unfilled BEFORE columns and row counts wouldn't match the 
swap.
+    DCHECK(c->write_before || c->binlog_schema->num_columns() == 
c->normal_col_num + BINLOG_COLNUM)

Review Comment:
   The old `RowBinlogSegmentWriter` had a compatibility path for row-binlog 
schemas that still contained `__BEFORE__*` columns while `write_before` was 
false: it set `_fill_empty_before_value = true`, forced `_write_before`, and 
had `_fill_before_columns()` insert nullable defaults for each row. This 
refactor only guards that schema shape with a `DCHECK`, which is compiled out 
of release builds, and `emit_binlog_block()` fills the BEFORE columns only when 
`c.write_before` is true. In a release build, a row-binlog schema from mixed 
or old metadata that still has extra BEFORE columns will therefore swap out a 
block in which those columns have zero rows while the LSN/AFTER columns have 
`num_rows` rows, causing the write to fail or produce an invalid segment. 
Please restore the compatibility fill (or return a non-OK status in release 
builds if that shape is intentionally unsupported) and cover the 
`write_before=false` + BEFORE-column schema case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to