This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 5b692a0b fix datatype mismatch between table schema and tablet. (#469)
5b692a0b is described below
commit 5b692a0b1c9b708138df8f774170fa5941403a16
Author: Colin Lee <[email protected]>
AuthorDate: Wed Apr 23 09:49:18 2025 +0800
fix datatype mismatch between table schema and tablet. (#469)
* fix datatype mismatch between table schema and tablet.
* fix memory leak.
---
cpp/src/writer/tsfile_writer.cc | 68 ++++++++++++--------
.../writer/table_view/tsfile_writer_table_test.cc | 72 +++++++++++++++++++---
cpp/test/writer/tsfile_writer_test.cc | 50 ++++++++++++++-
3 files changed, 151 insertions(+), 39 deletions(-)
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index f289f875..a570fc54 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -30,7 +30,6 @@
using namespace common;
namespace storage {
-
int libtsfile_init() {
static bool g_s_is_inited = false;
if (g_s_is_inited) {
@@ -671,8 +670,10 @@ int TsFileWriter::write_tablet_aligned(const Tablet &tablet) {
if (IS_NULL(value_chunk_writer)) {
continue;
}
- value_write_column(value_chunk_writer, tablet, c, 0,
- tablet.get_cur_row_size());
+ if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, 0,
+ tablet.get_cur_row_size()))) {
+ return ret;
+ }
}
return ret;
}
@@ -692,8 +693,9 @@ int TsFileWriter::write_tablet(const Tablet &tablet) {
if (IS_NULL(chunk_writer)) {
continue;
}
- // ignore writer failure
- write_column(chunk_writer, tablet, c);
+ if (RET_FAIL(write_column(chunk_writer, tablet, c))) {
+ return ret;
+ }
}
record_count_since_last_flush_ += tablet.max_row_num_;
@@ -739,8 +741,11 @@ int TsFileWriter::write_table(Tablet &tablet) {
if (IS_NULL(value_chunk_writer)) {
continue;
}
- value_write_column(value_chunk_writer, tablet, i, start_idx,
- end_idx);
+
+ if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
+ i, start_idx, end_idx))) {
+ return ret;
+ }
field_col_count++;
}
}
@@ -758,8 +763,10 @@ int TsFileWriter::write_table(Tablet &tablet) {
if (IS_NULL(chunk_writer)) {
continue;
}
- write_column(chunk_writer, tablet, c, start_idx,
- device_id_end_index_pair.second);
+ if (RET_FAIL(write_column(chunk_writer, tablet, c, start_idx,
+ device_id_end_index_pair.second))) {
+ return ret;
+ }
}
start_idx = device_id_end_index_pair.second;
}
@@ -877,32 +884,39 @@ int TsFileWriter::value_write_column(ValueChunkWriter *value_chunk_writer,
return ret;
}
-#define DO_WRITE_TYPED_COLUMN() \
- do { \
- int ret = E_OK; \
- for (uint32_t r = start_idx; r < end_idx; r++) { \
- if (LIKELY(!col_notnull_bitmap.test(r))) { \
- ret = chunk_writer->write(timestamps[r], col_values[r]); \
- } \
- } \
- return ret; \
- } while (false)
-
-#define DO_VALUE_WRITE_TYPED_COLUMN() \
+#define DO_WRITE_TYPED_COLUMN() \
do { \
int ret = E_OK; \
for (uint32_t r = start_idx; r < end_idx; r++) { \
- if (LIKELY(col_notnull_bitmap.test(r))) { \
- ret = value_chunk_writer->write(timestamps[r], col_values[r], \
- true); \
- } else { \
- ret = value_chunk_writer->write(timestamps[r], col_values[r], \
- false); \
+ if (LIKELY(!col_notnull_bitmap.test(r))) { \
+ if (RET_FAIL( \
+ chunk_writer->write(timestamps[r], col_values[r]))) { \
+ return ret; \
+ } \
} \
} \
return ret; \
} while (false)
+#define DO_VALUE_WRITE_TYPED_COLUMN() \
+ do { \
+ int ret = E_OK; \
+ for (uint32_t r = start_idx; r < end_idx; r++) { \
+ if (LIKELY(col_notnull_bitmap.test(r))) { \
+ if (RET_FAIL(value_chunk_writer->write( \
+ timestamps[r], col_values[r], true))) { \
+ return ret; \
+ } \
+ } else { \
+ if (RET_FAIL(value_chunk_writer->write( \
+ timestamps[r], col_values[r], false))) { \
+ return ret; \
+ } \
+ } \
+ } \
+ return ret; \
+ } while (false)
+
int TsFileWriter::write_typed_column(ChunkWriter *chunk_writer,
int64_t *timestamps, bool *col_values,
BitMap &col_notnull_bitmap,
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index 43574dc7..ac2f4040 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -45,9 +45,7 @@ class TsFileWriterTableTest : public ::testing::Test {
mode_t mode = 0666;
write_file_.create(file_name_, flags, mode);
}
- void TearDown() override {
- remove(file_name_.c_str());
- }
+ void TearDown() override { remove(file_name_.c_str()); }
std::string file_name_;
WriteFile write_file_;
@@ -93,9 +91,11 @@ class TsFileWriterTableTest : public ::testing::Test {
}
static storage::Tablet gen_tablet(TableSchema* table_schema, int offset,
- int device_num, int num_timestamp_per_device = 10) {
+ int device_num,
+ int num_timestamp_per_device = 10) {
storage::Tablet tablet(table_schema->get_measurement_names(),
- table_schema->get_data_types(), device_num * num_timestamp_per_device);
+ table_schema->get_data_types(),
+ device_num * num_timestamp_per_device);
char* literal = new char[std::strlen("device_id") + 1];
std::strcpy(literal, "device_id");
@@ -141,8 +141,8 @@ TEST_F(TsFileWriterTableTest, WriteTableTest) {
TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) {
auto table_schema = gen_table_schema(0);
- auto tsfile_table_writer_ =
- std::make_shared<TsFileTableWriter>(&write_file_, table_schema, 2 * 1024);
+ auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
+ &write_file_, table_schema, 2 * 1024);
for (int i = 0; i < 100; i++) {
auto tablet = gen_tablet(table_schema, i * 10000, 1, 10000);
ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK);
@@ -194,13 +194,65 @@ TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) {
TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) {
auto table_schema = gen_table_schema(0);
- auto tsfile_table_writer_ =
- std::make_shared<TsFileTableWriter>(&write_file_, table_schema, 256 * 1024 * 1024);
- ASSERT_EQ(common::g_config_value_.chunk_group_size_threshold_, 256 * 1024 * 1024);
+ auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
+ &write_file_, table_schema, 256 * 1024 * 1024);
+ ASSERT_EQ(common::g_config_value_.chunk_group_size_threshold_,
+ 256 * 1024 * 1024);
tsfile_table_writer_->close();
delete table_schema;
}
+TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) {
+ auto table_schema = gen_table_schema(0);
+ auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
+ &write_file_, table_schema, 256 * 1024 * 1024);
+ int device_num = 3;
+ int num_timestamp_per_device = 10;
+ int offset = 0;
+ auto datatypes = table_schema->get_data_types();
+
+ datatypes[6] = TSDataType::INT32;
+ storage::Tablet tablet(table_schema->get_measurement_names(), datatypes,
+ device_num * num_timestamp_per_device);
+
+ char* literal = new char[std::strlen("device_id") + 1];
+ std::strcpy(literal, "device_id");
+ String literal_str(literal, std::strlen("device_id"));
+ for (int i = 0; i < device_num; i++) {
+ for (int l = 0; l < num_timestamp_per_device; l++) {
+ int row_index = i * num_timestamp_per_device + l;
+ tablet.add_timestamp(row_index, offset + l);
+ auto column_schemas = table_schema->get_measurement_schemas();
+ for (int idx = 0; idx < column_schemas.size(); idx++) {
+ switch (datatypes[idx]) {
+ case TSDataType::INT64:
+ tablet.add_value(row_index,
+ column_schemas[idx]->measurement_name_,
+ static_cast<int64_t>(i));
+ break;
+ case TSDataType::INT32:
+ tablet.add_value(row_index,
+ column_schemas[idx]->measurement_name_,
+ static_cast<int32_t>(i));
+ break;
+ case TSDataType::STRING:
+ tablet.add_value(row_index,
+ column_schemas[idx]->measurement_name_,
+ literal_str);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ }
+ delete[] literal;
+ delete table_schema;
+
+ ASSERT_EQ(E_TYPE_NOT_MATCH, tsfile_table_writer_->write_table(tablet));
+ tsfile_table_writer_->close();
+}
+
TEST_F(TsFileWriterTableTest, WriteAndReadSimple) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc
index bd4a4793..6f58f0cb 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -110,7 +110,7 @@ class TsFileWriterTest : public ::testing::Test {
}
};
-class TsFileWriterTestSimple : public ::testing::Test{};
+class TsFileWriterTestSimple : public ::testing::Test {};
TEST_F(TsFileWriterTestSimple, InitWithNullWriteFile) {
TsFileWriter writer;
@@ -229,7 +229,6 @@ TEST_F(TsFileWriterTest, RegisterTimeSeries) {
E_OK);
ASSERT_EQ(tsfile_writer_->flush(), E_OK);
ASSERT_EQ(tsfile_writer_->close(), E_OK);
-
}
TEST_F(TsFileWriterTest, WriteMultipleRecords) {
@@ -911,4 +910,51 @@ TEST_F(TsFileWriterTest, WriteAlignedPartialData) {
cur_row++;
} while (true);
reader.destroy_query_data_set(qds);
+}
+
+TEST_F(TsFileWriterTest, WriteTabletDataTypeMismatch) {
+ for (int i = 0; i < 2; i++) {
+ std::string device_name = "test_device" + std::to_string(i);
+ for (int j = 0; j < 3; j++) {
+ std::string measure_name = "measurement" + std::to_string(j);
+ tsfile_writer_->register_timeseries(
+ device_name, storage::MeasurementSchema(
+ measure_name, common::TSDataType::INT32,
+ common::TSEncoding::PLAIN,
+ common::CompressionType::UNCOMPRESSED));
+ }
+ }
+
+ std::vector<TSDataType> measurement_types{
+ TSDataType::INT32, TSDataType::INT64, TSDataType::INT32};
+ std::vector<std::string> measurement_names{"measurement0", "measurement1",
+ "measurement2"};
+
+ Tablet tablet("test_device0", &measurement_names, &measurement_types);
+ for (int row = 0; row < 100; row++) {
+ tablet.add_timestamp(row, row);
+ for (int col = 0; col < 3; col++) {
+ switch (measurement_types[col]) {
+ case TSDataType::INT32:
+ tablet.add_value(row, col, static_cast<int32_t>(row));
+ break;
+ case TSDataType::INT64:
+ tablet.add_value(row, col, static_cast<int64_t>(row));
+ break;
+ default:;
+ }
+ }
+ }
+ ASSERT_EQ(E_TYPE_NOT_MATCH, tsfile_writer_->write_tablet(tablet));
+ std::vector<MeasurementSchema *> measurement_schemas;
+ for (int i = 0; i < 3; i++) {
+ measurement_schemas.push_back(new MeasurementSchema(
+ "measurement" + std::to_string(i), TSDataType::INT32));
+ }
+
+ tsfile_writer_->register_aligned_timeseries("device3", measurement_schemas);
+ tablet.set_table_name("device3");
+ ASSERT_EQ(E_TYPE_NOT_MATCH, tsfile_writer_->write_tablet_aligned(tablet));
+ ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+ ASSERT_EQ(tsfile_writer_->close(), E_OK);
}
\ No newline at end of file