This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 826b113fd719369955079c96f968a3be4d0b9dab Author: Gabor Kaszab <[email protected]> AuthorDate: Tue Mar 21 09:51:30 2023 +0100 IMPALA-11954: Fix for URL encoded partition columns for Iceberg tables There is a bug when an Iceberg table has a string partition column and Impala inserts special chars into this column that need to be URL encoded. In this case the partition name is URL encoded so as not to confuse the file paths for that partition. E.g. the 'b=1/2' value is converted to 'b=1%2F2'. This is fine for path creation; however, for Iceberg tables the same URL encoded partition name is saved into the catalog as the partition name also used for Iceberg column stats. This leads to incorrect results when querying the table, as the URL encoded values are returned in a SELECT * query instead of what the user inserted. Additionally, when adding a filter to the query, Iceberg will filter out all the rows because it compares the non-encoded values to the URL encoded values. Testing: - Added new tests to iceberg-partitioned-insert.test to cover this scenario. - Re-ran the existing test suite. 
Change-Id: I67edc3d04738306fed0d4ebc5312f3d8d4f14254 Reviewed-on: http://gerrit.cloudera.org:8080/19654 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/hdfs-table-sink.cc | 80 +++++++++------ be/src/exec/hdfs-table-sink.h | 14 +++ be/src/exec/output-partition.h | 7 +- be/src/runtime/dml-exec-state.cc | 16 ++- common/fbs/IcebergObjects.fbs | 1 + .../impala/service/IcebergCatalogOpExecutor.java | 4 +- .../java/org/apache/impala/util/IcebergUtil.java | 23 +++-- .../QueryTest/iceberg-partitioned-insert.test | 111 +++++++++++++++++++++ 8 files changed, 204 insertions(+), 52 deletions(-) diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index 37e6dcc25..355cb7479 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -508,30 +508,35 @@ string HdfsTableSink::GetPartitionName(int i) { } } -Status HdfsTableSink::InitOutputPartition(RuntimeState* state, - const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row, - OutputPartition* output_partition, bool empty_partition) { - // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/" - // etc. 
- stringstream partition_name_ss; +void HdfsTableSink::ConstructPartitionNames( + const TupleRow* row, + string* url_encoded_partition_name, + vector<string>* raw_partition_names, + string* external_partition_name) { + DCHECK(url_encoded_partition_name != nullptr); + DCHECK(external_partition_name != nullptr); + DCHECK(raw_partition_names != nullptr); + DCHECK(raw_partition_names->empty()); + + stringstream url_encoded_partition_name_ss; stringstream external_partition_name_ss; - for (int j = 0; j < partition_key_expr_evals_.size(); ++j) { - bool is_external_part = HasExternalOutputDir() && - j >= external_output_partition_depth_; - if (is_external_part) { - external_partition_name_ss << GetPartitionName(j) << "="; - } - partition_name_ss << GetPartitionName(j) << "="; - void* value = partition_key_expr_evals_[j]->GetValue(row); - // nullptr partition keys get a special value to be compatible with Hive. + for (int i = 0; i < partition_key_expr_evals_.size(); ++i) { + stringstream raw_partition_key_value_ss; + stringstream encoded_partition_key_value_ss; + + raw_partition_key_value_ss << GetPartitionName(i) << "="; + encoded_partition_key_value_ss << GetPartitionName(i) << "="; + + void* value = partition_key_expr_evals_[i]->GetValue(row); if (value == nullptr) { - partition_name_ss << table_desc_->null_partition_key_value(); - if (is_external_part) { - external_partition_name_ss << table_desc_->null_partition_key_value(); - } + raw_partition_key_value_ss << table_desc_->null_partition_key_value(); + encoded_partition_key_value_ss << table_desc_->null_partition_key_value(); } else { string value_str; - partition_key_expr_evals_[j]->PrintValue(value, &value_str); + partition_key_expr_evals_[i]->PrintValue(value, &value_str); + + raw_partition_key_value_ss << value_str; + // Directory names containing partition-key values need to be UrlEncoded, in // particular to avoid problems when '/' is part of the key value (which might // occur, for example, with date strings). 
Hive will URL decode the value @@ -545,23 +550,32 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state, string part_key_value = (encoded_str.empty() ? table_desc_->null_partition_key_value() : encoded_str); // If the string is empty, map it to nullptr (mimicking Hive's behaviour) - partition_name_ss << part_key_value; - if (is_external_part) { - external_partition_name_ss << part_key_value; - } + encoded_partition_key_value_ss << part_key_value; } - if (j < partition_key_expr_evals_.size() - 1) { - partition_name_ss << "/"; - if (is_external_part) { - external_partition_name_ss << "/"; - } + if (i < partition_key_expr_evals_.size() - 1) encoded_partition_key_value_ss << "/"; + + url_encoded_partition_name_ss << encoded_partition_key_value_ss.str(); + if (HasExternalOutputDir() && i >= external_output_partition_depth_) { + external_partition_name_ss << encoded_partition_key_value_ss.str(); } + + raw_partition_names->push_back(raw_partition_key_value_ss.str()); } - // partition_name_ss now holds the unique descriptor for this partition, - output_partition->partition_name = partition_name_ss.str(); - BuildHdfsFileNames(partition_descriptor, output_partition, - external_partition_name_ss.str()); + *url_encoded_partition_name = url_encoded_partition_name_ss.str(); + *external_partition_name = external_partition_name_ss.str(); +} + +Status HdfsTableSink::InitOutputPartition(RuntimeState* state, + const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row, + OutputPartition* output_partition, bool empty_partition) { + // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/" + // etc. 
+ string external_partition_name; + ConstructPartitionNames(row, &output_partition->partition_name, + &output_partition->raw_partition_names, &external_partition_name); + + BuildHdfsFileNames(partition_descriptor, output_partition, external_partition_name); if (ShouldSkipStaging(state, output_partition)) { // We will be writing to the final file if we're skipping staging, so get a connection diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h index 13d44866e..5aa637949 100644 --- a/be/src/exec/hdfs-table-sink.h +++ b/be/src/exec/hdfs-table-sink.h @@ -140,6 +140,20 @@ class HdfsTableSink : public DataSink { const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row, OutputPartition* output_partition, bool empty_partition) WARN_UNUSED_RESULT; + /// Constructs the partition name using 'partition_key_expr_evals_'. + /// 'url_encoded_partition_name' is the full partition name in URL encoded form. E.g.: + /// it's "a=12%2F31%2F11/b=10" if we have 2 partition columns "a" and "b", and "a" has + /// the value of "12/31/11" and "b" has the value of 10. Since this is URL encoded, + /// can be used for paths. + /// 'raw_partition_name' is a vector of partition key-values in a non-encoded format. + /// Staying with the above example this would hold ["a=12/31/11", "b=10"]. + /// 'external_partition_name' is a subset of 'url_encoded_partition_name'. + void ConstructPartitionNames( + const TupleRow* row, + string* url_encoded_partition_name, + std::vector<std::string>* raw_partition_names, + string* external_partition_name); + /// Add a temporary file to an output partition. Files are created in a /// temporary directory and then moved to the real partition directory by the /// coordinator in a finalization step. 
The temporary file's current location diff --git a/be/src/exec/output-partition.h b/be/src/exec/output-partition.h index 821b6a893..38f66645b 100644 --- a/be/src/exec/output-partition.h +++ b/be/src/exec/output-partition.h @@ -63,9 +63,14 @@ struct OutputPartition { /// Path: tmp_hdfs_dir_name/partition_name/<unique_id_str> std::string tmp_hdfs_file_name_prefix; - /// key1=val1/key2=val2/ etc. Used to identify partitions to the metastore. + /// key1=val1/key2=val2/ etc. Used to identify partitions to the metastore. Note, the + /// value in this member is URL encoded for the sake of e.g. data file name creation. std::string partition_name; + /// This is a split of the 'partition_name' variable by '/'. Note, the partition keys + /// and values in this variable are not URL encoded. + std::vector<std::string> raw_partition_names; + /// Connection to hdfs. hdfsFS hdfs_connection = nullptr; diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc index 94c0ba5af..3f8c32439 100644 --- a/be/src/runtime/dml-exec-state.cc +++ b/be/src/runtime/dml-exec-state.cc @@ -500,7 +500,7 @@ createIcebergColumnStats( } string createIcebergDataFileString( - const string& partition_name, const string& final_path, int64_t num_rows, + const OutputPartition& partition, const string& final_path, int64_t num_rows, int64_t file_size, const IcebergFileStats& insert_stats) { using namespace org::apache::impala::fb; flatbuffers::FlatBufferBuilder fbb; @@ -510,13 +510,19 @@ string createIcebergDataFileString( ice_col_stats_vec.push_back(createIcebergColumnStats(fbb, it->first, it->second)); } + vector<flatbuffers::Offset<flatbuffers::String>> raw_partition_fields; + for (const string& partition_name : partition.raw_partition_names) { + raw_partition_fields.push_back(fbb.CreateString(partition_name)); + } + flatbuffers::Offset<FbIcebergDataFile> data_file = CreateFbIcebergDataFile(fbb, fbb.CreateString(final_path), // Currently we can only write Parquet to Iceberg 
FbIcebergDataFileFormat::FbIcebergDataFileFormat_PARQUET, num_rows, file_size, - fbb.CreateString(partition_name), + fbb.CreateString(partition.partition_name), + fbb.CreateVector(raw_partition_fields), fbb.CreateVector(ice_col_stats_vec)); fbb.Finish(data_file); return string(reinterpret_cast<char*>(fbb.GetBufferPointer()), fbb.GetSize()); @@ -527,8 +533,8 @@ string createIcebergDataFileString( void DmlExecState::AddCreatedFile(const OutputPartition& partition, bool is_iceberg, const IcebergFileStats& insert_stats) { lock_guard<mutex> l(lock_); - const string& partition_name = partition.partition_name; - PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name); + PartitionStatusMap::iterator entry = + per_partition_status_.find(partition.partition_name); DCHECK(entry != per_partition_status_.end()); DmlFileStatusPb* file = entry->second.add_created_files(); if (partition.current_file_final_name.empty()) { @@ -541,7 +547,7 @@ void DmlExecState::AddCreatedFile(const OutputPartition& partition, bool is_iceb file->set_size(partition.current_file_bytes); if (is_iceberg) { file->set_iceberg_data_file_fb( - createIcebergDataFileString(partition_name, file->final_path(), file->num_rows(), + createIcebergDataFileString(partition, file->final_path(), file->num_rows(), file->size(), insert_stats)); } } diff --git a/common/fbs/IcebergObjects.fbs b/common/fbs/IcebergObjects.fbs index db7fac7d1..bec9547e5 100644 --- a/common/fbs/IcebergObjects.fbs +++ b/common/fbs/IcebergObjects.fbs @@ -65,6 +65,7 @@ table FbIcebergDataFile { record_count: long = 0; file_size_in_bytes: long = 0; partition_path: string; + raw_partition_fields: [string]; per_column_stats: [FbIcebergColumnStats]; } diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java index a49a5b3bf..03566774c 100644 --- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java +++ 
b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java @@ -351,9 +351,9 @@ public class IcebergCatalogOpExecutor { .withFormat(IcebergUtil.fbFileFormatToIcebergFileFormat(dataFile.format())) .withRecordCount(dataFile.recordCount()) .withFileSizeInBytes(dataFile.fileSizeInBytes()); - IcebergUtil.PartitionData partitionData = IcebergUtil.partitionDataFromPath( + IcebergUtil.PartitionData partitionData = IcebergUtil.partitionDataFromDataFile( partSpec.partitionType(), - feIcebergTable.getDefaultPartitionSpec(), dataFile.partitionPath()); + feIcebergTable.getDefaultPartitionSpec(), dataFile); if (partitionData != null) builder.withPartition(partitionData); batchWrite.addFile(builder.build()); } diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java index c0dc4c3e6..fb5bdf2cd 100644 --- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java +++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java @@ -85,6 +85,7 @@ import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.Pair; import org.apache.impala.fb.FbFileMetadata; +import org.apache.impala.fb.FbIcebergDataFile; import org.apache.impala.fb.FbIcebergDataFileFormat; import org.apache.impala.fb.FbIcebergMetadata; import org.apache.impala.fb.FbIcebergPartitionTransformValue; @@ -690,28 +691,28 @@ public class IcebergUtil { } /** - * Create a PartitionData object from a partition path and its descriptors. + * Create a PartitionData object using partition information from FbIcebergDataFile. 
*/ - public static PartitionData partitionDataFromPath(Types.StructType partitionType, - IcebergPartitionSpec spec, String path) throws ImpalaRuntimeException { - if (path == null || path.isEmpty()) return null; + public static PartitionData partitionDataFromDataFile(Types.StructType partitionType, + IcebergPartitionSpec spec, FbIcebergDataFile dataFile) + throws ImpalaRuntimeException { + if (dataFile == null || dataFile.rawPartitionFieldsLength() == 0) return null; PartitionData data = new PartitionData(spec.getIcebergPartitionFieldsSize()); - String[] partitions = path.split("/", -1); int path_i = 0; for (int i = 0; i < spec.getIcebergPartitionFieldsSize(); ++i) { IcebergPartitionField field = spec.getIcebergPartitionFields().get(i); - if (field.getTransformType() == TIcebergPartitionTransformType.VOID) { - continue; - } - String[] parts = partitions[path_i].split("=", 2); + if (field.getTransformType() == TIcebergPartitionTransformType.VOID) continue; + + Preconditions.checkState(path_i < dataFile.rawPartitionFieldsLength()); + String[] parts = dataFile.rawPartitionFields(path_i).split("=", 2); Preconditions.checkArgument(parts.length == 2 && parts[0] != null && field.getFieldName().equals(parts[0]), "Invalid partition: %s", - partitions[path_i]); + dataFile.rawPartitionFields(path_i)); TIcebergPartitionTransformType transformType = field.getTransformType(); data.set(i, getPartitionValue( partitionType.fields().get(i).type(), transformType, parts[1])); - path_i += 1; + ++path_i; } return data; } diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test index 6f6abc917..faeef6e4f 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test @@ -666,3 +666,114 @@ select count(*) from store_sales where 
ss_sold_date_sk is null; ---- TYPES BIGINT ==== +---- QUERY +# Insert into a string partition column some chars that have to be URL encoded for the path creation. +# Check that result strings are not URL encoded. +create table special_char_partitions (i int, s string, s2 string) +partitioned by spec (i, s, truncate(4, s)) +stored as iceberg; +insert into special_char_partitions + values (1, '11/14/31', '44/1'), (2, '11"14"31', '43"3'), (3, '11=14=31', '65=2'), (4, '', 'a'), (5, cast(null as string), 'b'); +select * from special_char_partitions; +---- RESULTS +1,'11/14/31','44/1' +2,'11"14"31','43"3' +3,'11=14=31','65=2' +4,'','a' +5,'NULL','b' +---- TYPES +INT,STRING,STRING +==== +---- QUERY +# Check that filtering using special chars work as expected. +select * from special_char_partitions where s = '11/14/31'; +---- RESULTS +1,'11/14/31','44/1' +---- TYPES +INT,STRING,STRING +---- RUNTIME_PROFILE +aggregation(SUM, RowsRead): 1 +aggregation(SUM, NumRowGroups): 1 +==== +---- QUERY +select * from special_char_partitions where s = ''; +---- RESULTS +4,'','a' +---- TYPES +INT,STRING,STRING +==== +---- QUERY +select * from special_char_partitions where s is NULL; +---- RESULTS +5,'NULL','b' +---- TYPES +INT,STRING,STRING +==== +---- QUERY +# Check that the file path contains URL encoded strings. 
+show files in special_char_partitions; +---- LABELS +Path,Size,Partition,EC Policy +---- RESULTS +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=1/s=11%2F14%2F31/s_trunc=11%2F1/.*parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=2/s=11%2214%2231/s_trunc=11%221/.*parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=3/s=11%3D14%3D31/s_trunc=11%3D1/.*parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=4/s=__HIVE_DEFAULT_PARTITION__/s_trunc=__HIVE_DEFAULT_PARTITION__/.*parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=5/s=__HIVE_DEFAULT_PARTITION__/s_trunc=__HIVE_DEFAULT_PARTITION__/.*parq','.*','','$ERASURECODE_POLICY' +---- TYPES +STRING, STRING, STRING, STRING +==== +---- QUERY +# Check that values in SHOW PARTITIONS are nor URL encoded (but simply escaped). +show partitions special_char_partitions; +---- RESULTS +'{"i":"1","s":"11\\/14\\/31","s_trunc":"11\\/1"}',1,1 +'{"i":"2","s":"11\\"14\\"31","s_trunc":"11\\"1"}',1,1 +'{"i":"3","s":"11=14=31","s_trunc":"11=1"}',1,1 +'{"i":"4","s":"","s_trunc":""}',1,1 +'{"i":"5","s":null,"s_trunc":null}',1,1 +---- TYPES +STRING,BIGINT,BIGINT +==== +---- QUERY +# Check special chars in a string partition column after partition evolution. +alter table special_char_partitions set partition spec (s2); +insert into special_char_partitions values (6, '11/22/33', '98/22'); +select * from special_char_partitions; +---- RESULTS +1,'11/14/31','44/1' +2,'11"14"31','43"3' +3,'11=14=31','65=2' +4,'','a' +5,'NULL','b' +6,'11/22/33','98/22' +---- TYPES +INT,STRING,STRING +==== +---- QUERY +# Check that the new partition column's path contains URL encoded strings. 
+show files in special_char_partitions; +---- LABELS +Path,Size,Partition,EC Policy +---- RESULTS +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=1/s=11%2F14%2F31/s_trunc=11%2F1/.*parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=2/s=11%2214%2231/s_trunc=11%221/.*parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=3/s=11%3D14%3D31/s_trunc=11%3D1/.*parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=4/s=__HIVE_DEFAULT_PARTITION__/s_trunc=__HIVE_DEFAULT_PARTITION__/.*parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=5/s=__HIVE_DEFAULT_PARTITION__/s_trunc=__HIVE_DEFAULT_PARTITION__/.*parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/s2=98%2F22/.*parq','.*','','$ERASURECODE_POLICY' +---- TYPES +STRING, STRING, STRING, STRING +==== +---- QUERY +show partitions special_char_partitions; +---- RESULTS +'{"i":"1","s":"11\\/14\\/31","s_trunc":"11\\/1"}',1,1 +'{"i":"2","s":"11\\"14\\"31","s_trunc":"11\\"1"}',1,1 +'{"i":"3","s":"11=14=31","s_trunc":"11=1"}',1,1 +'{"i":"4","s":"","s_trunc":""}',1,1 +'{"i":"5","s":null,"s_trunc":null}',1,1 +'{"s2":"98\\/22"}',1,1 +---- TYPES +STRING,BIGINT,BIGINT +====
