This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6eb8ac0ebf [feature-wip][multi-catalog]Support caseSensitive field
name in file scan node (#11310)
6eb8ac0ebf is described below
commit 6eb8ac0ebf08467f9e2ea56cddd85528a0d2a124
Author: huangzhaowei <[email protected]>
AuthorDate: Fri Aug 5 08:03:16 2022 +0800
[feature-wip][multi-catalog]Support caseSensitive field name in file scan
node (#11310)
* Impl case sensitive in file scan node
---
be/src/exec/arrow/arrow_reader.cpp | 20 ++++++++++++++++++--
be/src/exec/arrow/arrow_reader.h | 5 ++++-
be/src/exec/arrow/orc_reader.cpp | 10 +++++++---
be/src/exec/arrow/orc_reader.h | 2 +-
be/src/exec/arrow/parquet_reader.cpp | 11 +++++++----
be/src/exec/arrow/parquet_reader.h | 2 +-
be/src/vec/exec/file_arrow_scanner.cpp | 10 +++++++---
.../org/apache/doris/common/util/BrokerUtil.java | 11 ++++++++++-
.../java/org/apache/doris/planner/HudiScanNode.java | 2 +-
.../doris/planner/external/ExternalFileScanNode.java | 2 +-
10 files changed, 57 insertions(+), 18 deletions(-)
diff --git a/be/src/exec/arrow/arrow_reader.cpp
b/be/src/exec/arrow/arrow_reader.cpp
index 5d1785f744..a2e5f7c33e 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -30,6 +30,7 @@
#include "runtime/exec_env.h"
#include "runtime/mem_pool.h"
#include "runtime/tuple.h"
+#include "util/string_util.h"
#include "util/thrift_util.h"
namespace doris {
@@ -37,8 +38,10 @@ namespace doris {
// Broker
ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file)
- : _batch_size(batch_size),
_num_of_columns_from_file(num_of_columns_from_file) {
+ int32_t num_of_columns_from_file, bool
caseSensitive)
+ : _batch_size(batch_size),
+ _num_of_columns_from_file(num_of_columns_from_file),
+ _caseSensitive(caseSensitive) {
_arrow_file = std::shared_ptr<ArrowFile>(new ArrowFile(file_reader));
_rb_reader = nullptr;
_total_groups = 0;
@@ -81,6 +84,19 @@ Status ArrowReaderWrap::column_indices(const
std::vector<SlotDescriptor*>& tuple
return Status::OK();
}
+int ArrowReaderWrap::get_cloumn_index(std::string column_name) {
+ std::string real_column_name = _caseSensitive ? column_name :
to_lower(column_name);
+ auto iter = _map_column.find(real_column_name);
+ if (iter != _map_column.end()) {
+ return iter->second;
+ } else {
+ std::stringstream str_error;
+ str_error << "Invalid Column Name:" << real_column_name;
+ LOG(WARNING) << str_error.str();
+ return -1;
+ }
+}
+
Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch,
bool* eof) {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.empty()) {
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 704ca0750e..159377c480 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -79,7 +79,8 @@ private:
// base of arrow reader
class ArrowReaderWrap {
public:
- ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t
num_of_columns_from_file);
+ ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t
num_of_columns_from_file,
+ bool caseSensitive);
virtual ~ArrowReaderWrap();
virtual Status init_reader(const TupleDescriptor* tuple_desc,
@@ -96,6 +97,7 @@ public:
std::shared_ptr<Statistics>& statistics() { return _statistics; }
void close();
virtual Status size(int64_t* size) { return Status::NotSupported("Not
Implemented size"); }
+ int get_cloumn_index(std::string column_name);
void prefetch_batch();
@@ -124,6 +126,7 @@ protected:
std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
std::thread _thread;
+ bool _caseSensitive;
};
} // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index 0db5640369..0156355b39 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -24,13 +24,14 @@
#include "io/file_reader.h"
#include "runtime/mem_pool.h"
#include "runtime/tuple.h"
+#include "util/string_util.h"
namespace doris {
ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file, int64_t
range_start_offset,
- int64_t range_size)
- : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
+ int64_t range_size, bool caseSensitive)
+ : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file,
caseSensitive),
_range_start_offset(range_start_offset),
_range_size(range_size) {
_reader = nullptr;
@@ -66,8 +67,11 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor*
tuple_desc,
}
std::shared_ptr<arrow::Schema> schema = maybe_schema.ValueOrDie();
for (size_t i = 0; i < schema->num_fields(); ++i) {
+ std::string schemaName =
+ _caseSensitive ? schema->field(i)->name() :
to_lower(schema->field(i)->name());
// orc index started from 1.
- _map_column.emplace(schema->field(i)->name(), i + 1);
+
+ _map_column.emplace(schemaName, i + 1);
}
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
index 1e6f0f83e6..392addfea9 100644
--- a/be/src/exec/arrow/orc_reader.h
+++ b/be/src/exec/arrow/orc_reader.h
@@ -33,7 +33,7 @@ namespace doris {
class ORCReaderWrap final : public ArrowReaderWrap {
public:
ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t
num_of_columns_from_file,
- int64_t range_start_offset, int64_t range_size);
+ int64_t range_start_offset, int64_t range_size, bool
caseSensitive = true);
~ORCReaderWrap() override = default;
Status init_reader(const TupleDescriptor* tuple_desc,
diff --git a/be/src/exec/arrow/parquet_reader.cpp
b/be/src/exec/arrow/parquet_reader.cpp
index 8d119146b4..03f6657586 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -32,14 +32,15 @@
#include "runtime/mem_pool.h"
#include "runtime/string_value.h"
#include "runtime/tuple.h"
+#include "util/string_util.h"
namespace doris {
// Broker
ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t
batch_size,
int32_t num_of_columns_from_file, int64_t
range_start_offset,
- int64_t range_size)
- : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
+ int64_t range_size, bool caseSensitive)
+ : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file,
caseSensitive),
_rows_of_group(0),
_current_line_of_group(0),
_current_line_of_batch(0),
@@ -84,12 +85,14 @@ Status ParquetReaderWrap::init_reader(const
TupleDescriptor* tuple_desc,
// map
auto* schemaDescriptor = _file_metadata->schema();
for (int i = 0; i < _file_metadata->num_columns(); ++i) {
+ std::string schemaName;
// Get the Column Reader for the boolean column
if (schemaDescriptor->Column(i)->max_definition_level() > 1) {
-
_map_column.emplace(schemaDescriptor->Column(i)->path()->ToDotVector()[0], i);
+ schemaName =
schemaDescriptor->Column(i)->path()->ToDotVector()[0];
} else {
- _map_column.emplace(schemaDescriptor->Column(i)->name(), i);
+ schemaName = schemaDescriptor->Column(i)->name();
}
+ _map_column.emplace(_caseSensitive ? schemaName :
to_lower(schemaName), i);
}
_timezone = timezone;
diff --git a/be/src/exec/arrow/parquet_reader.h
b/be/src/exec/arrow/parquet_reader.h
index 95774f60b0..d4805f8d84 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -63,7 +63,7 @@ class ParquetReaderWrap final : public ArrowReaderWrap {
public:
// batch_size is not use here
ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t
num_of_columns_from_file,
- int64_t range_start_offset, int64_t range_size);
+ int64_t range_start_offset, int64_t range_size, bool
caseSensitive = true);
~ParquetReaderWrap() override = default;
// Read
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp
b/be/src/vec/exec/file_arrow_scanner.cpp
index e6c4fa7597..d416fb735c 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -186,7 +186,11 @@ Status FileArrowScanner::_append_batch_to_block(Block*
block) {
if (slot_desc == nullptr) {
continue;
}
- auto* array = _batch->GetColumnByName(slot_desc->col_name()).get();
+ int file_index =
_cur_file_reader->get_cloumn_index(slot_desc->col_name());
+ if (file_index == -1) {
+ continue;
+ }
+ auto* array = _batch->column(file_index).get();
auto& column_with_type_and_name =
block->get_by_name(slot_desc->col_name());
RETURN_IF_ERROR(arrow_column_to_doris_column(
array, _arrow_batch_cur_idx, column_with_type_and_name.column,
@@ -228,7 +232,7 @@ ArrowReaderWrap*
VFileParquetScanner::_new_arrow_reader(FileReader* file_reader,
int64_t
range_start_offset,
int64_t range_size) {
return new ParquetReaderWrap(file_reader, batch_size,
num_of_columns_from_file,
- range_start_offset, range_size);
+ range_start_offset, range_size, false);
}
void VFileParquetScanner::_init_profiles(RuntimeProfile* profile) {
@@ -252,7 +256,7 @@ ArrowReaderWrap*
VFileORCScanner::_new_arrow_reader(FileReader* file_reader, int
int64_t range_start_offset,
int64_t range_size) {
return new ORCReaderWrap(file_reader, batch_size,
num_of_columns_from_file, range_start_offset,
- range_size);
+ range_size, false);
}
} // namespace doris::vectorized
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index be7fa7c63f..0db7a16df0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -222,6 +222,14 @@ public class BrokerUtil {
public static List<String> parseColumnsFromPath(String filePath,
List<String> columnsFromPath)
throws UserException {
+ return parseColumnsFromPath(filePath, columnsFromPath, true);
+ }
+
+ public static List<String> parseColumnsFromPath(
+ String filePath,
+ List<String> columnsFromPath,
+ boolean caseSensitive)
+ throws UserException {
if (columnsFromPath == null || columnsFromPath.isEmpty()) {
return Collections.emptyList();
}
@@ -246,7 +254,8 @@ public class BrokerUtil {
throw new UserException("Fail to parse columnsFromPath,
expected: "
+ columnsFromPath + ", filePath: " + filePath);
}
- int index = columnsFromPath.indexOf(pair[0]);
+ String parsedColumnName = caseSensitive ? pair[0] :
pair[0].toLowerCase();
+ int index = columnsFromPath.indexOf(parsedColumnName);
if (index == -1) {
continue;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
index dab3a9bfcd..93ab58afe1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
@@ -274,7 +274,7 @@ public class HudiScanNode extends BrokerScanNode {
TScanRangeLocations curLocations = newLocations(context.params,
brokerDesc);
List<String> partitionValuesFromPath =
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
- getPartitionKeys());
+ getPartitionKeys(), false);
int numberOfColumnsFromFile = context.slotDescByName.size() -
partitionValuesFromPath.size();
TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit,
fileFormatType,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 1ae05ff630..2984352066 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -309,7 +309,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
totalFileSize += split.getLength();
List<String> partitionValuesFromPath =
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
- partitionKeys);
+ partitionKeys, false);
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit,
partitionValuesFromPath);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]