decster commented on a change in pull request #3637:
URL: https://github.com/apache/incubator-doris/pull/3637#discussion_r429258167



##########
File path: be/src/olap/memory/mem_sub_tablet.cpp
##########
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memory/mem_sub_tablet.h"
+
+#include "olap/memory/column.h"
+#include "olap/memory/column_reader.h"
+#include "olap/memory/column_writer.h"
+#include "olap/memory/hash_index.h"
+#include "olap/memory/partial_row_batch.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+Status MemSubTablet::create(uint64_t version, const Schema& schema,
+                            std::unique_ptr<MemSubTablet>* ret) {
+    std::unique_ptr<MemSubTablet> tmp(new MemSubTablet());
+    tmp->_versions.reserve(64);
+    tmp->_versions.emplace_back(version, 0);
+    tmp->_columns.resize(schema.cid_size());
+    for (size_t i = 0; i < schema.num_columns(); i++) {
+        // TODO: support storage_type != c.type
+        auto& c = *schema.get(i);
+        if (!supported(c.type())) {
+            return Status::NotSupported("column type not supported");
+        }
+        tmp->_columns[c.cid()].reset(new Column(c, c.type(), version));
+    }
+    tmp.swap(*ret);
+    return Status::OK();
+}
+
+MemSubTablet::MemSubTablet() : _index(new HashIndex(1 << 16)) {}
+
+MemSubTablet::~MemSubTablet() {}
+
+Status MemSubTablet::get_size(uint64_t version, size_t* size) const {
+    std::lock_guard<std::mutex> lg(_lock);
+    if (version == static_cast<uint64_t>(-1)) {
+        // get latest
+        *size = _versions.back().size;
+        return Status::OK();
+    }
+    if (_versions[0].version > version) {
+        return Status::NotFound("get_size failed, version too old");
+    }
+    for (size_t i = 1; i < _versions.size(); i++) {
+        if (_versions[i].version > version) {
+            *size = _versions[i - 1].size;
+            return Status::OK();
+        }
+    }
+    *size = _versions.back().size;
+    return Status::OK();
+}
+
+Status MemSubTablet::read_column(uint64_t version, uint32_t cid,
+                                 std::unique_ptr<ColumnReader>* reader) {
+    scoped_refptr<Column> cl;
+    {
+        std::lock_guard<std::mutex> lg(_lock);
+        if (cid < _columns.size()) {
+            cl = _columns[cid];
+        }
+    }
+    if (!cl) {
+        return Status::NotFound("column not found");
+    }
+    return cl->create_reader(version, reader);
+}
+
+Status MemSubTablet::get_index_to_read(scoped_refptr<HashIndex>* index) {
+    *index = _index;
+    return Status::OK();
+}
+
+Status MemSubTablet::begin_write(scoped_refptr<Schema>* schema) {
+    _schema = *schema;
+    _row_size = latest_size();
+    _write_index = _index;
+    _writers.clear();
+    _writers.resize(_columns.size());
+    // precache key columns
+    for (size_t i = 0; i < _schema->num_key_columns(); i++) {
+        uint32_t cid = _schema->get(i)->cid();
+        if (!_writers[cid]) {
+            RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid]));
+        }
+    }
+    _temp_hash_entries.reserve(8);
+
+    // setup stats
+    _write_start = GetMonoTimeSecondsAsDouble();
+    _num_insert = 0;
+    _num_update = 0;
+    _num_update_cell = 0;
+    return Status::OK();
+}
+
+Status MemSubTablet::apply_partial_row_batch(PartialRowBatch* batch) {
+    while (true) {
+        bool has_row = false;
+        RETURN_IF_ERROR(batch->next_row(&has_row));
+        if (!has_row) {
+            break;
+        }
+        RETURN_IF_ERROR(apply_partial_row(*batch));
+    }
+    return Status::OK();
+}
+
+Status MemSubTablet::apply_partial_row(const PartialRowBatch& row) {
+    DCHECK_GE(row.cur_row_cell_size(), 1);
+    const ColumnSchema* dsc;
+    const void* key;
+    // get key column and find in hash index
+    // TODO: support multi-column row key
+    row.cur_row_get_cell(0, &dsc, &key);
+    ColumnWriter* keyw = _writers[1].get();
+    // find candidate rowids, and check equality
+    uint64_t hashcode = keyw->hashcode(key, 0);
+    _temp_hash_entries.clear();
+    uint32_t newslot = _write_index->find(hashcode, &_temp_hash_entries);
+    uint32_t rid = -1;
+    for (size_t i = 0; i < _temp_hash_entries.size(); i++) {
+        uint32_t test_rid = _temp_hash_entries[i];
+        if (keyw->equals(test_rid, key, 0)) {
+            rid = test_rid;
+            break;
+        }
+    }
+    // if rowkey not found, do insertion/append
+    if (rid == -1) {
+        _num_insert++;

Review comment:
       fixed

##########
File path: be/src/olap/memory/partial_row_batch.h
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/memory/common.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+// A chunk of memory that stores a batch of serialized partial rows
+// User can iterate through all the partial rows, get each partial row's cells.
+//
+// Serialization format for a batch:
+// 4 byte len | serialized partial row
+// 4 byte len | serialized partial row
+// ...
+// 4 byte len | serialized partial row
+//
+// Serialization format for a partial row
+// bit vector (set + null) byte size (2 bytes) |
+// bit vector mark set cells |
+// bit vector mark nullable cells' null value |
+// 8bit padding
+// serialized not null cells
+//
+// Example usage:
+// PartialRowBatch rb(&schema);
+// rb.load(std::move(buffer));
+// while (true) {
+//     bool has_row = false;
+//     rb.next_row(&has_row);
+//     if (!has_row) break;
+//     for (size_t j = 0; j < rb.cur_row_cell_size(); j++) {
+//         const ColumnSchema* cs = nullptr;
+//         const void* data = nullptr;
+//         // get column cell type and data
+//         rb.cur_row_get_cell(j, &cs, &data);
+//     }
+// }
+//
+// Note: currently only fixed length column types are supported. All length and scalar types are
+// stored in native byte order (little endian on x86-64).
+//
+// Note: The serialization format is simple; it only provides basic functionality
+// so we can quickly complete the whole create/read/write pipeline. The format may change
+// as the project evolves.
+class PartialRowBatch {
+public:
+    explicit PartialRowBatch(scoped_refptr<Schema>* schema);
+    ~PartialRowBatch();
+
+    const Schema& schema() const { return *_schema.get(); }
+
+    // Load from a serialized buffer
+    Status load(std::vector<uint8_t>&& buffer);
+
+    // Return row count in this batch
+    size_t row_size() const { return _row_size; }
+
+    // Iterate to next row, set has_row to false if there are no more rows
+    Status next_row(bool* has_row);
+
+    // Get row operation cell count
+    size_t cur_row_cell_size() const { return _cells.size(); }
+    // Get row operation cell by index idx, return ColumnSchema and data pointer
+    Status cur_row_get_cell(size_t idx, const ColumnSchema** cs, const void** data) const;
+
+private:
+    scoped_refptr<Schema> _schema;
+
+    bool _delete = false;
+    size_t _bit_set_size = 0;
+    struct CellInfo {
+        CellInfo(uint32_t cid, const void* data)
+                : cid(cid), data(reinterpret_cast<const uint8_t*>(data)) {}
+        uint32_t cid = 0;
+        const uint8_t* data = nullptr;
+    };
+    vector<CellInfo> _cells;
+
+    size_t _next_row = 0;
+    size_t _row_size = 0;
+    const uint8_t* _pos = nullptr;
+    std::vector<uint8_t> _buffer;
+};
+
+// Writer for PartialRowBatch
+//
+// Example usage:
+// scoped_refptr<Schema> sc;
+// Schema::create("id int,uv int,pv int,city tinyint null", &sc);
+// PartialRowWriter writer(*sc.get());
+// writer.start_batch();
+// for (auto& row : rows) {
+//     writer.start_row();
+//     writer.set("column_name", value);
+//     ...
+//     writer.set(column_id, value);
+//     writer.end_row();
+// }
+// vector<uint8_t> buffer;
+// writer.end_batch(&buffer);

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to