This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 07a5716773d410b84be4f97ef5bede2f65b81546
Author: Joe McDonnell <joemcdonn...@cloudera.com>
AuthorDate: Sun Jun 22 17:05:22 2025 -0700

    IMPALA-13892: Add support for printing STRUCTs

    Tuple cache correctness verification is failing because the code in
    debug-util.cc used for printing the text version of tuples does not
    support printing structs. It hits a DCHECK and kills Impala. This
    adds support for printing structs to debug-util.cc, fixing tuple
    cache correctness verification for complex types.

    To print structs correctly, each slot needs to know its field name.
    The ColumnType has this information, but it requires a field idx to
    look up the name. This is the last index in the absolute path for
    this slot. However, the materialized path can be truncated to remove
    some indices at the end. Since we need that information to resolve
    the field name, this adds the struct field idx to the TSlotDescriptor
    to pass it to the backend.

    This also adds a counter to the profile to track when correctness
    verification is on. This is useful for testing.

    Testing:
     - Added a custom cluster test using nested types with correctness
       verification
     - Examined some of the text files

    Change-Id: Ib9479754c2766a9dd6483ba065e26a4d3a22e7e9
    Reviewed-on: http://gerrit.cloudera.org:8080/23075
    Reviewed-by: Michael Smith <michael.sm...@cloudera.com>
    Reviewed-by: Daniel Becker <daniel.bec...@cloudera.com>
    Tested-by: Joe McDonnell <joemcdonn...@cloudera.com>
---
 be/src/exec/tuple-cache-node.cc                     |  10 +-
 be/src/exec/tuple-cache-node.h                      |   2 +
 be/src/runtime/descriptors.cc                       |   3 +
 be/src/runtime/descriptors.h                        |   9 ++
 be/src/runtime/raw-value.cc                         |  12 +-
 be/src/runtime/row-batch.h                          |   9 +-
 be/src/util/debug-util.cc                           | 165 +++++++++++++++------
 be/src/util/debug-util.h                            |  13 +-
 common/thrift/Descriptors.thrift                    |   5 +
 .../org/apache/impala/analysis/SlotDescriptor.java  |   9 ++
 tests/custom_cluster/test_tuple_cache.py            |  31 +++-
 11 files changed, 215 insertions(+), 53 deletions(-)

diff --git a/be/src/exec/tuple-cache-node.cc b/be/src/exec/tuple-cache-node.cc
index 455245f6b..d846c1a1d 100644
--- a/be/src/exec/tuple-cache-node.cc
+++ b/be/src/exec/tuple-cache-node.cc
@@ -65,7 +65,12 @@ Status TupleCacheNode::Prepare(RuntimeState* state) {
       ADD_COUNTER(runtime_profile(), "NumTupleCacheBackpressureHalted", TUnit::UNIT);
   num_skipped_counter_ =
       ADD_COUNTER(runtime_profile(), "NumTupleCacheSkipped", TUnit::UNIT);
-
+  // If correctness verification is enabled, add a counter to indicate whether it was
+  // actually performing correctness verification.
+  if (TupleCacheVerificationEnabled(state)) {
+    num_correctness_verification_counter_ =
+        ADD_COUNTER(runtime_profile(), "NumTupleCacheCorrectnessVerification", TUnit::UNIT);
+  }
   // Compute the combined cache key by computing the fragment instance key and
   // fusing it with the compile time key.
   ComputeFragmentInstanceKey(state);
@@ -96,6 +101,9 @@ Status TupleCacheNode::Open(RuntimeState* state) {
   // If the node is marked to skip correctness verification, we don't want to read
   // from the cache as that would prevent its children from executing.
   if (!skip_correctness_verification_) {
+    VLOG_FILE << "Tuple Cache: correctness verification for " << combined_key_;
+    DCHECK(num_correctness_verification_counter_ != nullptr);
+    COUNTER_ADD(num_correctness_verification_counter_, 1);
     // We need the original fragment id to construct the path for the reference debug
     // cache file. If it's missing from the metadata, we return an error status
     // immediately.
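A note for readers following the debug-util.cc changes further down in this diff: the
new printing code renders tuples as "(...)", collections as "[...]", and structs as
"{name:value,...}". Below is a minimal standalone C++17 sketch of that recursive
dispatch. The Value struct is a hypothetical stand-in, not Impala's Tuple,
SlotDescriptor, or ColumnType classes; it only mirrors the output shapes described in
the patch.

// Standalone sketch of the recursive print dispatch; Value is a hypothetical
// stand-in type, not part of Impala.
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

struct Value {
  enum Kind { SCALAR, STRUCT, COLLECTION, NULL_VAL };
  Kind kind;
  std::string scalar;                    // SCALAR: pre-rendered value text
  std::vector<std::string> field_names;  // STRUCT: names, parallel to 'fields'
  std::vector<Value> fields;             // STRUCT: field values
  std::vector<Value> items;              // COLLECTION: item values
};

// Mirrors the shape of PrintSlot/PrintStruct/PrintCollection in the patch:
// structs render as {name:value,...}, collections as [v1 v2 ...].
void Print(const Value& v, std::stringstream* out) {
  switch (v.kind) {
    case Value::NULL_VAL: *out << "null"; break;
    case Value::SCALAR: *out << v.scalar; break;
    case Value::STRUCT:
      *out << "{";
      for (size_t i = 0; i < v.fields.size(); ++i) {
        if (i != 0) *out << ",";
        // In the real code the name comes from ColumnType::field_names, indexed
        // by the slot's new struct_field_idx.
        *out << v.field_names[i] << ":";
        Print(v.fields[i], out);
      }
      *out << "}";
      break;
    case Value::COLLECTION:
      *out << "[";
      for (size_t i = 0; i < v.items.size(); ++i) {
        if (i != 0) *out << " ";
        Print(v.items[i], out);
      }
      *out << "]";
      break;
  }
}

int main() {
  Value one{Value::SCALAR, "1", {}, {}, {}};
  Value two{Value::SCALAR, "2", {}, {}, {}};
  Value s{Value::STRUCT, "", {"a", "b"}, {one, two}, {}};
  Value arr{Value::COLLECTION, "", {}, {}, {s}};
  std::stringstream out;
  Print(arr, &out);
  std::cout << out.str() << std::endl;  // prints: [{a:1,b:2}]
  return 0;
}

The real implementation additionally consults the tuple's null indicators and quotes
string values through RawValue::PrintValue with quote_val set to true.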
diff --git a/be/src/exec/tuple-cache-node.h b/be/src/exec/tuple-cache-node.h
index 51a90bc0c..4f84d1b0e 100644
--- a/be/src/exec/tuple-cache-node.h
+++ b/be/src/exec/tuple-cache-node.h
@@ -79,6 +79,8 @@ private:
   RuntimeProfile::Counter* num_backpressure_halted_counter_ = nullptr;
   /// Number of results that skip the cache due to a tombstone
   RuntimeProfile::Counter* num_skipped_counter_ = nullptr;
+  /// Number of results that are running correctness verification
+  RuntimeProfile::Counter* num_correctness_verification_counter_ = nullptr;
 
   /// Whether any RowBatch from a cache file has been returned to a caller
   /// It is possible to recover from an error reading a cache file if no
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index 5af38fd43..0227cc88a 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -119,6 +119,9 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc,
     DCHECK(!tdesc.__isset.itemTupleId);
     DCHECK(children_tuple_descriptor == nullptr);
   }
+  if (tdesc.__isset.structFieldIdx) {
+    struct_field_idx_ = tdesc.structFieldIdx;
+  }
 }
 
 bool SlotDescriptor::ColPathLessThan(const SlotDescriptor* a, const SlotDescriptor* b) {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index bf18c249e..b0962b891 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -20,6 +20,7 @@
 #include <cstdint>
 #include <iosfwd>
 #include <map>
+#include <optional>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -144,6 +145,8 @@ class SlotDescriptor {
   TVirtualColumnType::type virtual_column_type() const { return virtual_column_type_; }
   bool IsVirtual() const { return virtual_column_type_ != TVirtualColumnType::NONE; }
 
+  std::optional<int32_t> struct_field_idx() const { return struct_field_idx_; }
+
   /// Comparison function for ordering slot descriptors by their col_path_.
   /// Returns true if 'a' comes before 'b'.
   /// Orders the paths as in a depth-first traversal of the schema tree, as follows:
@@ -247,6 +250,12 @@ class SlotDescriptor {
 
   const TVirtualColumnType::type virtual_column_type_;
 
+  /// If this is a slot in a struct, this is the field id within that struct. This is
+  /// useful for getting the struct's field name from ColumnType. This is equivalent
+  /// to the final index of the absolute path, but the materialized path can be
+  /// truncated and omit this index.
+  std::optional<int32_t> struct_field_idx_;
+
   /// 'children_tuple_descriptor' should be non-NULL iff this is a complex type slot.
   SlotDescriptor(const TSlotDescriptor& tdesc, const TupleDescriptor* parent,
       const TupleDescriptor* children_tuple_descriptor);
diff --git a/be/src/runtime/raw-value.cc b/be/src/runtime/raw-value.cc
index 3e1721695..d14756fcc 100644
--- a/be/src/runtime/raw-value.cc
+++ b/be/src/runtime/raw-value.cc
@@ -122,9 +122,6 @@ void RawValue::PrintValue(const void* value, const ColumnType& type, int scale,
     case TYPE_DATE:
      *str = reinterpret_cast<const DateValue*>(value)->ToString();
      return;
-    case TYPE_FIXED_UDA_INTERMEDIATE:
-      *str = "Intermediate UDA step, no value printed";
-      return;
     default:
       break;
   }
@@ -514,6 +511,15 @@ void RawValue::PrintValue(
       *stream << *reinterpret_cast<const DateValue*>(value);
       if (quote_val) *stream << "\"";
     } break;
+    case TYPE_FIXED_UDA_INTERMEDIATE: {
+      // This is always a binary type, so escape invalid unicode characters to make it
+      // printable.
+      string intermed_str(reinterpret_cast<const char*>(value), type.len);
+      intermed_str = strings::Utf8SafeCEscape(intermed_str);
+      if (quote_val) *stream << "\"";
+      stream->write(intermed_str.c_str(), intermed_str.size());
+      if (quote_val) *stream << "\"";
+    } break;
     default: DCHECK(false) << "Unknown type: " << type;
   }
   stream->precision(old_precision);
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 3dd865924..7e0f8a0fe 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -161,12 +161,19 @@ class RowBatch {
   }
 
   TupleRow* ALWAYS_INLINE GetRow(int row_idx) {
-    DCHECK(tuple_ptrs_ != NULL);
+    DCHECK(tuple_ptrs_ != nullptr);
     DCHECK_GE(row_idx, 0);
     DCHECK_LT(row_idx, capacity_);
     return reinterpret_cast<TupleRow*>(tuple_ptrs_ + row_idx * num_tuples_per_row_);
   }
 
+  const TupleRow* ALWAYS_INLINE GetRow(int row_idx) const {
+    DCHECK(tuple_ptrs_ != nullptr);
+    DCHECK_GE(row_idx, 0);
+    DCHECK_LT(row_idx, capacity_);
+    return reinterpret_cast<const TupleRow*>(tuple_ptrs_ + row_idx * num_tuples_per_row_);
+  }
+
   /// An iterator for going through a row batch, starting at 'row_idx'.
   /// If 'limit' is specified, it will iterate up to row number 'row_idx + limit'
   /// or the last row, whichever comes first. Otherwise, it will iterate till the last
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index c7f629548..33db163ca 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -113,11 +113,11 @@ bool ParseId(const string& s, TUniqueId* id) {
   // For backwards compatibility, this method parses two forms of query ID from text:
   // - <hex-int64_t><colon><hex-int64_t> - this format is the standard going forward
   // - <decimal-int64_t><space><decimal-int64_t> - legacy compatibility with CDH4 CM
-  DCHECK(id != NULL);
+  DCHECK(id != nullptr);
   const char* hi_part = s.c_str();
   char* separator = const_cast<char*>(strchr(hi_part, ':'));
-  if (separator == NULL) {
+  if (separator == nullptr) {
     // Legacy compatibility branch
     char_separator<char> sep(" ");
     tokenizer< char_separator<char>> tokens(s, sep);
@@ -144,8 +144,8 @@ bool ParseId(const string& s, TUniqueId* id) {
   const char* lo_part = separator + 1;
   *separator = '\0';
 
-  char* error_hi = NULL;
-  char* error_lo = NULL;
+  char* error_hi = nullptr;
+  char* error_lo = nullptr;
   id->hi = strtoul(hi_part, &error_hi, 16);
   id->lo = strtoul(lo_part, &error_lo, 16);
 
@@ -154,65 +154,140 @@ bool ParseId(const string& s, TUniqueId* id) {
   return valid;
 }
 
-string PrintTuple(const Tuple* t, const TupleDescriptor& d) {
-  if (t == NULL) return "null";
-  stringstream out;
-  out << "(";
-  bool first_value = true;
-  for (int i = 0; i < d.slots().size(); ++i) {
-    SlotDescriptor* slot_d = d.slots()[i];
-    if (first_value) {
-      first_value = false;
-    } else {
-      out << " ";
+// Forward-declarations to keep the compiler happy
+static void PrintCollection(const Tuple* t, const SlotDescriptor& slot_d,
+    stringstream* out);
+static void PrintStruct(const Tuple* t, const SlotDescriptor& slot_d, stringstream* out);
+
+// Generic recursive dispatch to print a slot descriptor
+static void PrintSlot(const Tuple* t, const SlotDescriptor& slot_d, stringstream* out) {
+  if (t->IsNull(slot_d.null_indicator_offset())) {
+    *out << "null";
+  } else if (slot_d.type().IsCollectionType()) {
+    PrintCollection(t, slot_d, out);
+  } else if (slot_d.type().IsStructType()) {
+    PrintStruct(t, slot_d, out);
+  } else {
+    RawValue::PrintValue(t->GetSlot(slot_d.tuple_offset()), slot_d.type(), -1, out,
+        /*quote_val*/ true);
+  }
+}
+
+// Print a collection (array or map), recursing through the children.
+// Example: [(a 1) (b 2)]
+static void PrintCollection(const Tuple* t, const SlotDescriptor& slot_d,
+    stringstream* out) {
+  DCHECK(t != nullptr);
+  DCHECK(slot_d.type().IsCollectionType());
+  const TupleDescriptor* child_tuple_d = slot_d.children_tuple_descriptor();
+  const CollectionValue* coll_value =
+      reinterpret_cast<const CollectionValue*>(t->GetSlot(slot_d.tuple_offset()));
+  uint8_t* coll_buf = coll_value->ptr;
+  *out << "[";
+  for (int j = 0; j < coll_value->num_tuples; ++j) {
+    PrintTuple(reinterpret_cast<Tuple*>(coll_buf), *child_tuple_d, out);
+    coll_buf += child_tuple_d->byte_size();
+  }
+  *out << "]";
+}
+
+// Print a struct with field names, recursing to children. This is not intended to work
+// with impala_udf::StructVal. Instead, it works with the representation of structs
+// that is directly inside another tuple.
+// Example: {a:1,b:2}
+static void PrintStruct(const Tuple* t, const SlotDescriptor& slot_d, stringstream* out) {
+  DCHECK(t != nullptr);
+  DCHECK(slot_d.type().IsStructType());
+  const TupleDescriptor* child_tuple_d = slot_d.children_tuple_descriptor();
+  DCHECK(child_tuple_d != nullptr);
+  const ColumnType& struct_type = slot_d.type();
+  const std::vector<SlotDescriptor*>& child_slots = child_tuple_d->slots();
+  // The struct may have fields that are not being materialized, so the child_slots may
+  // be a subset of the actual struct type.
+  DCHECK_LE(child_slots.size(), struct_type.children.size());
+  *out << "{";
+  for (int i = 0; i < child_slots.size(); ++i) {
+    if (i != 0) {
+      *out << ",";
     }
-    if (t->IsNull(slot_d->null_indicator_offset())) {
-      out << "null";
-    } else if (slot_d->type().IsCollectionType()) {
-      const TupleDescriptor* item_d = slot_d->children_tuple_descriptor();
-      const CollectionValue* coll_value =
-          reinterpret_cast<const CollectionValue*>(t->GetSlot(slot_d->tuple_offset()));
-      uint8_t* coll_buf = coll_value->ptr;
-      out << "[";
-      for (int j = 0; j < coll_value->num_tuples; ++j) {
-        out << PrintTuple(reinterpret_cast<Tuple*>(coll_buf), *item_d);
-        coll_buf += item_d->byte_size();
+    const SlotDescriptor& child_slot_desc = *child_slots[i];
+    // To find the struct field name, we need the index into the ColumnType's
+    // field_names. Structs can be partially materialized, so the slot's index
+    // is not correct. The struct_field_idx contains the appropriate index into
+    // the field_names.
+    if (child_slot_desc.struct_field_idx().has_value()) {
+      int struct_field_idx = child_slot_desc.struct_field_idx().value();
+      if (struct_field_idx < struct_type.field_names.size()) {
+        *out << struct_type.field_names[struct_field_idx] << ":";
+      } else {
+        // This is invalid. Assert on debug builds, but otherwise handle it for
+        // release builds.
+        DCHECK_LT(struct_field_idx, struct_type.field_names.size());
+        *out << "invalid" << i << ":";
       }
-      out << "]";
     } else {
-      string value_str;
-      RawValue::PrintValue(
-          t->GetSlot(slot_d->tuple_offset()), slot_d->type(), -1, &value_str);
-      out << value_str;
+      *out << "unnamed" << i << ":";
     }
+    PrintSlot(t, child_slot_desc, out);
   }
-  out << ")";
-  return out.str();
+  *out << "}";
 }
 
-string PrintRow(TupleRow* row, const RowDescriptor& d) {
-  stringstream out;
-  out << "[";
-  for (int i = 0; i < d.tuple_descriptors().size(); ++i) {
-    if (i != 0) out << " ";
-    out << PrintTuple(row->GetTuple(i), *d.tuple_descriptors()[i]);
+void PrintTuple(const Tuple* t, const TupleDescriptor& tuple_d, stringstream* out) {
+  // The whole tuple is null
+  if (t == nullptr) {
+    *out << "null";
+    return;
+  }
+  *out << "(";
+  for (int i = 0; i < tuple_d.slots().size(); ++i) {
+    if (i != 0) {
+      *out << " ";
+    }
+    const SlotDescriptor& slot_d = *tuple_d.slots()[i];
+    PrintSlot(t, slot_d, out);
   }
-  out << "]";
+  *out << ")";
+}
+
+string PrintTuple(const Tuple* t, const TupleDescriptor& tuple_d) {
+  stringstream out;
+  PrintTuple(t, tuple_d, &out);
   return out.str();
 }
 
-string PrintBatch(RowBatch* batch) {
+void PrintRow(const TupleRow* row, const RowDescriptor& row_d, stringstream* out) {
+  *out << "[";
+  for (int i = 0; i < row_d.tuple_descriptors().size(); ++i) {
+    if (i != 0) *out << " ";
+    PrintTuple(row->GetTuple(i), *row_d.tuple_descriptors()[i], out);
+  }
+  *out << "]";
+}
+
+string PrintRow(const TupleRow* row, const RowDescriptor& row_d) {
   stringstream out;
+  PrintRow(row, row_d, &out);
+  return out.str();
+}
+
+void PrintBatch(const RowBatch* batch, stringstream* out) {
   for (int i = 0; i < batch->num_rows(); ++i) {
-    out << PrintRow(batch->GetRow(i), *batch->row_desc()) << "\n";
+    PrintRow(batch->GetRow(i), *batch->row_desc(), out);
+    *out << "\n";
   }
+}
+
+string PrintBatch(const RowBatch* batch) {
+  stringstream out;
+  PrintBatch(batch, &out);
   return out.str();
 }
 
 string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path) {
   stringstream ss;
   ss << tbl_desc.database() << "." << tbl_desc.name();
-  const ColumnType* type = NULL;
+  const ColumnType* type = nullptr;
   if (path.size() > 0) {
     ss << "." << tbl_desc.col_descs()[path[0]].name();
     type = &tbl_desc.col_descs()[path[0]].type();
@@ -227,7 +302,7 @@ string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path) {
       } else {
         DCHECK_EQ(path[i], 1);
         ss << "pos";
-        type = NULL;
+        type = nullptr;
       }
       break;
     case TYPE_MAP:
@@ -240,7 +315,7 @@ string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path) {
       } else {
         DCHECK_EQ(path[i], 2);
         ss << "pos";
-        type = NULL;
+        type = nullptr;
       }
       break;
     case TYPE_STRUCT:
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 396fa2083..af27fb868 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -58,9 +58,18 @@ std::string PrintValue(const T& value) {
   return to_string(value);
 }
 
+// Print functions have two signatures. One writes output to a stringstream passed in
+// as an argument. The second returns a string. In all cases, the second signature is
+// implemented by passing a stringstream into the first signature and returning the
+// resulting string. If the caller is going to write the output to a stringstream, the
+// stringstream signature avoids an extra copy.
+void PrintTuple(const Tuple* t, const TupleDescriptor& d, std::stringstream* out);
 std::string PrintTuple(const Tuple* t, const TupleDescriptor& d);
-std::string PrintRow(TupleRow* row, const RowDescriptor& d);
-std::string PrintBatch(RowBatch* batch);
+void PrintRow(const TupleRow* row, const RowDescriptor& d, std::stringstream* out);
+std::string PrintRow(const TupleRow* row, const RowDescriptor& d);
+void PrintBatch(const RowBatch* batch, std::stringstream* out);
+std::string PrintBatch(const RowBatch* batch);
+
 /// Converts id to a string representation. If necessary, the gdb equivalent is:
 /// printf "%lx:%lx\n", id.hi, id.lo
 std::string PrintId(const TUniqueId& id, const std::string& separator = ":");
diff --git a/common/thrift/Descriptors.thrift b/common/thrift/Descriptors.thrift
index 566ba58fb..12633126e 100644
--- a/common/thrift/Descriptors.thrift
+++ b/common/thrift/Descriptors.thrift
@@ -58,6 +58,11 @@ struct TSlotDescriptor {
   // provided by other structures for the executor, so it only needs to be set for
   // the tuple cache.
   11: optional string path
+  // If this is in a struct, this is the index of the field within that struct. This
+  // corresponds to the final entry in the absolute path. The materialized path is
+  // sometimes truncated, so it may not contain this information. This value is not
+  // interesting if this slot is not inside a struct.
+  12: optional i32 structFieldIdx
 }
 
 struct TColumnDescriptor {
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index d3a61f646..7b7b10c7c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -491,6 +491,15 @@ public class SlotDescriptor {
       // not interesting to the tuple cache.
       result.setPath(label_);
     }
+    // In order to print struct tuples, we need to be able to retrieve the field name for
+    // a slot within a struct. The last index of the absolute path is the appropriate
+    // value. However, the materialized path can be truncated to omit the last few
+    // indices. This sets the struct field idx to supply the last index.
+    if (isScanSlot() && parent_.getPath() != null) {
+      Preconditions.checkState(path_.isResolved());
+      List<Integer> absolutePath = path_.getAbsolutePath();
+      result.setStructFieldIdx(absolutePath.get(absolutePath.size()-1));
+    }
     if (itemTupleDesc_ != null) {
       // Check for recursive or otherwise invalid item tuple descriptors. Since we assign
       // tuple ids globally in increasing order, the id of an item tuple descriptor must
diff --git a/tests/custom_cluster/test_tuple_cache.py b/tests/custom_cluster/test_tuple_cache.py
index 4df907b97..cd3e48ce6 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -31,10 +31,12 @@ from tests.util.parse_util import (
     match_memory_estimate, parse_mem_to_mb, match_cache_key)
 
 TABLE_LAYOUT = 'name STRING, age INT, address STRING'
-CACHE_START_ARGS = "--tuple_cache_dir=/tmp --log_level=2"
+CACHE_START_ARGS = \
+    "--tuple_cache_dir=/tmp --tuple_cache_debug_dump_dir=/tmp --log_level=2"
 NUM_HITS = 'NumTupleCacheHits'
 NUM_HALTED = 'NumTupleCacheHalted'
 NUM_SKIPPED = 'NumTupleCacheSkipped'
+NUM_CORRECTNESS_VERIFICATION = 'NumTupleCacheCorrectnessVerification'
 
 # Indenation used for TUPLE_CACHE_NODE in specific fragments (not averaged fragment).
 NODE_INDENT = ' - '
 
@@ -440,6 +442,33 @@ class TestTupleCacheSingle(TestTupleCacheBase):
     self.clone_table('functional_json.binary_tbl', test_tbl, False, vector)
     self.run_test_case('QueryTest/json-binary-format', vector, unique_database)
 
+  def test_complex_types_verification(self, vector):
+    """Run with correctness verification and check that it works with a query that
+    selects complex types."""
+    # We use custom query options to turn on verification. We also need to use
+    # expand_complex_types=true so that * includes columns with complex types
+    custom_options = dict(vector.get_value('exec_option'))
+    custom_options['enable_tuple_cache_verification'] = 'true'
+    custom_options['expand_complex_types'] = 'true'
+
+    # functional_parquet.complextypestbl has multiple columns with different types
+    # of complex types. e.g. nested_struct is a struct with multiple nested fields.
+    query = "select * from functional_parquet.complextypestbl"
+    result1 = self.execute_query(query, query_options=custom_options)
+    assert result1.success
+    assertCounters(result1.runtime_profile, 0, 0, 0)
+
+    # The second run is when correctness verification kicks in and tests the printing
+    # logic.
+    result2 = self.execute_query(query, query_options=custom_options)
+    assert result2.success
+    # The regular counters see this as skip
+    assertCounters(result2.runtime_profile, 0, 0, 1)
+    assertCounter(result2.runtime_profile, NUM_CORRECTNESS_VERIFICATION, 1, 1)
+    # Order by is currently not supported with complex types results, so sort the results
+    # before comparing them.
+    assert sorted(result1.data) == sorted(result2.data)
+
 
 @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
 class TestTupleCacheCluster(TestTupleCacheBase):
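A closing illustration of why the patch sends structFieldIdx to the backend instead of
reusing the slot's position: when a struct is only partially materialized, the slot's
position in its child tuple no longer matches the field's position in the struct type,
so the field-name lookup needs the last entry of the absolute path. The sketch below is
standalone C++17 with hypothetical types and paths (not Impala's ColumnType or schema
path classes), including the "unnamed"/"invalid" fallbacks used by the new PrintStruct.

#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for the relevant part of ColumnType: the ordered field
// names of a struct type.
struct StructType {
  std::vector<std::string> field_names;
};

// Resolve a printable field label the way the new PrintStruct does: prefer the
// transmitted struct field index, fall back to a placeholder when it is missing
// or out of range.
std::string FieldLabel(const StructType& type, std::optional<int> struct_field_idx,
    int slot_position) {
  if (!struct_field_idx.has_value()) return "unnamed" + std::to_string(slot_position);
  int idx = struct_field_idx.value();
  if (idx < 0 || idx >= static_cast<int>(type.field_names.size())) {
    return "invalid" + std::to_string(slot_position);
  }
  return type.field_names[idx];
}

int main() {
  // Suppose a struct has fields a, b, c but only c is materialized: c is slot 0 of
  // the child tuple even though it is field 2 of the struct type.
  StructType nested{{"a", "b", "c"}};
  std::vector<int> absolute_path = {4, 2};  // hypothetical: column 4, then field 2
  std::optional<int> struct_field_idx = absolute_path.back();

  std::cout << FieldLabel(nested, struct_field_idx, /*slot_position=*/0) << "\n";  // c
  std::cout << FieldLabel(nested, std::nullopt, /*slot_position=*/0) << "\n";  // unnamed0
  return 0;
}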