This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 217eac790bf [pick](Variant) pick some refactor and fix #34925 #36317 #36201 #36793 (#37526)
217eac790bf is described below
commit 217eac790bfef127d2be80d47b05714a1941cd6b
Author: lihangyu <[email protected]>
AuthorDate: Thu Jul 11 21:25:34 2024 +0800
[pick](Variant) pick some refactor and fix #34925 #36317 #36201 #36793 (#37526)
---
be/src/olap/base_tablet.cpp | 2 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 16 +-
be/src/olap/rowset/rowset.cpp | 16 ++
be/src/olap/rowset/rowset.h | 2 +
be/src/olap/rowset/rowset_meta.cpp | 5 +
be/src/olap/rowset/rowset_writer_context.h | 3 +-
be/src/olap/rowset/segment_creator.cpp | 96 ++------
be/src/olap/rowset/segment_creator.h | 8 +-
.../rowset/segment_v2/hierarchical_data_reader.cpp | 5 +-
.../rowset/segment_v2/hierarchical_data_reader.h | 29 +--
be/src/olap/rowset/segment_v2/segment.cpp | 29 +--
.../rowset/segment_v2/vertical_segment_writer.cpp | 148 ++++++++++++-
.../rowset/segment_v2/vertical_segment_writer.h | 11 +-
be/src/olap/rowset_builder.cpp | 14 +-
be/src/olap/schema_change.cpp | 3 +-
be/src/olap/tablet.cpp | 13 +-
be/src/olap/tablet_schema.cpp | 9 +-
be/src/olap/tablet_schema.h | 2 +-
be/src/vec/columns/column_object.cpp | 19 +-
be/src/vec/columns/column_object.h | 8 -
be/src/vec/common/schema_util.cpp | 246 +++++----------------
be/src/vec/common/schema_util.h | 21 +-
be/src/vec/data_types/data_type.h | 4 +
.../data_types/serde/data_type_object_serde.cpp | 33 ++-
.../vec/data_types/serde/data_type_object_serde.h | 9 +-
be/src/vec/exec/scan/new_olap_scanner.cpp | 2 +-
be/src/vec/functions/function_variant_element.cpp | 43 +++-
be/src/vec/olap/olap_data_convertor.cpp | 34 ++-
be/src/vec/olap/olap_data_convertor.h | 15 +-
.../java/org/apache/doris/analysis/UpdateStmt.java | 11 +-
.../trees/plans/commands/UpdateCommand.java | 2 +-
regression-test/data/variant_p0/delete_update.out | 11 +-
.../data/variant_p0/partial_update_parallel1.csv | 5 +
.../data/variant_p0/partial_update_parallel2.csv | 5 +
.../data/variant_p0/partial_update_parallel3.csv | 5 +
.../data/variant_p0/partial_update_parallel4.csv | 3 +
.../data/variant_p0/variant_with_rowstore.out | 9 +
.../variant_github_events_p0_new/load.groovy | 30 +++
.../suites/variant_p0/delete_update.groovy | 124 +++++++++--
.../variant_p0/test_compaction_extract_root.groovy | 12 +-
.../suites/variant_p0/variant_with_rowstore.groovy | 47 +++-
41 files changed, 678 insertions(+), 431 deletions(-)
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 8901aee0e5a..c67c83bbec8 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -79,7 +79,7 @@ Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_
             {_max_version_schema, update_schema}, _max_version_schema, final_schema,
             check_column_size));
     _max_version_schema = final_schema;
-    VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_structure();
+    VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_full_schema();
return Status::OK();
}
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index de051eea45e..fd09f8b0a7e 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -600,13 +600,12 @@ Status BaseBetaRowsetWriter::build(RowsetSharedPtr& rowset) {
     }
     // update rowset meta tablet schema if tablet schema updated
-    if (_context.tablet_schema->num_variant_columns() > 0) {
-        _rowset_meta->set_tablet_schema(_context.tablet_schema);
-    }
+    auto rowset_schema = _context.merged_tablet_schema != nullptr ? _context.merged_tablet_schema
+                                                                  : _context.tablet_schema;
+    _rowset_meta->set_tablet_schema(rowset_schema);
     RETURN_NOT_OK_STATUS_WITH_WARN(
-            RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta,
-                                         &rowset),
+            RowsetFactory::create_rowset(rowset_schema, _context.rowset_dir, _rowset_meta, &rowset),
             "rowset init failed when build new rowset");
     _already_built = true;
     return Status::OK();
@@ -627,14 +626,17 @@ int64_t BetaRowsetWriter::_num_seg() const {
 void BaseBetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
     std::lock_guard<std::mutex> lock(*(_context.schema_lock));
     TabletSchemaSPtr update_schema;
+    if (_context.merged_tablet_schema == nullptr) {
+        _context.merged_tablet_schema = _context.tablet_schema;
+    }
     static_cast<void>(vectorized::schema_util::get_least_common_schema(
-            {_context.tablet_schema, flush_schema}, nullptr, update_schema));
+            {_context.merged_tablet_schema, flush_schema}, nullptr, update_schema));
     CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
             << "Rowset merge schema columns count is " << update_schema->num_columns()
             << ", but flush_schema is larger " << flush_schema->num_columns()
             << " update_schema: " << update_schema->dump_structure()
             << " flush_schema: " << flush_schema->dump_structure();
-    _context.tablet_schema.swap(update_schema);
+    _context.merged_tablet_schema.swap(update_schema);
     VLOG_DEBUG << "dump rs schema: " << _context.tablet_schema->dump_structure();
 }
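For context on the hunk above: the writer now folds every flushed schema into _context.merged_tablet_schema instead of mutating _context.tablet_schema in place. A minimal standalone sketch of that accumulation pattern, with Schema and merge() as simplified stand-ins for Doris' TabletSchema and get_least_common_schema (both assumptions, not the real API):

    #include <memory>
    #include <set>
    #include <string>

    struct Schema {
        std::set<std::string> columns; // simplified: a schema is just a set of column names
    };
    using SchemaPtr = std::shared_ptr<Schema>;

    // stand-in for get_least_common_schema: union the column sets
    SchemaPtr merge(const SchemaPtr& a, const SchemaPtr& b) {
        auto out = std::make_shared<Schema>(*a);
        out->columns.insert(b->columns.begin(), b->columns.end());
        return out;
    }

    struct WriterContext {
        SchemaPtr tablet_schema;        // the schema the writer started with (left untouched)
        SchemaPtr merged_tablet_schema; // grows as segments flush; starts out null
    };

    void update_rowset_schema(WriterContext& ctx, const SchemaPtr& flush_schema) {
        if (ctx.merged_tablet_schema == nullptr) {
            ctx.merged_tablet_schema = ctx.tablet_schema; // lazy init, as in the hunk above
        }
        ctx.merged_tablet_schema = merge(ctx.merged_tablet_schema, flush_schema);
    }

    int main() {
        WriterContext ctx {std::make_shared<Schema>(Schema {{"k", "v"}}), nullptr};
        update_rowset_schema(ctx, std::make_shared<Schema>(Schema {{"v.a"}}));
        // build() would now pick merged_tablet_schema ({k, v, v.a}) over tablet_schema
    }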
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index b5b68f4d38e..208d05f456c 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -23,6 +23,7 @@
#include "olap/segment_loader.h"
#include "olap/tablet_schema.h"
#include "util/time.h"
+#include "vec/common/schema_util.h"
namespace doris {
@@ -107,6 +108,21 @@ void Rowset::merge_rowset_meta(const RowsetMetaSharedPtr& other) {
     for (auto key_bound : key_bounds) {
         _rowset_meta->add_segment_key_bounds(key_bound);
     }
+
+    // In a partial update the rowset schema may be updated when the table contains a variant
+    // type, so we need the newest schema here; otherwise the schema is stale and leads to
+    // wrong data reads.
+    if (tablet_schema()->num_variant_columns() > 0) {
+        // merge extracted columns
+        TabletSchemaSPtr merged_schema;
+        static_cast<void>(vectorized::schema_util::get_least_common_schema(
+                {tablet_schema(), other->tablet_schema()}, nullptr, merged_schema));
+        if (*_schema != *merged_schema) {
+            _rowset_meta->set_tablet_schema(merged_schema);
+        }
+        // rowset->rowset_meta()->tablet_schema() may be updated, so make sure _schema is
+        // consistent with the rowset meta
+        _schema = _rowset_meta->tablet_schema();
+    }
}
void Rowset::clear_cache() {
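The merge above leans on get_least_common_schema to widen conflicting variant subcolumn types across rowsets (e.g. INT in one rowset, BIGINT in another). A compilable sketch of that widening idea; the type lattice here is illustrative only, not Doris' actual promotion table:

    #include <algorithm>
    #include <cassert>

    enum class Type { TINYINT, INT, BIGINT, DOUBLE, STRING };

    // Widen two types to a common one: numeric types promote upward, anything
    // else falls back to STRING (like variant's most-common-type fallback).
    Type least_common_type(Type a, Type b) {
        if (a == b) return a;
        if (a <= Type::DOUBLE && b <= Type::DOUBLE) return std::max(a, b);
        return Type::STRING;
    }

    int main() {
        assert(least_common_type(Type::INT, Type::BIGINT) == Type::BIGINT);
        assert(least_common_type(Type::BIGINT, Type::STRING) == Type::STRING);
    }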
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 87cfe0b0bea..72c6c2fa29b 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -132,6 +132,8 @@ public:
const RowsetMetaSharedPtr& rowset_meta() const { return _rowset_meta; }
+ void merge_rowset_meta(const RowsetMeta& other);
+
bool is_pending() const { return _is_pending; }
bool is_local() const { return _rowset_meta->is_local(); }
diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp
index d8ef2e7b5dd..d37f5757064 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -17,6 +17,10 @@
#include "olap/rowset/rowset_meta.h"
+#include <gen_cpp/olap_file.pb.h>
+
+#include <memory>
+
#include "common/logging.h"
#include "google/protobuf/util/message_differencer.h"
#include "io/fs/local_file_system.h"
@@ -28,6 +32,7 @@
#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
#include "olap/tablet_schema_cache.h"
+#include "vec/common/schema_util.h"
namespace doris {
diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h
index 54be9f95970..ad82f6c491e 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -63,7 +63,8 @@ struct RowsetWriterContext {
io::FileSystemSPtr fs;
std::string rowset_dir;
TabletSchemaSPtr tablet_schema;
- TabletSchemaSPtr original_tablet_schema;
+ // for variant schema update
+ TabletSchemaSPtr merged_tablet_schema;
// PREPARED/COMMITTED for pending rowset
// VISIBLE for non-pending rowset
RowsetStatePB rowset_state;
diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp
index b968f684855..e42a80170a8 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -43,6 +43,8 @@
#include "vec/common/schema_util.h" // variant column
#include "vec/core/block.h"
#include "vec/core/columns_with_type_and_name.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
namespace doris {
using namespace ErrorCode;
@@ -61,60 +63,38 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
     if (block->rows() == 0) {
         return Status::OK();
     }
-    // Expand variant columns
     vectorized::Block flush_block(*block);
-    TabletSchemaSPtr flush_schema;
     if (_context->write_type != DataWriteType::TYPE_COMPACTION &&
         _context->tablet_schema->num_variant_columns() > 0) {
-        RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, flush_schema));
+        RETURN_IF_ERROR(_parse_variant_columns(flush_block));
     }
     bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
     if (config::enable_vertical_segment_writer &&
         _context->tablet_schema->cluster_key_idxes().empty()) {
         std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
-        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
+        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
         RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
-        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size));
+        RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(), flush_size));
     } else {
         std::unique_ptr<segment_v2::SegmentWriter> writer;
-        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
+        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
         RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
-        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size));
+        RETURN_IF_ERROR(_flush_segment_writer(writer, nullptr, flush_size));
     }
     return Status::OK();
 }
-Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
-                                                     TabletSchemaSPtr& flush_schema) {
+Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
     size_t num_rows = block.rows();
     if (num_rows == 0) {
         return Status::OK();
     }
-    {
-        std::lock_guard<std::mutex> lock(*(_context->schema_lock));
-        // save original tablet schema, _context->tablet_schema maybe modified
-        if (_context->original_tablet_schema == nullptr) {
-            _context->original_tablet_schema = _context->tablet_schema;
-        }
-    }
-
     std::vector<int> variant_column_pos;
-    if (_context->partial_update_info && _context->partial_update_info->is_partial_update) {
-        // check columns that used to do partial updates should not include variant
-        for (int i : _context->partial_update_info->update_cids) {
-            const auto& col = *_context->original_tablet_schema->columns()[i];
-            if (!col.is_key() && col.name() != DELETE_SIGN) {
-                return Status::InvalidArgument(
-                        "Not implement partial update for variant only support delete currently");
-            }
-        }
-    } else {
-        // find positions of variant columns
-        for (int i = 0; i < _context->original_tablet_schema->columns().size(); ++i) {
-            if (_context->original_tablet_schema->columns()[i]->is_variant_type()) {
-                variant_column_pos.push_back(i);
-            }
+    for (int i = 0; i < block.columns(); ++i) {
+        const auto& entry = block.get_by_position(i);
+        if (vectorized::is_variant_type(remove_nullable(entry.type))) {
+            variant_column_pos.push_back(i);
         }
     }
@@ -123,37 +103,8 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
     }
     vectorized::schema_util::ParseContext ctx;
-    ctx.record_raw_json_column = _context->original_tablet_schema->store_row_column();
-    RETURN_IF_ERROR(vectorized::schema_util::parse_and_encode_variant_columns(
-            block, variant_column_pos, ctx));
-
-    flush_schema = std::make_shared<TabletSchema>();
-    flush_schema->copy_from(*_context->original_tablet_schema);
-    vectorized::Block flush_block(std::move(block));
-    vectorized::schema_util::rebuild_schema_and_block(
-            _context->original_tablet_schema, variant_column_pos, flush_block, flush_schema);
-
-    {
-        // Update rowset schema, tablet's tablet schema will be updated when build Rowset
-        // Eg. flush schema:    A(int),    B(float),  C(int), D(int)
-        // ctx.tablet_schema:   A(bigint), B(double)
-        //  => update_schema:   A(bigint), B(double), C(int), D(int)
-        std::lock_guard<std::mutex> lock(*(_context->schema_lock));
-        TabletSchemaSPtr update_schema;
-        RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
-                {_context->tablet_schema, flush_schema}, nullptr, update_schema));
-        CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
-                << "Rowset merge schema columns count is " << update_schema->num_columns()
-                << ", but flush_schema is larger " << flush_schema->num_columns()
-                << " update_schema: " << update_schema->dump_structure()
-                << " flush_schema: " << flush_schema->dump_structure();
-        _context->tablet_schema.swap(update_schema);
-        VLOG_DEBUG << "dump rs schema: " << _context->tablet_schema->dump_structure();
-    }
-
-    block.swap(flush_block); // NOLINT(bugprone-use-after-move)
-    VLOG_DEBUG << "dump block: " << block.dump_data();
-    VLOG_DEBUG << "dump flush schema: " << flush_schema->dump_structure();
+    ctx.record_raw_json_column = _context->tablet_schema->store_row_column();
+    RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_column_pos, ctx));
     return Status::OK();
 }
@@ -194,8 +145,7 @@ Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWrit
 }
 Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
-                                              int32_t segment_id, bool no_compression,
-                                              TabletSchemaSPtr flush_schema) {
+                                              int32_t segment_id, bool no_compression) {
     io::FileWriterPtr file_writer;
     RETURN_IF_ERROR(_context->file_writer_creator->create(segment_id, file_writer));
@@ -207,10 +157,10 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
         writer_options.compression_type = NO_COMPRESSION;
     }
-    const auto& tablet_schema = flush_schema ? flush_schema : _context->tablet_schema;
-    writer.reset(new segment_v2::SegmentWriter(
-            file_writer.get(), segment_id, tablet_schema, _context->tablet, _context->data_dir,
-            _context->max_rows_per_segment, writer_options, _context->mow_context));
+    writer.reset(new segment_v2::SegmentWriter(file_writer.get(), segment_id,
+                                               _context->tablet_schema, _context->tablet,
+                                               _context->data_dir, _context->max_rows_per_segment,
+                                               writer_options, _context->mow_context));
     {
         std::lock_guard<SpinLock> l(_lock);
         _file_writers.emplace(segment_id, std::move(file_writer));
@@ -226,7 +176,7 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
 Status SegmentFlusher::_create_segment_writer(
         std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t segment_id,
-        bool no_compression, TabletSchemaSPtr flush_schema) {
+        bool no_compression) {
     io::FileWriterPtr file_writer;
     RETURN_IF_ERROR(_context->file_writer_creator->create(segment_id, file_writer));
@@ -238,10 +188,10 @@ Status SegmentFlusher::_create_segment_writer(
         writer_options.compression_type = NO_COMPRESSION;
     }
-    const auto& tablet_schema = flush_schema ? flush_schema : _context->tablet_schema;
     writer.reset(new segment_v2::VerticalSegmentWriter(
-            file_writer.get(), segment_id, tablet_schema, _context->tablet, _context->data_dir,
-            _context->max_rows_per_segment, writer_options, _context->mow_context));
+            file_writer.get(), segment_id, _context->tablet_schema, _context->tablet,
+            _context->data_dir, _context->max_rows_per_segment, writer_options,
+            _context->mow_context));
     {
         std::lock_guard<SpinLock> l(_lock);
         _file_writers.emplace(segment_id, std::move(file_writer));
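Note that the new _parse_variant_columns locates variant columns by inspecting the block's own column types instead of schema positions, so it still works when the block layout drifts from the original tablet schema. A standalone restatement of that scan, with TypeId and is_variant as stand-ins for Doris' DataTypePtr checks:

    #include <vector>

    enum class TypeId { INT, STRING, VARIANT };

    struct ColumnEntry {
        TypeId type;
        bool nullable;
    };

    // stand-in for remove_nullable(entry.type) + is_variant_type(...) on a real DataTypePtr
    bool is_variant(const ColumnEntry& e) {
        return e.type == TypeId::VARIANT;
    }

    std::vector<int> find_variant_positions(const std::vector<ColumnEntry>& block) {
        std::vector<int> variant_column_pos;
        for (int i = 0; i < static_cast<int>(block.size()); ++i) {
            if (is_variant(block[i])) {
                variant_column_pos.push_back(i);
            }
        }
        return variant_column_pos;
    }

    int main() {
        std::vector<ColumnEntry> block = {{TypeId::INT, false}, {TypeId::VARIANT, true}};
        return find_variant_positions(block).size() == 1 ? 0 : 1;
    }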
diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h
index 214322ed8d5..93508e9629d 100644
--- a/be/src/olap/rowset/segment_creator.h
+++ b/be/src/olap/rowset/segment_creator.h
@@ -138,17 +138,15 @@ public:
     bool need_buffering();
 private:
-    Status _expand_variant_to_subcolumns(vectorized::Block& block, TabletSchemaSPtr& flush_schema);
+    Status _parse_variant_columns(vectorized::Block& block);
     Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
                      const vectorized::Block* block, size_t row_offset, size_t row_num);
     Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
                      const vectorized::Block* block, size_t row_offset, size_t row_num);
     Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
-                                  int32_t segment_id, bool no_compression = false,
-                                  TabletSchemaSPtr flush_schema = nullptr);
+                                  int32_t segment_id, bool no_compression = false);
     Status _create_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
-                                  int32_t segment_id, bool no_compression = false,
-                                  TabletSchemaSPtr flush_schema = nullptr);
+                                  int32_t segment_id, bool no_compression = false);
     Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
                                  TabletSchemaSPtr flush_schema = nullptr,
                                  int64_t* flush_size = nullptr);
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
index 66ad0eb92a9..dcc082c22ae 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
@@ -34,10 +34,9 @@ namespace segment_v2 {
 Status HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader,
                                       vectorized::PathInData path,
                                       const SubcolumnColumnReaders::Node* node,
-                                      const SubcolumnColumnReaders::Node* root,
-                                      bool output_as_raw_json) {
+                                      const SubcolumnColumnReaders::Node* root) {
     // Non-leaf nodes need to be merged with the root
-    auto* stream_iter = new HierarchicalDataReader(path, output_as_raw_json);
+    auto* stream_iter = new HierarchicalDataReader(path);
     std::vector<const SubcolumnColumnReaders::Node*> leaves;
     vectorized::PathsInData leaves_paths;
     SubcolumnColumnReaders::get_leaves_of_node(node, leaves, leaves_paths);
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index 67f78651416..1d02685e445 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -64,12 +64,11 @@ using SubcolumnColumnReaders = vectorized::SubcolumnsTree<SubcolumnReader>;
 // Reader for hierarchical data for variant, merge with root(sparse encoded columns)
 class HierarchicalDataReader : public ColumnIterator {
 public:
-    HierarchicalDataReader(const vectorized::PathInData& path, bool output_as_raw_json = false)
-            : _path(path), _output_as_raw_json(output_as_raw_json) {}
+    HierarchicalDataReader(const vectorized::PathInData& path) : _path(path) {}
     static Status create(std::unique_ptr<ColumnIterator>* reader, vectorized::PathInData path,
                          const SubcolumnColumnReaders::Node* target_node,
-                         const SubcolumnColumnReaders::Node* root, bool output_as_raw_json = false);
+                         const SubcolumnColumnReaders::Node* root);
     Status init(const ColumnIteratorOptions& opts) override;
@@ -93,7 +92,6 @@ private:
     std::unique_ptr<StreamReader> _root_reader;
     size_t _rows_read = 0;
     vectorized::PathInData _path;
-    bool _output_as_raw_json = false;
     template <typename NodeFunction>
     Status tranverse(NodeFunction&& node_func) {
@@ -154,26 +152,9 @@ private:
                 return Status::OK();
             }));
-        if (_output_as_raw_json) {
-            auto col_to = vectorized::ColumnString::create();
-            col_to->reserve(nrows * 2);
-            vectorized::VectorBufferWriter write_buffer(*col_to.get());
-            auto type = std::make_shared<vectorized::DataTypeObject>();
-            for (size_t i = 0; i < nrows; ++i) {
-                type->to_string(container_variant, i, write_buffer);
-                write_buffer.commit();
-            }
-            if (variant.empty()) {
-                variant.create_root(std::make_shared<vectorized::DataTypeString>(),
-                                    std::move(col_to));
-            } else {
-                variant.get_root()->insert_range_from(*col_to, 0, col_to->size());
-            }
-        } else {
-            // TODO select v:b -> v.b / v.b.c but v.d maybe in v
-            // copy container variant to dst variant, todo avoid copy
-            variant.insert_range_from(container_variant, 0, nrows);
-        }
+        // TODO select v:b -> v.b / v.b.c but v.d maybe in v
+        // copy container variant to dst variant, todo avoid copy
+        variant.insert_range_from(container_variant, 0, nrows);
         // variant.set_num_rows(nrows);
         _rows_read += nrows;
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index f2ec504c90f..f429cb4afb9 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -536,24 +536,19 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
     auto sparse_node = tablet_column.has_path_info()
                               ? _sparse_column_tree.find_exact(*tablet_column.path_info_ptr())
                               : nullptr;
-    if (opt != nullptr && opt->io_ctx.reader_type == ReaderType::READER_ALTER_TABLE) {
-        CHECK(tablet_column.is_variant_type());
-        if (root == nullptr) {
-            // No such variant column in this segment, get a default one
-            RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
-            return Status::OK();
-        }
-        bool output_as_raw_json = true;
-        // Alter table operation should read the whole variant column, since it does not aware of
-        // subcolumns of variant during processing rewriting rowsets.
-        // This is slow, since it needs to read all sub columns and merge them into a single column
-        RETURN_IF_ERROR(
-                HierarchicalDataReader::create(iter, root_path, root, root, output_as_raw_json));
-        return Status::OK();
-    }
-    if (opt == nullptr || opt->io_ctx.reader_type != ReaderType::READER_QUERY) {
-        // Could be compaction ..etc and read flat leaves nodes data
+    // Currently only compaction and checksum need to read flat leaves.
+    // They both use tablet_schema_with_merged_max_schema_version as the read schema.
+    auto type_to_read_flat_leaves = [](ReaderType type) {
+        return type == ReaderType::READER_BASE_COMPACTION ||
+               type == ReaderType::READER_CUMULATIVE_COMPACTION ||
+               type == ReaderType::READER_COLD_DATA_COMPACTION ||
+               type == ReaderType::READER_SEGMENT_COMPACTION ||
+               type == ReaderType::READER_FULL_COMPACTION || type == ReaderType::READER_CHECKSUM;
+    };
+
+    if (opt != nullptr && type_to_read_flat_leaves(opt->io_ctx.reader_type)) {
+        // compaction needs to read flat leaf node data to prevent read amplification
        const auto* node = tablet_column.has_path_info()
                                   ? _sub_column_tree.find_leaf(*tablet_column.path_info_ptr())
                                   : nullptr;
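The hunk above replaces the old "anything that is not a query" test with an explicit allow-list of reader types. A self-contained restatement of that predicate; the enum values mirror the ones in the diff, while the surrounding Doris types are omitted:

    #include <initializer_list>

    enum class ReaderType {
        READER_QUERY,
        READER_BASE_COMPACTION,
        READER_CUMULATIVE_COMPACTION,
        READER_COLD_DATA_COMPACTION,
        READER_SEGMENT_COMPACTION,
        READER_FULL_COMPACTION,
        READER_CHECKSUM,
    };

    // Compaction and checksum reads take the flat-leaves path; everything else
    // (queries) gets hierarchical variant reads.
    bool reads_flat_leaves(ReaderType t) {
        for (ReaderType c : {ReaderType::READER_BASE_COMPACTION,
                             ReaderType::READER_CUMULATIVE_COMPACTION,
                             ReaderType::READER_COLD_DATA_COMPACTION,
                             ReaderType::READER_SEGMENT_COMPACTION,
                             ReaderType::READER_FULL_COMPACTION,
                             ReaderType::READER_CHECKSUM}) {
            if (t == c) return true;
        }
        return false;
    }

    int main() { return reads_flat_leaves(ReaderType::READER_QUERY) ? 1 : 0; }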
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 5eadac2abde..143e1b36329 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -140,7 +140,8 @@ void VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t colum
     }
 }
-Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column) {
+Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column,
+                                                    const TabletSchemaSPtr& tablet_schema) {
     ColumnWriterOptions opts;
     opts.meta = _footer.add_columns();
@@ -148,9 +149,9 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
     // now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS
     // except for columns whose type don't support zone map.
-    opts.need_zone_map = column.is_key() || _tablet_schema->keys_type() != KeysType::AGG_KEYS;
+    opts.need_zone_map = column.is_key() || tablet_schema->keys_type() != KeysType::AGG_KEYS;
     opts.need_bloom_filter = column.is_bf_column();
-    auto* tablet_index = _tablet_schema->get_ngram_bf_index(column.unique_id());
+    auto* tablet_index = tablet_schema->get_ngram_bf_index(column.unique_id());
     if (tablet_index) {
opts.need_bloom_filter = true;
opts.is_ngram_bf_index = true;
@@ -166,12 +167,14 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
     }
     // skip write inverted index on load if skip_write_index_on_load is true
     if (_opts.write_type == DataWriteType::TYPE_DIRECT &&
-        _tablet_schema->skip_write_index_on_load()) {
+        tablet_schema->skip_write_index_on_load()) {
         skip_inverted_index = true;
     }
     // indexes for this column
-    opts.indexes = _tablet_schema->get_indexes_for_column(column);
+    opts.indexes = tablet_schema->get_indexes_for_column(column);
     if (!InvertedIndexColumnWriter::check_support_inverted_index(column)) {
+        // skip inverted index if invalid
+        opts.indexes.clear();
opts.need_zone_map = false;
opts.need_bloom_filter = false;
opts.need_bitmap_index = false;
@@ -302,7 +305,8 @@ void VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& bl
 // 2.2 build read plan to read by batch
 // 2.3 fill block
 // 3. set columns to data convertor and then write all columns
-Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data) {
+Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data,
+                                                                 vectorized::Block& full_block) {
     if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
         // TODO(plat1ko): CloudStorageEngine
         return Status::NotSupported("append_block_with_partial_content");
@@ -313,7 +317,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
     auto tablet = static_cast<Tablet*>(_tablet.get());
     // create full block and fill with input columns
-    auto full_block = _tablet_schema->create_block();
+    full_block = _tablet_schema->create_block();
     const auto& including_cids = _opts.rowset_ctx->partial_update_info->update_cids;
     size_t input_id = 0;
     for (auto i : including_cids) {
@@ -702,16 +706,127 @@ Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t
     return Status::OK();
 }
+// For variant types we take the following steps to fill the content of the block:
+// 1. set the block data to the data convertor and get all flattened columns from variant subcolumns
+// 2. get sparse columns from the previous sparse columns stripped in OlapColumnDataConvertorVariant
+// 3. merge the current column info (which contains extracted columns) with the previous
+//    merged_tablet_schema, which will be used to construct the new schema for the rowset
+Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) {
+    if (_tablet_schema->num_variant_columns() == 0) {
+        return Status::OK();
+    }
+    size_t column_id = _tablet_schema->num_columns();
+    for (int i = 0; i < _tablet_schema->columns().size(); ++i) {
+        if (!_tablet_schema->columns()[i]->is_variant_type()) {
+            continue;
+        }
+        if (_flush_schema == nullptr) {
+            _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+        }
+        auto column_ref = data.block->get_by_position(i).column;
+        const vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>(
+                remove_nullable(column_ref)->assume_mutable_ref());
+        const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];
+
+        // generate column info by entry info
+        auto generate_column_info = [&](const auto& entry) {
+            const std::string& column_name =
+                    parent_column->name_lower_case() + "." + entry->path.get_path();
+            const vectorized::DataTypePtr& final_data_type_from_object =
+                    entry->data.get_least_common_type();
+            vectorized::PathInDataBuilder full_path_builder;
+            auto full_path = full_path_builder.append(parent_column->name_lower_case(), false)
+                                     .append(entry->path.get_parts(), false)
+                                     .build();
+            return vectorized::schema_util::get_column_by_type(
+                    final_data_type_from_object, column_name,
+                    vectorized::schema_util::ExtraInfo {
+                            .unique_id = -1,
+                            .parent_unique_id = parent_column->unique_id(),
+                            .path_info = full_path});
+        };
+
+        CHECK(object_column.is_finalized());
+        // common extracted columns
+        for (const auto& entry :
+             vectorized::schema_util::get_sorted_subcolumns(object_column.get_subcolumns())) {
+            if (entry->path.empty()) {
+                // already handled by parent column
+                continue;
+            }
+            CHECK(entry->data.is_finalized());
+            int current_column_id = column_id++;
+            TabletColumn tablet_column = generate_column_info(entry);
+            vectorized::schema_util::inherit_column_attributes(*parent_column, tablet_column,
+                                                               _flush_schema);
+            RETURN_IF_ERROR(_create_column_writer(current_column_id /*unused*/, tablet_column,
+                                                  _flush_schema));
+            RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                    {entry->data.get_finalized_column_ptr()->get_ptr(),
+                     entry->data.get_least_common_type(), tablet_column.name()},
+                    data.row_pos, data.num_rows, current_column_id));
+            // convert column data from engine format to storage layer format
+            auto [status, column] = _olap_data_convertor->convert_column_data(current_column_id);
+            if (!status.ok()) {
+                return status;
+            }
+            RETURN_IF_ERROR(_column_writers[current_column_id]->append(
+                    column->get_nullmap(), column->get_data(), data.num_rows));
+            _flush_schema->append_column(tablet_column);
+            _olap_data_convertor->clear_source_content();
+        }
+        // sparse_columns
+        for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns(
+                     object_column.get_sparse_subcolumns())) {
+            TabletColumn sparse_tablet_column = generate_column_info(entry);
+            _flush_schema->mutable_column_by_uid(parent_column->unique_id())
+                    .append_sparse_column(sparse_tablet_column);
+
+            // add sparse column to footer
+            auto* column_pb = _footer.mutable_columns(i);
+            _init_column_meta(column_pb->add_sparse_columns(), -1, sparse_tablet_column);
+        }
+    }
+
+    // Update the rowset schema; the tablet's tablet schema will be updated when building the Rowset
+    // Eg. flush schema:    A(int),    B(float),  C(int), D(int)
+    // ctx.tablet_schema:   A(bigint), B(double)
+    //  => update_schema:   A(bigint), B(double), C(int), D(int)
+    std::lock_guard<std::mutex> lock(*(_opts.rowset_ctx->schema_lock));
+    if (_opts.rowset_ctx->merged_tablet_schema == nullptr) {
+        _opts.rowset_ctx->merged_tablet_schema = _opts.rowset_ctx->tablet_schema;
+    }
+    TabletSchemaSPtr update_schema;
+    RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
+            {_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, update_schema));
+    CHECK_GE(update_schema->num_columns(), _flush_schema->num_columns())
+            << "Rowset merge schema columns count is " << update_schema->num_columns()
+            << ", but flush_schema is larger " << _flush_schema->num_columns()
+            << " update_schema: " << update_schema->dump_structure()
+            << " flush_schema: " << _flush_schema->dump_structure();
+    _opts.rowset_ctx->merged_tablet_schema.swap(update_schema);
+    VLOG_DEBUG << "dump block " << data.block->dump_data();
+    VLOG_DEBUG << "dump rs schema: " << _opts.rowset_ctx->merged_tablet_schema->dump_full_schema();
+    VLOG_DEBUG << "rowset : " << _opts.rowset_ctx->rowset_id << ", seg id : " << _segment_id;
+    return Status::OK();
+}
+
 Status VerticalSegmentWriter::write_batch() {
     if (_opts.rowset_ctx->partial_update_info &&
         _opts.rowset_ctx->partial_update_info->is_partial_update &&
         _opts.write_type == DataWriteType::TYPE_DIRECT &&
         !_opts.rowset_ctx->is_transient_rowset_writer) {
         for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
-            RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid)));
+            RETURN_IF_ERROR(
+                    _create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
         }
+        vectorized::Block full_block;
         for (auto& data : _batched_blocks) {
-            RETURN_IF_ERROR(_append_block_with_partial_content(data));
+            RETURN_IF_ERROR(_append_block_with_partial_content(data, full_block));
+        }
+        for (auto& data : _batched_blocks) {
+            RowsInBlock full_rows_block {&full_block, data.row_pos, data.num_rows};
+            RETURN_IF_ERROR(_append_block_with_variant_subcolumns(full_rows_block));
         }
         for (auto& column_writer : _column_writers) {
             RETURN_IF_ERROR(column_writer->finish());
@@ -733,7 +848,7 @@ Status VerticalSegmentWriter::write_batch() {
     std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
     vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
     for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
-        RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid)));
+        RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
         for (auto& data : _batched_blocks) {
            RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
                    data.block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid}));
@@ -806,6 +921,19 @@ Status VerticalSegmentWriter::write_batch() {
         _num_rows_written += data.num_rows;
     }
+    if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
+        _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
+        size_t original_writers_cnt = _column_writers.size();
+        // handle variant dynamic sub columns
+        for (auto& data : _batched_blocks) {
+            RETURN_IF_ERROR(_append_block_with_variant_subcolumns(data));
+        }
+        for (size_t i = original_writers_cnt; i < _column_writers.size(); ++i) {
+            RETURN_IF_ERROR(_column_writers[i]->finish());
+            RETURN_IF_ERROR(_column_writers[i]->write_data());
+        }
+    }
+
     _batched_blocks.clear();
     return Status::OK();
 }
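A small sketch of the naming scheme used by generate_column_info in the new _append_block_with_variant_subcolumns: an extracted subcolumn is named "<parent variant>.<json path>", takes no unique id of its own, and records its parent's. ExtractedColumn is a stand-in for TabletColumn plus ExtraInfo, so treat this as illustration only:

    #include <iostream>
    #include <string>

    // ExtractedColumn stands in for Doris' TabletColumn plus schema_util::ExtraInfo.
    struct ExtractedColumn {
        std::string name;     // "<parent variant>.<json path>"
        int unique_id;        // -1: extracted subcolumns get no uid of their own
        int parent_unique_id; // uid of the variant column the subcolumn came from
    };

    ExtractedColumn make_extracted(const std::string& parent_lower_case, int parent_uid,
                                   const std::string& subpath) {
        return ExtractedColumn {parent_lower_case + "." + subpath, -1, parent_uid};
    }

    int main() {
        auto col = make_extracted("v", 10, "user.id");
        std::cout << col.name << " parent=" << col.parent_unique_id << "\n"; // v.user.id parent=10
    }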
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
index ffa5f3807ae..8fd854c3e95 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -117,11 +117,14 @@ public:
     Slice min_encoded_key();
     Slice max_encoded_key();
+    TabletSchemaSPtr flush_schema() const { return _flush_schema; };
+
     void clear();
 private:
     void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column);
-    Status _create_column_writer(uint32_t cid, const TabletColumn& column);
+    Status _create_column_writer(uint32_t cid, const TabletColumn& column,
+                                 const TabletSchemaSPtr& schema);
     size_t _calculate_inverted_index_file_size();
     uint64_t _estimated_remaining_size();
     Status _write_ordinal_index();
@@ -146,7 +149,8 @@ private:
     void _set_min_key(const Slice& key);
     void _set_max_key(const Slice& key);
     void _serialize_block_to_row_column(vectorized::Block& block);
-    Status _append_block_with_partial_content(RowsInBlock& data);
+    Status _append_block_with_partial_content(RowsInBlock& data, vectorized::Block& full_block);
+    Status _append_block_with_variant_subcolumns(RowsInBlock& data);
     Status _fill_missing_columns(vectorized::MutableColumns& mutable_full_columns,
                                  const std::vector<bool>& use_default_or_null_flag,
                                  bool has_default_or_nullable, const size_t& segment_start_pos,
@@ -204,6 +208,9 @@ private:
     std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
     std::vector<RowsInBlock> _batched_blocks;
+
+    // contains auto-generated columns; should be nullptr if there are no variant subcolumns
+    TabletSchemaSPtr _flush_schema = nullptr;
 };
 } // namespace segment_v2
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 23232c4d0a5..32a6ba88ce7 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -305,12 +305,22 @@ Status RowsetBuilder::commit_txn() {
     }
     std::lock_guard<std::mutex> l(_lock);
     SCOPED_TIMER(_commit_txn_timer);
-    if (tablet()->tablet_schema()->num_variant_columns() > 0) {
+
+    const RowsetWriterContext& rw_ctx = _rowset_writer->context();
+    if (rw_ctx.tablet_schema->num_variant_columns() > 0) {
+        // Merge the schema with `rw_ctx.merged_tablet_schema` first: the merged schema keeps
+        // the newest merged schema for the rowset, updated and merged while flushing segments.
+        if (rw_ctx.merged_tablet_schema != nullptr) {
+            RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.merged_tablet_schema));
+        }
+        // We should merge the rowset schema further, since merged_tablet_schema may be null:
+        // when enable_memtable_on_sink_node is true, the merged_tablet_schema is not passed to
+        // the destination backend.
         // update tablet schema when meet variant columns, before commit_txn
         // Eg. rowset schema:       A(int),    B(float),  C(int), D(int)
         // _tabelt->tablet_schema:  A(bigint), B(double)
         //  => update_schema:       A(bigint), B(double), C(int), D(int)
-        const RowsetWriterContext& rw_ctx = _rowset_writer->context();
         RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
     }
// Transfer ownership of `PendingRowsetGuard` to `TxnManager`
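The commit_txn change above establishes an ordering: merge the flush-time merged_tablet_schema into the tablet first (when present), then merge the context's tablet_schema as a fallback. A minimal sketch of that ordering under simplified stand-in types (Schema/Tablet here are not the real Doris classes):

    #include <memory>
    #include <set>
    #include <string>

    struct Schema {
        std::set<std::string> cols;
    };
    using SchemaPtr = std::shared_ptr<Schema>;

    struct Tablet {
        SchemaPtr schema;
        // stand-in for Tablet::update_by_least_common_schema: widen by set union
        void update_by_least_common_schema(const SchemaPtr& s) {
            schema->cols.insert(s->cols.begin(), s->cols.end());
        }
    };

    // Apply merged_tablet_schema first when present, then always apply the context
    // schema, since the merged schema may be null when another backend did the flush.
    void commit_schema(Tablet& tablet, const SchemaPtr& merged, const SchemaPtr& ctx_schema) {
        if (merged != nullptr) {
            tablet.update_by_least_common_schema(merged);
        }
        tablet.update_by_least_common_schema(ctx_schema);
    }

    int main() {
        Tablet t {std::make_shared<Schema>()};
        commit_schema(t, nullptr, std::make_shared<Schema>(Schema {{"k"}}));
    }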
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index a4ed6a527bf..a0483ad5d8e 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -927,7 +927,8 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
     // This is because the schema change for a variant needs to ignore the extracted columns.
     // Otherwise, the schema types in different rowsets might be inconsistent. When performing a schema change,
     // the complete variant is constructed by reading all the sub-columns of the variant.
-    sc_params.new_tablet_schema = new_tablet->tablet_schema()->copy_without_extracted_columns();
+    sc_params.new_tablet_schema =
+            new_tablet->tablet_schema()->copy_without_variant_extracted_columns();
     sc_params.ref_rowset_readers.reserve(rs_splits.size());
     for (RowSetSplits& split : rs_splits) {
         sc_params.ref_rowset_readers.emplace_back(split.rs_reader);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index b48262250ed..0a791614b87 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2120,7 +2120,11 @@ Status Tablet::create_transient_rowset_writer(
     context.rowset_state = PREPARED;
     context.segments_overlap = OVERLAPPING;
     context.tablet_schema = std::make_shared<TabletSchema>();
-    context.tablet_schema->copy_from(*(rowset_ptr->tablet_schema()));
+    // During a partial update, the extracted columns of a variant should not be included in the tablet schema.
+    // This is because the partial update for a variant needs to ignore the extracted columns.
+    // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
+    // the complete variant is constructed by reading all the sub-columns of the variant.
+    context.tablet_schema = rowset_ptr->tablet_schema()->copy_without_variant_extracted_columns();
     context.newest_write_timestamp = UnixSeconds();
     context.tablet_id = table_id();
     context.enable_segcompaction = false;
@@ -2945,6 +2949,13 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                 (std::find(including_cids.cbegin(), including_cids.cend(),
                            rowset_schema->sequence_col_idx()) != including_cids.cend());
     }
+    if (rowset_schema->num_variant_columns() > 0) {
+        // During partial updates, the extracted columns of a variant should not be included in the rowset schema.
+        // This is because the partial update for a variant needs to ignore the extracted columns.
+        // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
+        // the complete variant is constructed by reading all the sub-columns of the variant.
+        rowset_schema = rowset_schema->copy_without_variant_extracted_columns();
+    }
     // use for partial update
     PartialUpdateReadPlan read_plan_ori;
     PartialUpdateReadPlan read_plan_update;
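Both hunks above strip variant-extracted columns before partial-update processing. A simplified model of what copy_without_variant_extracted_columns does, assuming a recorded parent unique id marks a column as extracted (that marker encoding is this sketch's assumption, not Doris' exact representation):

    #include <algorithm>
    #include <iterator>
    #include <vector>

    struct Column {
        int uid;
        int parent_uid; // -1 for ordinary columns, >= 0 for variant-extracted subcolumns
    };

    std::vector<Column> copy_without_extracted(const std::vector<Column>& cols) {
        std::vector<Column> out;
        std::copy_if(cols.begin(), cols.end(), std::back_inserter(out),
                     [](const Column& c) { return c.parent_uid < 0; });
        return out;
    }

    int main() {
        std::vector<Column> cols = {{1, -1}, {2, 1}}; // uid 2 was extracted from variant uid 1
        return copy_without_extracted(cols).size() == 1 ? 0 : 1;
    }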
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 0900c6d8d40..26d9d913f2f 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -563,6 +563,10 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
         _column_path->from_protobuf(column.column_path_info());
         _parent_col_unique_id = column.column_path_info().parrent_column_unique_id();
     }
+    if (is_variant_type() && !column.has_column_path_info()) {
+        // set path info for the variant root column, to prevent it from being missing
+        _column_path = std::make_shared<vectorized::PathInData>(_col_name_lower_case);
+    }
     for (auto& column_pb : column.sparse_columns()) {
         TabletColumn column;
         column.init_from_pb(column_pb);
@@ -854,7 +858,8 @@ void TabletSchema::append_column(TabletColumn column, ColumnType col_type) {
     _cols.push_back(std::make_shared<TabletColumn>(std::move(column)));
     // The dropped column may have same name with exsiting column, so that
     // not add to name to index map, only for uid to index map
-    if (col_type == ColumnType::VARIANT || _cols.back()->is_variant_type()) {
+    if (col_type == ColumnType::VARIANT || _cols.back()->is_variant_type() ||
+        _cols.back()->is_extracted_column()) {
         _field_name_to_index.emplace(StringRef(_cols.back()->name()), _num_columns);
         _field_path_to_index[_cols.back()->path_info_ptr().get()] = _num_columns;
     } else if (col_type == ColumnType::NORMAL) {
@@ -1112,7 +1117,7 @@ void TabletSchema::merge_dropped_columns(const TabletSchema& src_schema) {
     }
 }
-TabletSchemaSPtr TabletSchema::copy_without_extracted_columns() {
+TabletSchemaSPtr TabletSchema::copy_without_variant_extracted_columns() {
     TabletSchemaSPtr copy = std::make_shared<TabletSchema>();
     TabletSchemaPB tablet_schema_pb;
     this->to_schema_pb(&tablet_schema_pb);
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index b8f26a1f601..bd3b1f6ca4e 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -454,7 +454,7 @@ public:
vectorized::Block create_block_by_cids(const std::vector<uint32_t>& cids);
- std::shared_ptr<TabletSchema> copy_without_extracted_columns();
+ std::shared_ptr<TabletSchema> copy_without_variant_extracted_columns();
InvertedIndexStorageFormatPB get_inverted_index_storage_format() const {
return _inverted_index_storage_format;
}
diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp
index 3bae978f4d3..1f59e000d32 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -749,8 +749,15 @@ void ColumnObject::insert_from(const IColumn& src, size_t n) {
 void ColumnObject::try_insert(const Field& field) {
     if (field.get_type() != Field::Types::VariantMap) {
         auto* root = get_subcolumn({});
-        if (!root) {
-            doris::Exception(doris::ErrorCode::INVALID_ARGUMENT, "Failed to find root column_path");
+        // Inserting into an empty ColumnObject may leave the root null,
+        // so creating a root column for the Variant is expected.
+        if (root == nullptr) {
+            bool succ = add_sub_column({}, num_rows);
+            if (!succ) {
+                throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
+                                       "Failed to add root sub column {}");
+            }
+            root = get_subcolumn({});
         }
         root->insert(field);
         ++num_rows;
@@ -1290,9 +1297,11 @@ Status ColumnObject::merge_sparse_to_root_column() {
                                     parser.getWriter().getOutput()->getSize());
         result_column_nullable->get_null_map_data().push_back(0);
     }
-
-    // assign merged column
-    subcolumns.get_mutable_root()->data.get_finalized_column_ptr() = mresult->get_ptr();
+    subcolumns.get_mutable_root()->data.get_finalized_column().clear();
+    // assign the merged column; use insert_range_from to make a copy instead of replacing
+    // the ptr itself, so the root column ptr is not changed
+    subcolumns.get_mutable_root()->data.get_finalized_column().insert_range_from(
+            *mresult->get_ptr(), 0, num_rows);
     return Status::OK();
 }
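The try_insert fix above creates the root subcolumn on demand instead of failing on an empty ColumnObject. A toy restatement of that lazy-root pattern, with a std::map standing in for the subcolumn tree:

    #include <map>
    #include <string>
    #include <vector>

    // std::map keyed by path stands in for ColumnObject's subcolumn tree; "" is the
    // root path, playing the role of the empty PathInData ({}) in the diff.
    struct ObjectColumn {
        std::map<std::string, std::vector<std::string>> subcolumns; // path -> row values
        size_t num_rows = 0;

        void try_insert(const std::string& value) {
            auto it = subcolumns.find("");
            if (it == subcolumns.end()) {
                // root missing on an empty column: create it padded to the current size
                it = subcolumns.emplace("", std::vector<std::string>(num_rows)).first;
            }
            it->second.push_back(value);
            ++num_rows;
        }
    };

    int main() {
        ObjectColumn col;
        col.try_insert("{\"a\":1}"); // previously this path threw; now the root is created
        return col.num_rows == 1 ? 0 : 1;
    }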
diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h
index 55abd534dd1..53516877b6d 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -230,10 +230,6 @@ private:
// this structure and fill with Subcolumns sub items
mutable std::shared_ptr<rapidjson::Document> doc_structure;
- // column with raw json strings
- // used for quickly row store encoding
- ColumnPtr rowstore_column;
-
     using SubColumnWithName = std::pair<PathInData, const Subcolumn*>;
     // Cached search results for previous row (keyed as index in JSON object) - used as a hint.
     mutable std::vector<SubColumnWithName> _prev_positions;
@@ -259,10 +255,6 @@ public:
         return subcolumns.get_mutable_root()->data.get_finalized_column_ptr()->assume_mutable();
     }
-    void set_rowstore_column(ColumnPtr col) { rowstore_column = col; }
-
-    ColumnPtr get_rowstore_column() const { return rowstore_column; }
-
     Status serialize_one_row_to_string(int row, std::string* output) const;
     Status serialize_one_row_to_string(int row, BufferWritable& output) const;
diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp
index e8fd23f7569..016336d4098 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -374,45 +374,44 @@ void update_least_sparse_column(const std::vector<TabletSchemaSPtr>& schemas,
     update_least_schema_internal(subcolumns_types, common_schema, true, variant_col_unique_id);
 }
-void inherit_root_attributes(TabletSchemaSPtr& schema) {
-    std::unordered_map<int32_t, TabletIndex> variants_index_meta;
-    // Get all variants tablet index metas if exist
-    for (const auto& col : schema->columns()) {
-        auto index_meta = schema->get_inverted_index(col->unique_id(), "");
-        if (col->is_variant_type() && index_meta != nullptr) {
-            variants_index_meta.emplace(col->unique_id(), *index_meta);
+void inherit_column_attributes(const TabletColumn& source, TabletColumn& target,
+                               TabletSchemaSPtr& target_schema) {
+    if (target.type() != FieldType::OLAP_FIELD_TYPE_TINYINT &&
+        target.type() != FieldType::OLAP_FIELD_TYPE_ARRAY &&
+        target.type() != FieldType::OLAP_FIELD_TYPE_DOUBLE &&
+        target.type() != FieldType::OLAP_FIELD_TYPE_FLOAT) {
+        // the above types are not supported in bf
+        target.set_is_bf_column(source.is_bf_column());
+    }
+    target.set_aggregation_method(source.aggregation());
+    const auto* source_index_meta = target_schema->get_inverted_index(source.unique_id(), "");
+    if (source_index_meta != nullptr) {
+        // add index meta
+        TabletIndex index_info = *source_index_meta;
+        index_info.set_escaped_escaped_index_suffix_path(target.path_info_ptr()->get_path());
+        // get_inverted_index: no need to check, just inherit directly
+        const auto* target_index_meta = target_schema->get_inverted_index(target, false);
+        if (target_index_meta != nullptr) {
+            // already exists
+            target_schema->update_index(target, index_info);
+        } else {
+            target_schema->append_index(index_info);
         }
     }
+}
+void inherit_column_attributes(TabletSchemaSPtr& schema) {
     // Add index meta if extracted column is missing index meta
     for (size_t i = 0; i < schema->num_columns(); ++i) {
         TabletColumn& col = schema->mutable_column(i);
         if (!col.is_extracted_column()) {
             continue;
         }
-        if (col.type() != FieldType::OLAP_FIELD_TYPE_TINYINT &&
-            col.type() != FieldType::OLAP_FIELD_TYPE_ARRAY &&
-            col.type() != FieldType::OLAP_FIELD_TYPE_DOUBLE &&
-            col.type() != FieldType::OLAP_FIELD_TYPE_FLOAT) {
-            // above types are not supported in bf
-            col.set_is_bf_column(schema->column_by_uid(col.parent_unique_id()).is_bf_column());
-        }
-        col.set_aggregation_method(schema->column_by_uid(col.parent_unique_id()).aggregation());
-        auto it = variants_index_meta.find(col.parent_unique_id());
-        // variant has no index meta, ignore
-        if (it == variants_index_meta.end()) {
+        if (schema->field_index(col.parent_unique_id()) == -1) {
+            // parent column is missing, maybe dropped
             continue;
         }
-        auto index_meta = schema->get_inverted_index(col, false);
-        // add index meta
-        TabletIndex index_info = it->second;
-        index_info.set_escaped_escaped_index_suffix_path(col.path_info_ptr()->get_path());
-        if (index_meta != nullptr) {
-            // already exist
-            schema->update_index(col, index_info);
-        } else {
-            schema->append_index(index_info);
-        }
+        inherit_column_attributes(schema->column_by_uid(col.parent_unique_id()), col, schema);
     }
 }
@@ -473,7 +472,7 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
         update_least_sparse_column(schemas, output_schema, unique_id, path_set);
     }
-    inherit_root_attributes(output_schema);
+    inherit_column_attributes(output_schema);
     if (check_schema_size &&
         output_schema->columns().size() > config::variant_max_merged_tablet_schema_size) {
         return Status::DataQualityError("Reached max column size limit {}",
@@ -483,25 +482,8 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
     return Status::OK();
 }
-Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos,
-                                        const ParseContext& ctx) {
-    try {
-        // Parse each variant column from raw string column
-        RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_pos, ctx));
-        vectorized::schema_util::finalize_variant_columns(block, variant_pos,
-                                                          false /*not ingore sparse*/);
-        RETURN_IF_ERROR(
-                vectorized::schema_util::encode_variant_sparse_subcolumns(block, variant_pos));
-    } catch (const doris::Exception& e) {
-        // TODO more graceful, max_filter_ratio
-        LOG(WARNING) << "encounter execption " << e.to_string();
-        return Status::InternalError(e.to_string());
-    }
-    return Status::OK();
-}
-
-Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
-                             const ParseContext& ctx) {
+Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
+                              const ParseContext& ctx) {
     for (int i = 0; i < variant_pos.size(); ++i) {
         auto column_ref = block.get_by_position(variant_pos[i]).column;
         bool is_nullable = column_ref->is_nullable();
@@ -510,36 +492,8 @@ Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
         var.assume_mutable_ref().finalize();
         MutableColumnPtr variant_column;
-        bool record_raw_string_with_serialization = false;
-        // set
-        auto encode_rowstore = [&]() {
-            if (!ctx.record_raw_json_column) {
-                return Status::OK();
-            }
-            auto* var = static_cast<vectorized::ColumnObject*>(variant_column.get());
-            if (record_raw_string_with_serialization) {
-                // encode to raw json column
-                auto raw_column = vectorized::ColumnString::create();
-                for (size_t i = 0; i < var->rows(); ++i) {
-                    std::string raw_str;
-                    RETURN_IF_ERROR(var->serialize_one_row_to_string(i, &raw_str));
-                    raw_column->insert_data(raw_str.c_str(), raw_str.size());
-                }
-                var->set_rowstore_column(raw_column->get_ptr());
-            } else {
-                // use original input json column
-                auto original_var_root = vectorized::check_and_get_column<vectorized::ColumnObject>(
-                                                 remove_nullable(column_ref).get())
-                                                 ->get_root();
-                var->set_rowstore_column(original_var_root);
-            }
-            return Status::OK();
-        };
-
         if (!var.is_scalar_variant()) {
             variant_column = var.assume_mutable();
-            record_raw_string_with_serialization = true;
-            RETURN_IF_ERROR(encode_rowstore());
             // already parsed
             continue;
         }
@@ -576,8 +530,19 @@ Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
             result = ColumnNullable::create(result, null_map);
         }
         block.get_by_position(variant_pos[i]).column = result;
-        RETURN_IF_ERROR(encode_rowstore());
-        // block.get_by_position(variant_pos[i]).type = std::make_shared<DataTypeObject>("json", true);
+    }
+    return Status::OK();
+}
+
+Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
+                             const ParseContext& ctx) {
+    try {
+        // Parse each variant column from a raw string column
+        RETURN_IF_ERROR(vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx));
+    } catch (const doris::Exception& e) {
+        // TODO more graceful, max_filter_ratio
+        LOG(WARNING) << "encountered exception " << e.to_string();
+        return Status::InternalError(e.to_string());
+    }
     return Status::OK();
 }
@@ -597,53 +562,16 @@ void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos,
     }
 }
-Status encode_variant_sparse_subcolumns(Block& block, const std::vector<int>& variant_pos) {
-    for (int i = 0; i < variant_pos.size(); ++i) {
-        auto& column_ref = block.get_by_position(variant_pos[i]).column->assume_mutable_ref();
-        auto& column =
-                column_ref.is_nullable()
-                        ? assert_cast<ColumnObject&>(
-                                  assert_cast<ColumnNullable&>(column_ref).get_nested_column())
-                        : assert_cast<ColumnObject&>(column_ref);
-        // Make sure the root node is jsonb storage type
-        auto expected_root_type = make_nullable(std::make_shared<ColumnObject::MostCommonType>());
-        column.ensure_root_node_type(expected_root_type);
-        RETURN_IF_ERROR(column.merge_sparse_to_root_column());
-    }
+Status encode_variant_sparse_subcolumns(ColumnObject& column) {
+    // Make sure the root node is jsonb storage type
+    auto expected_root_type = make_nullable(std::make_shared<ColumnObject::MostCommonType>());
+    column.ensure_root_node_type(expected_root_type);
+    RETURN_IF_ERROR(column.merge_sparse_to_root_column());
     return Status::OK();
 }
-static void _append_column(const TabletColumn& parent_variant,
-                           const ColumnObject::Subcolumns::NodePtr& subcolumn,
-                           TabletSchemaSPtr& to_append, bool is_sparse) {
-    // If column already exist in original tablet schema, then we pick common type
-    // and cast column to common type, and modify tablet column to common type,
-    // otherwise it's a new column
-    CHECK(to_append.use_count() == 1);
-    const std::string& column_name =
-            parent_variant.name_lower_case() + "." + subcolumn->path.get_path();
-    const vectorized::DataTypePtr& final_data_type_from_object =
-            subcolumn->data.get_least_common_type();
-    vectorized::PathInDataBuilder full_path_builder;
-    auto full_path = full_path_builder.append(parent_variant.name_lower_case(), false)
-                             .append(subcolumn->path.get_parts(), false)
-                             .build();
-    TabletColumn tablet_column = vectorized::schema_util::get_column_by_type(
-            final_data_type_from_object, column_name,
-            vectorized::schema_util::ExtraInfo {.unique_id = -1,
-                                                .parent_unique_id = parent_variant.unique_id(),
-                                                .path_info = full_path});
-
-    if (!is_sparse) {
-        to_append->append_column(std::move(tablet_column));
-    } else {
-        to_append->mutable_column_by_uid(parent_variant.unique_id())
-                .append_sparse_column(std::move(tablet_column));
-    }
-}
-
 // sort by paths in lexicographical order
-static vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
+vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
         const vectorized::ColumnObject::Subcolumns& subcolumns) {
     // sort by paths in lexicographical order
     vectorized::ColumnObject::Subcolumns sorted = subcolumns;
@@ -653,70 +581,12 @@ static vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
     return sorted;
 }
-void rebuild_schema_and_block(const TabletSchemaSPtr& original,
-                              const std::vector<int>& variant_positions, Block& flush_block,
-                              TabletSchemaSPtr& flush_schema) {
-    // rebuild schema and block with variant extracted columns
-
-    // 1. Flatten variant column into flat columns, append flatten columns to the back of original Block and TabletSchema
-    // those columns are extracted columns, leave none extracted columns remain in original variant column, which is
-    // JSONB format at present.
-    // 2. Collect columns that need to be added or modified when data type changes or new columns encountered
-    for (size_t variant_pos : variant_positions) {
-        auto column_ref = flush_block.get_by_position(variant_pos).column;
-        bool is_nullable = column_ref->is_nullable();
-        const vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>(
-                remove_nullable(column_ref)->assume_mutable_ref());
-        const TabletColumn& parent_column = *original->columns()[variant_pos];
-        CHECK(object_column.is_finalized());
-        std::shared_ptr<vectorized::ColumnObject::Subcolumns::Node> root;
-        // common extracted columns
-        for (const auto& entry : get_sorted_subcolumns(object_column.get_subcolumns())) {
-            if (entry->path.empty()) {
-                // root
-                root = entry;
-                continue;
-            }
-            _append_column(parent_column, entry, flush_schema, false);
-            const std::string& column_name =
-                    parent_column.name_lower_case() + "." + entry->path.get_path();
-            flush_block.insert({entry->data.get_finalized_column_ptr()->get_ptr(),
-                                entry->data.get_least_common_type(), column_name});
-        }
-
-        // add sparse columns to flush_schema
-        for (const auto& entry : get_sorted_subcolumns(object_column.get_sparse_subcolumns())) {
-            _append_column(parent_column, entry, flush_schema, true);
-        }
-
-        // Create new variant column and set root column
-        auto obj = vectorized::ColumnObject::create(true, false);
-        // '{}' indicates a root path
-        static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column(
-                {}, root->data.get_finalized_column_ptr()->assume_mutable(),
-                root->data.get_least_common_type());
-        // // set for rowstore
-        if (original->store_row_column()) {
-            static_cast<vectorized::ColumnObject*>(obj.get())->set_rowstore_column(
-                    object_column.get_rowstore_column());
-        }
-        vectorized::ColumnPtr result = obj->get_ptr();
-        if (is_nullable) {
-            const auto& null_map = assert_cast<const vectorized::ColumnNullable&>(*column_ref)
-                                           .get_null_map_column_ptr();
-            result = vectorized::ColumnNullable::create(result, null_map);
-        }
-        flush_block.get_by_position(variant_pos).column = result;
-        vectorized::PathInDataBuilder full_root_path_builder;
-        auto full_root_path =
-                full_root_path_builder.append(parent_column.name_lower_case(), false).build();
-        TabletColumn new_col = flush_schema->column(variant_pos);
-        new_col.set_path_info(full_root_path);
-        flush_schema->replace_column(variant_pos, new_col);
-        VLOG_DEBUG << "set root_path : " << full_root_path.get_path();
-    }
+// ---------------------------
-    vectorized::schema_util::inherit_root_attributes(flush_schema);
+std::string dump_column(DataTypePtr type, const ColumnPtr& col) {
+    Block tmp;
+    tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()});
+    return tmp.dump_data(0, tmp.rows());
 }
// ---------------------------
@@ -747,13 +617,5 @@ Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst)
->assume_mutable();
return Status::OK();
}
-// ---------------------------
-
-std::string dump_column(DataTypePtr type, const ColumnPtr& col) {
- Block tmp;
- tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()});
- return tmp.dump_data(0, tmp.rows());
-}
-// ---------------------------
} // namespace doris::vectorized::schema_util
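
[Editor's illustration] dump_column, moved within schema_util.cpp above, is a small debugging adapter: it wraps a single column in a throwaway Block purely so the Block's existing dump facility can format it for logging. A hedged standalone sketch of that adapter shape, with deliberately simplified stand-in types (not the real Doris Block/IColumn API):

    #include <iostream>
    #include <string>
    #include <vector>

    // Stand-in types, far simpler than Doris' Block/IColumn.
    struct Column {
        std::vector<std::string> rows;
    };

    struct Block {
        std::vector<Column> cols;
        void insert(const Column& c) { cols.push_back(c); }
        std::string dump_data() const {
            std::string out;
            for (const auto& c : cols) {
                for (const auto& r : c.rows) {
                    out += r + '\n';
                }
            }
            return out;
        }
    };

    // Same shape as schema_util::dump_column: wrap one column in a throwaway
    // one-column block so the block's dump facility does the formatting.
    std::string dump_column(const Column& col) {
        Block tmp;
        tmp.insert(col);
        return tmp.dump_data();
    }

    int main() {
        std::cout << dump_column(Column {{"1", "2", "3"}});
        return 0;
    }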
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index 078081593c5..16288541415 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -90,13 +90,11 @@ struct ParseContext {
 // 1. parse variant from raw json string
 // 2. finalize variant column to each subcolumn's least common type; sparse sub columns are ignored by default
 // 3. encode sparse sub columns
-Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos,
-                                        const ParseContext& ctx);
 Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
                              const ParseContext& ctx);
 void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos,
                               bool ignore_sparse = true);
-Status encode_variant_sparse_subcolumns(Block& block, const std::vector<int>& variant_pos);
+Status encode_variant_sparse_subcolumns(ColumnObject& column);
// Pick the tablet schema with the highest schema version as the reference.
// Then update all variant columns to their least common types.
@@ -117,15 +115,14 @@ void update_least_sparse_column(const std::vector<TabletSchemaSPtr>& schemas,
                                 const std::unordered_set<PathInData, PathInData::Hash>& path_set);
 // inherit attributes like index/agg info from its parent column
-void inherit_root_attributes(TabletSchemaSPtr& schema);
-
-// Rebuild the schema from the original schema by extending it with the dynamic columns generated
-// from ColumnObject. The Block consists of two parts, a dynamic part and a static part of columns.
-//    static      extracted
-// | --------- | ----------- |
-// The static ones are the original tablet_schema columns
-void rebuild_schema_and_block(const TabletSchemaSPtr& original, const std::vector<int>& variant_pos,
-                              Block& flush_block, TabletSchemaSPtr& flush_schema);
+void inherit_column_attributes(TabletSchemaSPtr& schema);
+
+void inherit_column_attributes(const TabletColumn& source, TabletColumn& target,
+                               TabletSchemaSPtr& target_schema);
+
+// get the sorted subcolumns of a variant
+vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
+        const vectorized::ColumnObject::Subcolumns& subcolumns);
 // Extract json data from source with path
 Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst);
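
[Editor's illustration] get_sorted_subcolumns, newly exported in the header above, returns the variant's subcolumns ordered by path. A minimal standalone sketch (not the Doris implementation) of the property this gives callers: lexicographic order places a parent path such as "a" directly before its children such as "a.b", so one forward walk always visits parents first.

    #include <algorithm>
    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
        std::vector<std::string> paths = {"a.b.c", "b", "a", "a.b"};
        std::sort(paths.begin(), paths.end()); // lexicographical by default
        for (const auto& p : paths) {
            std::cout << p << '\n'; // prints: a, a.b, a.b.c, b
        }
        return 0;
    }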
diff --git a/be/src/vec/data_types/data_type.h b/be/src/vec/data_types/data_type.h
index e708cda164e..986f957d72b 100644
--- a/be/src/vec/data_types/data_type.h
+++ b/be/src/vec/data_types/data_type.h
@@ -412,5 +412,9 @@ inline bool is_complex_type(const DataTypePtr& data_type) {
return which.is_array() || which.is_map() || which.is_struct();
}
+inline bool is_variant_type(const DataTypePtr& data_type) {
+ return WhichDataType(data_type).is_variant_type();
+}
+
} // namespace vectorized
} // namespace doris
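
[Editor's illustration] The new is_variant_type helper follows the header's existing convention of thin predicates over WhichDataType. A self-contained sketch of the idiom with illustrative stand-in types; the real TypeIndex/WhichDataType definitions live in the Doris sources and carry many more cases:

    #include <iostream>
    #include <memory>

    enum class TypeIndex { Int32, String, Variant }; // stand-in

    struct IDataType {
        virtual ~IDataType() = default;
        virtual TypeIndex get_type_id() const = 0;
    };
    using DataTypePtr = std::shared_ptr<const IDataType>;

    // Turns tag comparisons into readable predicates.
    struct WhichDataType {
        TypeIndex idx;
        explicit WhichDataType(const DataTypePtr& type) : idx(type->get_type_id()) {}
        bool is_variant_type() const { return idx == TypeIndex::Variant; }
    };

    inline bool is_variant_type(const DataTypePtr& type) {
        return WhichDataType(type).is_variant_type();
    }

    struct VariantType final : IDataType {
        TypeIndex get_type_id() const override { return TypeIndex::Variant; }
    };

    int main() {
        DataTypePtr t = std::make_shared<VariantType>();
        std::cout << std::boolalpha << is_variant_type(t) << '\n'; // true
        return 0;
    }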
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp
index 4d8a3020375..e9015db653a 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp
@@ -37,10 +37,11 @@ namespace doris {
namespace vectorized {
-Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column,
-                                                  MysqlRowBuffer<false>& row_buffer, int row_idx,
-                                                  bool col_const,
-                                                  const FormatOptions& options) const {
+template <bool is_binary_format>
+Status DataTypeObjectSerDe::_write_column_to_mysql(const IColumn& column,
+                                                   MysqlRowBuffer<is_binary_format>& row_buffer,
+                                                   int row_idx, bool col_const,
+                                                   const FormatOptions& options) const {
const auto& variant = assert_cast<const ColumnObject&>(column);
if (!variant.is_finalized()) {
const_cast<ColumnObject&>(variant).finalize();
@@ -67,6 +68,20 @@ Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column,
return Status::OK();
}
+Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<true>& row_buffer, int row_idx,
+                                                  bool col_const,
+                                                  const FormatOptions& options) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options);
+}
+
+Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column,
+                                                  MysqlRowBuffer<false>& row_buffer, int row_idx,
+                                                  bool col_const,
+                                                  const FormatOptions& options) const {
+    return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options);
+}
+
 void DataTypeObjectSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result,
                                                   Arena* mem_pool, int32_t col_id,
                                                   int row_num) const {
@@ -75,12 +90,14 @@ void DataTypeObjectSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWr
         const_cast<ColumnObject&>(variant).finalize();
     }
     result.writeKey(col_id);
+    std::string value_str;
+    if (!variant.serialize_one_row_to_string(row_num, &value_str)) {
+        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to serialize variant {}",
+                               variant.dump_structure());
+    }
     JsonbParser json_parser;
-    CHECK(variant.get_rowstore_column() != nullptr);
-    // use the original document
-    const auto& data_ref = variant.get_rowstore_column()->get_data_at(row_num);
     // encode as jsonb
-    bool succ = json_parser.parse(data_ref.data, data_ref.size);
+    bool succ = json_parser.parse(value_str.data(), value_str.size());
     // a CHECK is acceptable here since the data was just serialized and should parse
     CHECK(succ);
     result.writeStartBinary();
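
[Editor's illustration] The jsonb path above no longer assumes a rowstore column: the variant row is first serialized back to a JSON string, then re-parsed into JSONB. A hedged sketch of that serialize-then-parse contract, with stand-ins for the Doris calls:

    #include <cstddef>
    #include <iostream>
    #include <stdexcept>
    #include <string>

    // Stand-in for ColumnObject::serialize_one_row_to_string.
    bool serialize_one_row_to_string(int row_num, std::string* out) {
        *out = "{\"a\":" + std::to_string(row_num) + "}";
        return true;
    }

    // Stand-in for JsonbParser::parse; the real parser builds a binary document.
    bool parse_jsonb(const char* data, std::size_t size) {
        return size > 0 && data[0] == '{';
    }

    int main() {
        std::string value_str;
        if (!serialize_one_row_to_string(7, &value_str)) {
            throw std::runtime_error("failed to serialize variant row");
        }
        if (!parse_jsonb(value_str.data(), value_str.size())) {
            return 1;
        }
        std::cout << "encoded " << value_str << " as jsonb\n";
        return 0;
    }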
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h
index 80554d3dbef..66178f0ecb3 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.h
+++ b/be/src/vec/data_types/serde/data_type_object_serde.h
@@ -82,9 +82,7 @@ public:
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
                                  int row_idx, bool col_const,
-                                 const FormatOptions& options) const override {
-        return Status::NotSupported("write_column_to_mysql with type " + column.get_name());
-    }
+                                 const FormatOptions& options) const override;
     Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
                                  int row_idx, bool col_const,
@@ -96,6 +94,11 @@ public:
                                  std::vector<StringRef>& buffer_list) const override {
         return Status::NotSupported("write_column_to_orc with type " + column.get_name());
     }
+
+private:
+    template <bool is_binary_format>
+    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
+                                  int row_idx, bool col_const, const FormatOptions& options) const;
};
} // namespace vectorized
} // namespace doris
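
[Editor's illustration] The header change mirrors a common C++ pattern: the two public write_column_to_mysql overloads (one per MySQL protocol flavor) forward to a single private template defined in the .cpp file, so both protocols share one body while the template stays an implementation detail. A minimal sketch of that dispatch shape with illustrative names:

    #include <iostream>

    template <bool is_binary_format>
    struct MysqlRowBuffer {};

    class SerDe {
    public:
        // Two concrete overloads keep the public interface template-free...
        void write_column_to_mysql(MysqlRowBuffer<true>& buf) { _write(buf); }
        void write_column_to_mysql(MysqlRowBuffer<false>& buf) { _write(buf); }

    private:
        // ...while one private template shares the implementation between the
        // binary (prepared statement) and text protocol buffers.
        template <bool is_binary_format>
        void _write(MysqlRowBuffer<is_binary_format>&) {
            std::cout << (is_binary_format ? "binary" : "text") << " row written\n";
        }
    };

    int main() {
        MysqlRowBuffer<true> binary_buf;
        MysqlRowBuffer<false> text_buf;
        SerDe serde;
        serde.write_column_to_mysql(binary_buf);
        serde.write_column_to_mysql(text_buf);
        return 0;
    }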
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 8f3163c36c4..9a10ba8cf35 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -445,7 +445,7 @@ Status NewOlapScanner::_init_variant_columns() {
}
}
}
- schema_util::inherit_root_attributes(tablet_schema);
+ schema_util::inherit_column_attributes(tablet_schema);
return Status::OK();
}
diff --git a/be/src/vec/functions/function_variant_element.cpp b/be/src/vec/functions/function_variant_element.cpp
index 89256635279..84ddc3b8046 100644
--- a/be/src/vec/functions/function_variant_element.cpp
+++ b/be/src/vec/functions/function_variant_element.cpp
@@ -32,6 +32,7 @@
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_object.h"
#include "vec/columns/column_string.h"
+#include "vec/columns/subcolumn_tree.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
@@ -43,6 +44,7 @@
#include "vec/functions/function.h"
#include "vec/functions/function_helpers.h"
#include "vec/functions/simple_function_factory.h"
+#include "vec/json/path_in_data.h"
namespace doris::vectorized {
@@ -128,8 +130,45 @@ private:
             *result = ColumnObject::create(true, type, std::move(result_column));
             return Status::OK();
         } else {
-            return Status::NotSupported("Not support element_at with none scalar variant {}",
-                                        src.debug_string());
+            auto mutable_src = src.clone_finalized();
+            auto* mutable_ptr = assert_cast<ColumnObject*>(mutable_src.get());
+            PathInData path(field_name);
+            ColumnObject::Subcolumns subcolumns = mutable_ptr->get_subcolumns();
+            const auto* node = subcolumns.find_exact(path);
+            auto result_col = ColumnObject::create(true, false /*should not create root*/);
+            if (node != nullptr) {
+                std::vector<decltype(node)> nodes;
+                PathsInData paths;
+                ColumnObject::Subcolumns::get_leaves_of_node(node, nodes, paths);
+                ColumnObject::Subcolumns new_subcolumns;
+                for (const auto* n : nodes) {
+                    PathInData new_path = n->path.copy_pop_front();
+                    VLOG_DEBUG << "add node " << new_path.get_path()
+                               << ", data size: " << n->data.size()
+                               << ", finalized size: " << n->data.get_finalized_column().size()
+                               << ", common type: " << n->data.get_least_common_type()->get_name();
+                    // if new_path is empty, it indicates the root column, but adding a root
+                    // will return false when calling add
+                    if (!new_subcolumns.add(new_path, n->data)) {
+                        VLOG_DEBUG << "failed to add node " << new_path.get_path();
+                    }
+                }
+                // handle the root node
+                if (new_subcolumns.empty() && !nodes.empty()) {
+                    CHECK_EQ(nodes.size(), 1);
+                    new_subcolumns.create_root(ColumnObject::Subcolumn {
+                            nodes[0]->data.get_finalized_column_ptr()->assume_mutable(),
+                            nodes[0]->data.get_least_common_type(), true, true});
+                }
+                auto container = ColumnObject::create(std::move(new_subcolumns), true);
+                result_col->insert_range_from(*container, 0, container->size());
+            } else {
+                result_col->insert_many_defaults(src.size());
+            }
+            *result = result_col->get_ptr();
+            VLOG_DEBUG << "dump new object "
+                       << static_cast<const ColumnObject*>(result_col.get())->debug_string()
+                       << ", path " << path.get_path();
+            return Status::OK();
         }
     }
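
[Editor's illustration] Instead of rejecting element_at on a non-scalar variant, the new code extracts the sub-tree rooted at field_name and re-roots every leaf path by popping its first component. A standalone sketch of that re-rooting step with a string stand-in (not the real PathInData API):

    #include <iostream>
    #include <string>

    // Illustrative stand-in for PathInData::copy_pop_front: drop the first
    // path component; an empty result marks the new subtree root.
    std::string copy_pop_front(const std::string& path) {
        auto dot = path.find('.');
        return dot == std::string::npos ? std::string() : path.substr(dot + 1);
    }

    int main() {
        for (const std::string& leaf : {"a", "a.b", "a.b.c"}) {
            std::string popped = copy_pop_front(leaf);
            std::cout << leaf << " -> " << (popped.empty() ? "<root>" : popped) << '\n';
        }
        return 0;
    }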
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 86c1d2d6669..e5f945953f6 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -17,6 +17,7 @@
#include "vec/olap/olap_data_convertor.h"
+#include <memory>
#include <new>
#include "common/compiler_util.h" // IWYU pragma: keep
@@ -42,6 +43,7 @@
#include "vec/columns/column_struct.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
+#include "vec/common/schema_util.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_agg_state.h"
#include "vec/data_types/data_type_array.h"
@@ -214,6 +216,16 @@ void OlapBlockDataConvertor::set_source_content(const vectorized::Block* block,
     }
 }
+Status OlapBlockDataConvertor::set_source_content_with_specifid_column(
+        const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows, uint32_t cid) {
+    DCHECK(num_rows > 0);
+    DCHECK(row_pos + num_rows <= typed_column.column->size());
+    DCHECK(cid < _convertors.size());
+    RETURN_IF_CATCH_EXCEPTION(
+            { _convertors[cid]->set_source_column(typed_column, row_pos, num_rows); });
+    return Status::OK();
+}
+
 Status OlapBlockDataConvertor::set_source_content_with_specifid_columns(
         const vectorized::Block* block, size_t row_pos, size_t num_rows, std::vector<uint32_t> cids) {
@@ -1078,8 +1090,6 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorVariant::set_source_column(
                     ? assert_cast<const vectorized::ColumnObject&>(*typed_column.column)
                     : assert_cast<const vectorized::ColumnObject&>(
                               nullable_column->get_nested_column());
-
-    const_cast<ColumnObject&>(variant).finalize_if_not();
     if (variant.is_null_root()) {
         auto root_type = make_nullable(std::make_shared<ColumnObject::MostCommonType>());
         auto root_col = root_type->create_column();
@@ -1087,19 +1097,25 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorVariant::set_source_column(
         const_cast<ColumnObject&>(variant).create_root(root_type, std::move(root_col));
         variant.check_consistency();
     }
-    auto root_of_variant = variant.get_root();
-    auto nullable = assert_cast<const ColumnNullable*>(root_of_variant.get());
-    CHECK(nullable);
-    _root_data_column = assert_cast<const ColumnString*>(&nullable->get_nested_column());
-    _root_data_convertor->set_source_column({root_of_variant->get_ptr(), nullptr, ""}, row_pos,
-                                            num_rows);
+    // ensure the data is finalized
+    _source_column_ptr = &const_cast<ColumnObject&>(variant);
+    _source_column_ptr->finalize(false);
+    _root_data_convertor = std::make_unique<OlapColumnDataConvertorVarChar>(true);
+    _root_data_convertor->set_source_column(
+            {_source_column_ptr->get_root()->get_ptr(), nullptr, ""}, row_pos, num_rows);
     OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos,
                                                                            num_rows);
 }
 // convert root data
 Status OlapBlockDataConvertor::OlapColumnDataConvertorVariant::convert_to_olap() {
-    RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, _root_data_column));
+    RETURN_IF_ERROR(vectorized::schema_util::encode_variant_sparse_subcolumns(*_source_column_ptr));
+#ifndef NDEBUG
+    _source_column_ptr->check_consistency();
+#endif
+    const auto* nullable = assert_cast<const ColumnNullable*>(_source_column_ptr->get_root().get());
+    const auto* root_column = assert_cast<const ColumnString*>(&nullable->get_nested_column());
+    RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, root_column));
     return Status::OK();
 }
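
[Editor's illustration] convert_to_olap now encodes the sparse subcolumns first, then guards the result with a debug-only consistency check before flushing the root as a string column. The #ifndef NDEBUG idiom keeps the invariant validation out of release builds; a minimal sketch with a stand-in ColumnObject:

    #include <cassert>
    #include <cstdio>

    // Stand-in; the real check validates subcolumn sizes against each other.
    struct ColumnObject {
        int rows = 0;
        void check_consistency() const { assert(rows >= 0); }
    };

    int main() {
        ColumnObject col;
    #ifndef NDEBUG
        col.check_consistency(); // runs in debug builds, compiled out in release
    #endif
        std::puts("converted");
        return 0;
    }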
diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h
index 0ec720fcdc1..764a7a4a7c3 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -34,6 +34,7 @@
#include "olap/uint24.h"
#include "runtime/collection_value.h"
#include "util/slice.h"
+#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_object.h"
#include "vec/columns/column_string.h"
@@ -77,6 +78,9 @@ public:
     void set_source_content(const vectorized::Block* block, size_t row_pos, size_t num_rows);
     Status set_source_content_with_specifid_columns(const vectorized::Block* block, size_t row_pos,
                                                     size_t num_rows, std::vector<uint32_t> cids);
+    Status set_source_content_with_specifid_column(const ColumnWithTypeAndName& typed_column,
+                                                   size_t row_pos, size_t num_rows, uint32_t cid);
+
     void clear_source_content();
     std::pair<Status, IOlapColumnDataAccessor*> convert_column_data(size_t cid);
     void add_column_data_convertor(const TabletColumn& column);
void add_column_data_convertor(const TabletColumn& column);
@@ -487,8 +491,8 @@ private:
 class OlapColumnDataConvertorVariant : public OlapColumnDataConvertorBase {
 public:
-    OlapColumnDataConvertorVariant()
-            : _root_data_convertor(std::make_unique<OlapColumnDataConvertorVarChar>(true)) {}
+    OlapColumnDataConvertorVariant() = default;
+
     void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos,
                            size_t num_rows) override;
     Status convert_to_olap() override;
@@ -497,10 +501,11 @@ private:
     const void* get_data_at(size_t offset) const override;
 private:
-    // encodes sparse columns
-    const ColumnString* _root_data_column;
-    // _nullmap contains null info for this variant
+    // // encodes sparse columns
+    // const ColumnString* _root_data_column;
+    // // _nullmap contains null info for this variant
     std::unique_ptr<OlapColumnDataConvertorVarChar> _root_data_convertor;
+    ColumnObject* _source_column_ptr;
 };
private:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java
index 0a8dbf5bad9..fda46d13fff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java
@@ -191,12 +191,8 @@ public class UpdateStmt extends DdlStmt {
         // step3: generate the select list and insert column name list in the insert stmt
         boolean isMow = ((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite();
-        boolean hasVariant = false;
         int setExprCnt = 0;
         for (Column column : targetTable.getColumns()) {
-            if (column.getType().isVariantType()) {
-                hasVariant = true;
-            }
             for (BinaryPredicate setExpr : setExprs) {
                 Expr lhs = setExpr.getChild(0);
                 if (((SlotRef) lhs).getColumn().equals(column)) {
@@ -204,13 +200,10 @@ public class UpdateStmt extends DdlStmt {
                 }
             }
         }
-        // 1. a table with a sequence column cannot use partial update because in MOW we encode the pk
+        // a table with a sequence column cannot use partial update because in MOW we encode the pk
         // with the sequence column, but we don't know which column is the sequence one in an update
-        // 2. variant columns update the schema during load, so implementing partial update is
-        // complicated; just ignore it at present
         if (isMow && ((OlapTable) targetTable).getSequenceCol() == null
-                && setExprCnt <= targetTable.getColumns().size() * 3 / 10
-                && !hasVariant) {
+                && setExprCnt <= targetTable.getColumns().size() * 3 / 10) {
             isPartialUpdate = true;
         }
         Optional<Column> sequenceMapCol = Optional.empty();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
index 48766caa5ce..542dab31a01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
@@ -159,7 +159,7 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab
         boolean isPartialUpdate = targetTable.getEnableUniqueKeyMergeOnWrite()
                 && selectItems.size() < targetTable.getColumns().size()
-                && !targetTable.hasVariantColumns() && targetTable.getSequenceCol() == null
+                && targetTable.getSequenceCol() == null
                 && partialUpdateColNameToExpression.size() <= targetTable.getFullSchema().size() * 3 / 10;
         List<String> partialUpdateColNames = new ArrayList<>();
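
[Editor's illustration] Both FE changes drop the variant-column exclusion from the partial-update check, leaving the merge-on-write, sequence-column, and 30%-of-columns conditions. A sketch of the remaining heuristic (in C++ for consistency with the other sketches here; the integer division mirrors the Java expression, so the threshold truncates downward):

    #include <iostream>

    bool use_partial_update(bool is_mow, bool has_sequence_col, int set_expr_cnt,
                            int total_columns) {
        // partial update only on MOW tables without a sequence column, and only
        // when the SET list covers at most 30% of the columns
        return is_mow && !has_sequence_col && set_expr_cnt <= total_columns * 3 / 10;
    }

    int main() {
        std::cout << use_partial_update(true, false, 3, 10) << '\n'; // 1: exactly at the threshold
        std::cout << use_partial_update(true, false, 4, 10) << '\n'; // 0: above 30% of columns
        return 0;
    }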
diff --git a/regression-test/data/variant_p0/delete_update.out b/regression-test/data/variant_p0/delete_update.out
index 0e478bdeb0d..4390610c21d 100644
--- a/regression-test/data/variant_p0/delete_update.out
+++ b/regression-test/data/variant_p0/delete_update.out
@@ -7,10 +7,17 @@
-- !sql --
2 {"updated_value":123} {"updated_value":123}
-6 {"a":4,"b":[4],"c":4.0} {"updated_value" : 123}
+6 {"a":4,"b":[4],"c":4.1} {"updated_value" : 123}
7 {"updated_value":1111} yyy
-- !sql --
2 {"updated_value":123} {"updated_value":123}
-6 {"a":4,"b":[4],"c":4.0} {"updated_value" : 123}
+6 {"a":4,"b":[4],"c":4.1} {"updated_value" : 123}
+
+-- !sql --
+1 "ddddddddddd" 1111 199 10 {"new_data1":1}
+2 "eeeeee" 2222 299 20 {"new_data2":2}
+3 "aaaaa" 3333 399 30 {"new_data3":3}
+4 "bbbbbbbb" 4444 499 40 {"new_data4":4}
+5 "cccccccccccc" 5555 599 50 {"new_data5":5}
diff --git a/regression-test/data/variant_p0/partial_update_parallel1.csv b/regression-test/data/variant_p0/partial_update_parallel1.csv
new file mode 100644
index 00000000000..4ba84bb7785
--- /dev/null
+++ b/regression-test/data/variant_p0/partial_update_parallel1.csv
@@ -0,0 +1,5 @@
+1,"ddddddddddd"
+2,"eeeeee"
+3,"aaaaa"
+4,"bbbbbbbb"
+5,"cccccccccccc"
diff --git a/regression-test/data/variant_p0/partial_update_parallel2.csv b/regression-test/data/variant_p0/partial_update_parallel2.csv
new file mode 100644
index 00000000000..1560d6d3261
--- /dev/null
+++ b/regression-test/data/variant_p0/partial_update_parallel2.csv
@@ -0,0 +1,5 @@
+1,1111,199
+2,2222,299
+3,3333,399
+4,4444,499
+5,5555,599
diff --git a/regression-test/data/variant_p0/partial_update_parallel3.csv b/regression-test/data/variant_p0/partial_update_parallel3.csv
new file mode 100644
index 00000000000..17abeef1a9c
--- /dev/null
+++ b/regression-test/data/variant_p0/partial_update_parallel3.csv
@@ -0,0 +1,5 @@
+1,10,{"new_data1" : 1}
+2,20,{"new_data2" : 2}
+3,30,{"new_data3" : 3}
+4,40,{"new_data4" : 4}
+5,50,{"new_data5" : 5}
diff --git a/regression-test/data/variant_p0/partial_update_parallel4.csv b/regression-test/data/variant_p0/partial_update_parallel4.csv
new file mode 100644
index 00000000000..0a7cbd412fa
--- /dev/null
+++ b/regression-test/data/variant_p0/partial_update_parallel4.csv
@@ -0,0 +1,3 @@
+1,1
+3,1
+5,1
diff --git a/regression-test/data/variant_p0/variant_with_rowstore.out b/regression-test/data/variant_p0/variant_with_rowstore.out
index d7d759baad3..6c34622bec8 100644
--- a/regression-test/data/variant_p0/variant_with_rowstore.out
+++ b/regression-test/data/variant_p0/variant_with_rowstore.out
@@ -23,3 +23,12 @@
5 {"a":1234,"xxxx":"kaana"} {"a":1234,"xxxx":"kaana"}
6 {"a":1234,"xxxx":"kaana"} {"a":1234,"xxxx":"kaana"}
+-- !point_select --
+-3 {"a":1,"b":1.5,"c":[1,2,3]} {"a":1,"b":1.5,"c":[1,2,3]}
+
+-- !point_select --
+-2 {"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}}
{"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}}
+
+-- !point_select --
+-1 {"a":1123} {"a":1123}
+
diff --git a/regression-test/suites/variant_github_events_p0_new/load.groovy b/regression-test/suites/variant_github_events_p0_new/load.groovy
index 0be0f205b69..c063ebecf26 100644
--- a/regression-test/suites/variant_github_events_p0_new/load.groovy
+++ b/regression-test/suites/variant_github_events_p0_new/load.groovy
@@ -95,6 +95,36 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){
     sql """
         insert into github_events_2 select 1, cast(v["repo"]["name"] as string) FROM github_events;
     """
+ // insert batches of nulls
+ for(int t = 0; t <= 10; t += 1){
+ long k = 9223372036854775107 + t
+ sql """INSERT INTO github_events VALUES (${k}, NULL)"""
+ }
+ sql """ALTER TABLE github_events SET("bloom_filter_columns" = "v")"""
+    // wait for the bloom filter ALTER job to finish
+    def getJobState = { tableName ->
+        def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='github_events' ORDER BY createtime DESC LIMIT 1 """
+        return jobStateResult[0][9]
+    }
+ int max_try_time = 200
+ while (max_try_time--){
+ String result = getJobState("github_events")
+ if (result == "FINISHED") {
+ break
+ } else {
+ sleep(2000)
+ if (max_try_time < 1){
+ assertEquals(1,2)
+ }
+ }
+ }
+ sql """ALTER TABLE github_events ADD COLUMN v2 variant DEFAULT NULL"""
+ for(int t = 0; t <= 10; t += 1){
+ long k = 9223372036854775107 + t
+ sql """INSERT INTO github_events VALUES (${k}, '{"aaaa" : 1234, "bbbb"
: "11ssss"}', '{"xxxx" : 1234, "yyyy" : [1.111]}')"""
+ }
+ sql """ALTER TABLE github_events DROP COLUMN v2"""
+ sql """DELETE FROM github_events where k >= 9223372036854775107"""
qt_sql_select_count """ select count(*) from github_events_2; """
}
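
[Editor's illustration] The new load steps poll the ALTER job with a bounded retry loop (a 2s sleep, up to 200 attempts) before mutating the schema further. The same bounded-polling shape as a C++ sketch, with get_job_state standing in for the SHOW ALTER query:

    #include <chrono>
    #include <iostream>
    #include <string>
    #include <thread>

    // Stand-in for the SHOW ALTER TABLE COLUMN probe in the Groovy test.
    std::string get_job_state() { return "FINISHED"; }

    int main() {
        int max_try_time = 200;
        while (max_try_time-- > 0) {
            if (get_job_state() == "FINISHED") {
                break;
            }
            std::this_thread::sleep_for(std::chrono::seconds(2));
        }
        if (max_try_time < 0) { // loop exhausted without seeing FINISHED
            std::cerr << "alter job did not finish in time\n";
            return 1;
        }
        std::cout << "alter job finished\n";
        return 0;
    }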
diff --git a/regression-test/suites/variant_p0/delete_update.groovy b/regression-test/suites/variant_p0/delete_update.groovy
index bbd999559b4..2b126b4c3a6 100644
--- a/regression-test/suites/variant_p0/delete_update.groovy
+++ b/regression-test/suites/variant_p0/delete_update.groovy
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
suite("regression_test_variant_delete_and_update", "variant_type"){
// MOR
def table_name = "var_delete_update"
@@ -30,14 +32,14 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
"""
// test mor table
- sql """insert into ${table_name} values (1, '{"a" : 1, "b" : [1], "c":
1.0}')"""
- sql """insert into ${table_name} values (2, '{"a" : 2, "b" : [1], "c":
2.0}')"""
- sql """insert into ${table_name} values (3, '{"a" : 3, "b" : [3], "c":
3.0}')"""
- sql """insert into ${table_name} values (4, '{"a" : 4, "b" : [4], "c":
4.0}')"""
- sql """insert into ${table_name} values (5, '{"a" : 5, "b" : [5], "c":
5.0}')"""
+ sql """insert into ${table_name} values (1, '{"a":1,"b":[1],"c":1.0}')"""
+ sql """insert into ${table_name} values (2, '{"a":2,"b":[1],"c":2.0}')"""
+ sql """insert into ${table_name} values (3, '{"a":3,"b":[3],"c":3.0}')"""
+ sql """insert into ${table_name} values (4, '{"a":4,"b":[4],"c":4.0}')"""
+ sql """insert into ${table_name} values (5, '{"a":5,"b":[5],"c":5.0}')"""
sql "delete from ${table_name} where k = 1"
- sql """update ${table_name} set v = '{"updated_value" : 123}' where k =
2"""
+ sql """update ${table_name} set v = '{"updated_value":123}' where k = 2"""
qt_sql "select * from ${table_name} order by k"
// MOW
@@ -46,41 +48,125 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
k bigint,
- v variant,
+ v variant,
vs string
)
UNIQUE KEY(`k`)
-    DISTRIBUTED BY HASH(k) BUCKETS 3
-    properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true");
+    DISTRIBUTED BY HASH(k) BUCKETS 4
+    properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "store_row_column" = "true");
"""
sql "insert into var_delete_update_mow select k, cast(v as string), cast(v
as string) from var_delete_update"
sql "delete from ${table_name} where k = 1"
sql "delete from ${table_name} where k in (select k from
var_delete_update_mow where k in (3, 4, 5))"
- sql """insert into ${table_name} values (6, '{"a" : 4, "b" : [4], "c":
4.0}', 'xxx')"""
- sql """insert into ${table_name} values (7, '{"a" : 4, "b" : [4], "c":
4.0}', 'yyy')"""
- sql """update ${table_name} set vs = '{"updated_value" : 123}' where k =
6"""
- sql """update ${table_name} set v = '{"updated_value" : 1111}' where k =
7"""
- qt_sql "select * from ${table_name} order by k"
+ sql """insert into var_delete_update_mow values (6,
'{"a":4,"b":[4],"c":4.1}', 'xxx')"""
+ sql """insert into var_delete_update_mow values (7,
'{"a":4,"b":[4],"c":4.1}', 'yyy')"""
+ sql """update var_delete_update_mow set vs = '{"updated_value" : 123}'
where k = 6"""
+ sql """update var_delete_update_mow set v = '{"updated_value":1111}' where
k = 7"""
+ qt_sql "select * from var_delete_update_mow order by k"
sql """delete from ${table_name} where v = 'xxx' or vs = 'yyy'"""
sql """delete from ${table_name} where vs = 'xxx' or vs = 'yyy'"""
qt_sql "select * from ${table_name} order by k"
// delete & insert concurrently
-
+    sql "set enable_unique_key_partial_update=true;"
+    sql "sync"
     t1 = Thread.startDaemon {
         for (int k = 1; k <= 60; k++) {
-            int x = k % 10;
-            sql """insert into ${table_name} values(${x}, '${x}', '{"k${x}" : ${x}}')"""
+            int x = new Random().nextInt(61) % 10;
+            sql """insert into ${table_name}(k,vs) values(${x}, '{"k${x}" : ${x}}'),(${x+1}, '{"k${x+1}" : ${x+1}}'),(${x+2}, '{"k${x+2}" : ${x+2}}'),(${x+3}, '{"k${x+3}" : ${x+3}}')"""
         }
     }
     t2 = Thread.startDaemon {
         for (int k = 1; k <= 60; k++) {
-            int x = k % 10;
-            sql """delete from ${table_name} where k = ${x} """
+            int x = new Random().nextInt(61) % 10;
+            sql """insert into ${table_name}(k,v) values(${x}, '{"k${x}" : ${x}}'),(${x+1}, '{"k${x+1}" : ${x+1}}'),(${x+2}, '{"k${x+2}" : ${x+2}}'),(${x+3}, '{"k${x+3}" : ${x+3}}')"""
+        }
+    }
+    t3 = Thread.startDaemon {
+        for (int k = 1; k <= 60; k++) {
+            int x = new Random().nextInt(61) % 10;
+            sql """insert into ${table_name}(k,v) values(${x}, '{"k${x}" : ${x}}'),(${x+1}, '{"k${x+1}" : ${x+1}}'),(${x+2}, '{"k${x+2}" : ${x+2}}'),(${x+3}, '{"k${x+3}" : ${x+3}}')"""
+        }
+    }
+    t1.join()
+    t2.join()
+    t3.join()
+    sql "sync"
+
+ sql "set enable_unique_key_partial_update=false;"
+ // case 1: concurrent partial update
+ def tableName = "test_primary_key_partial_update_parallel"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `id` int(11) NOT NULL COMMENT "用户 ID",
+ `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+ `score` int(11) NOT NULL COMMENT "用户得分",
+ `test` int(11) NULL COMMENT "null test",
+ `dft` int(11) DEFAULT "4321",
+ `var` variant NULL)
+ UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES("replication_num" = "1",
"enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" =
"true", "store_row_column" = "true")
+ """
+
+ sql """insert into ${tableName} values
+ (2, "doris2", 2000, 223, 2, '{"id":2,
"name":"doris2","score":2000,"test":223,"dft":2}'),
+ (1, "doris", 1000, 123, 1, '{"id":1,
"name":"doris","score":1000,"test":123,"dft":1}'),
+ (5, "doris5", 5000, 523, 5, '{"id":5,
"name":"doris5","score":5000,"test":523,"dft":5}'),
+ (4, "doris4", 4000, 423, 4, '{"id":4,
"name":"doris4","score":4000,"test":423,"dft":4}'),
+ (3, "doris3", 3000, 323, 3, '{"id":3,
"name":"doris3","score":3000,"test":323,"dft":3}');"""
+
+ t1 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,name'
+
+ file 'partial_update_parallel1.csv'
+ time 10000 // limit inflight 10s
}
}
+
+ t2 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,score,test'
+
+ file 'partial_update_parallel2.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t3 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,dft,var'
+
+ file 'partial_update_parallel3.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
t1.join()
t2.join()
+ t3.join()
+
+ sql "sync"
+
+ if (!isCloudMode()) {
+ qt_sql """ select * from ${tableName} order by id;"""
+ }
}
\ No newline at end of file
diff --git a/regression-test/suites/variant_p0/test_compaction_extract_root.groovy b/regression-test/suites/variant_p0/test_compaction_extract_root.groovy
index 69c330fa98a..43f1048f151 100644
--- a/regression-test/suites/variant_p0/test_compaction_extract_root.groovy
+++ b/regression-test/suites/variant_p0/test_compaction_extract_root.groovy
@@ -85,8 +85,10 @@ suite("test_compaction_extract_root", "nonConcurrent") {
         union all select 5, '{"a": 1123}' as json_str union all select 5, '{"a": 11245, "b" : 42005}' as json_str from numbers("number" = "4096") limit 4096 ;"""
     // fix cast to string to be {}
-    qt_select_b_1 """ SELECT count(cast(v['b'] as string)) FROM ${tableName};"""
-    qt_select_b_2 """ SELECT count(cast(v['b'] as int)) FROM ${tableName};"""
+    qt_select_b_1 """ SELECT count(cast(v['b'] as string)) FROM test_t"""
+    qt_select_b_2 """ SELECT count(cast(v['b'] as int)) FROM test_t"""
+    // TODO: sparse columns with v['b'] will not be merged in hierarchical_data_reader with sparse columns
+    // qt_select_b_2 """ select v['b'] from test_t where cast(v['b'] as string) != '42005' and cast(v['b'] as string) != '42004' and cast(v['b'] as string) != '42003' order by cast(v['b'] as string); """
     qt_select_1_bfcompact """select v['b'] from test_t where k = 0 and cast(v['a'] as int) = 11245;"""
@@ -140,8 +142,10 @@ suite("test_compaction_extract_root", "nonConcurrent") {
     }
     assert (rowCount <= 8)
     // fix cast to string to be {}
-    qt_select_b_3 """ SELECT count(cast(v['b'] as string)) FROM ${tableName};"""
-    qt_select_b_4 """ SELECT count(cast(v['b'] as int)) FROM ${tableName};"""
+    qt_select_b_3 """ SELECT count(cast(v['b'] as string)) FROM test_t"""
+    qt_select_b_4 """ SELECT count(cast(v['b'] as int)) FROM test_t"""
+    // TODO: sparse columns with v['b'] will not be merged in hierarchical_data_reader with sparse columns
+    // qt_select_b_5 """ select v['b'] from test_t where cast(v['b'] as string) != '42005' and cast(v['b'] as string) != '42004' and cast(v['b'] as string) != '42003' order by cast(v['b'] as string); """
     qt_select_1 """select v['b'] from test_t where k = 0 and cast(v['a'] as int) = 11245;"""
     set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "1")
diff --git a/regression-test/suites/variant_p0/variant_with_rowstore.groovy b/regression-test/suites/variant_p0/variant_with_rowstore.groovy
index 58c245ee831..771f776b3e7 100644
--- a/regression-test/suites/variant_p0/variant_with_rowstore.groovy
+++ b/regression-test/suites/variant_p0/variant_with_rowstore.groovy
@@ -29,7 +29,6 @@ suite("regression_test_variant_rowstore", "variant_type"){
def table_name = "var_rowstore"
sql "DROP TABLE IF EXISTS ${table_name}"
- set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95")
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
@@ -63,4 +62,50 @@ suite("regression_test_variant_rowstore", "variant_type"){
"""
sql """insert into ${table_name} select k, cast(v as string), cast(v as
string) from var_rowstore"""
qt_sql "select * from ${table_name} order by k limit 10"
+
+    // Parse url
+    def user = context.config.jdbcUser
+    def password = context.config.jdbcPassword
+    def realDb = "regression_test_variant_p0"
+    String jdbcUrl = context.config.jdbcUrl
+    String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
+    def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
+    def sql_port
+    if (urlWithoutSchema.indexOf("/") >= 0) {
+        // e.g: jdbc:mysql://localhost:8080/?a=b
+        sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/"))
+    } else {
+        // e.g: jdbc:mysql://localhost:8080
+        sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1)
+    }
+    // set server side prepared statement url
+    def prepare_url = "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + realDb + "?&useServerPrepStmts=true"
+ table_name = "var_rs_pq"
+ sql "DROP TABLE IF EXISTS ${table_name}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ k bigint,
+ v variant,
+ v1 variant
+ )
+ UNIQUE KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS 1
+ properties("replication_num" = "1", "disable_auto_compaction" =
"false", "store_row_column" = "true", "enable_unique_key_merge_on_write" =
"true");
+ """
+ sql """insert into ${table_name} select k, cast(v as string), cast(v as
string) from var_rowstore"""
+ def result1 = connect(user=user, password=password, url=prepare_url) {
+ def stmt = prepareStatement "select * from var_rs_pq where k = ?"
+ assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+ stmt.setInt(1, -3)
+ qe_point_select stmt
+ stmt.setInt(1, -2)
+ qe_point_select stmt
+ stmt.setInt(1, -1)
+ qe_point_select stmt
+
+ // def stmt1 = prepareStatement "select var['a'] from var_rs_pq where
k = ?"
+ // assertEquals(stmt1.class,
com.mysql.cj.jdbc.ServerPreparedStatement);
+ // stmt.setInt(1, -3)
+ // qe_point_select stmt
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]