IMPALA-2399: Memory limit checks for all scanners. This change replaces all instances of MemPool::Allocate() in avro, text, hbase scanners with MemPool::TryAllocate().
HdfsAvroScanner::MaterializeTuple() has been converted to return a boolean in case of memory allocation failure. The codegen'ed version of MaterializeTuple() will also return a boolean. In the future, we should consider returning Status directly but that will be more involved and best left as a separate change. Change-Id: I3e5a56501967a58513888917db5ce66dec4fb5ce Reviewed-on: http://gerrit.cloudera.org:8080/2568 Reviewed-by: Michael Ho <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/70502942 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/70502942 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/70502942 Branch: refs/heads/master Commit: 7050294215cdd81963aa815f69a573421a05ab3e Parents: 0c4dc96 Author: Michael Ho <[email protected]> Authored: Thu Feb 11 15:32:04 2016 -0800 Committer: Tim Armstrong <[email protected]> Committed: Tue Apr 12 14:02:35 2016 -0700 ---------------------------------------------------------------------- be/src/exec/base-sequence-scanner.cc | 3 +- be/src/exec/data-source-scan-node.cc | 16 +++- be/src/exec/hbase-table-scanner.cc | 52 +++++++++--- be/src/exec/hbase-table-scanner.h | 14 +-- be/src/exec/hdfs-avro-scanner-ir.cc | 36 +++++--- be/src/exec/hdfs-avro-scanner.cc | 136 +++++++++++++++++++++--------- be/src/exec/hdfs-avro-scanner.h | 36 +++++--- be/src/exec/hdfs-parquet-scanner.cc | 11 +-- be/src/exec/hdfs-scanner.cc | 8 +- be/src/exec/hdfs-sequence-scanner.cc | 4 + be/src/exec/hdfs-text-scanner.cc | 21 +++-- be/src/exec/hdfs-text-scanner.h | 11 +-- be/src/exec/scanner-context.cc | 10 +-- be/src/exec/text-converter.h | 2 +- be/src/exec/text-converter.inline.h | 15 +++- be/src/runtime/string-buffer.h | 38 ++++++--- 16 files changed, 285 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/base-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc index 9145403..f88717f 100644 --- a/be/src/exec/base-sequence-scanner.cc +++ b/be/src/exec/base-sequence-scanner.cc @@ -98,7 +98,8 @@ void BaseSequenceScanner::Close() { // Verify all resources (if any) have been transferred. DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0); DCHECK_EQ(context_->num_completed_io_buffers(), 0); - if (!only_parsing_header_) { + // 'header_' can be NULL if HdfsScanNode::CreateAndPrepareScanner() failed. + if (!only_parsing_header_ && header_ != NULL) { scan_node_->RangeComplete(file_format(), header_->compression_type); } HdfsScanner::Close(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/data-source-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc index c865234..2c52c11 100644 --- a/be/src/exec/data-source-scan-node.cc +++ b/be/src/exec/data-source-scan-node.cc @@ -55,6 +55,8 @@ const string ERROR_INVALID_TIMESTAMP = "Data source returned invalid timestamp d "This likely indicates a problem with the data source library."; const string ERROR_INVALID_DECIMAL = "Data source returned invalid decimal data. 
" "This likely indicates a problem with the data source library."; +const string ERROR_MEM_LIMIT_EXCEEDED = "DataSourceScanNode::$0() failed to allocate " + "$1 bytes for $2."; // Size of an encoded TIMESTAMP const size_t TIMESTAMP_SIZE = sizeof(int64_t) + sizeof(int32_t); @@ -210,7 +212,12 @@ Status DataSourceScanNode::MaterializeNextRow(MemPool* tuple_pool) { } const string& val = col.string_vals[val_idx]; size_t val_size = val.size(); - char* buffer = reinterpret_cast<char*>(tuple_pool->Allocate(val_size)); + char* buffer = reinterpret_cast<char*>(tuple_pool->TryAllocate(val_size)); + if (UNLIKELY(buffer == NULL)) { + string details = Substitute(ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow", + val_size, "string slot"); + return tuple_pool->mem_tracker()->MemLimitExceeded(NULL, details, val_size); + } memcpy(buffer, val.data(), val_size); reinterpret_cast<StringValue*>(slot)->ptr = buffer; reinterpret_cast<StringValue*>(slot)->len = val_size; @@ -300,7 +307,12 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo // create new tuple buffer for row_batch MemPool* tuple_pool = row_batch->tuple_data_pool(); int tuple_buffer_size = row_batch->MaxTupleBufferSize(); - void* tuple_buffer = tuple_pool->Allocate(tuple_buffer_size); + void* tuple_buffer = tuple_pool->TryAllocate(tuple_buffer_size); + if (UNLIKELY(tuple_buffer == NULL)) { + string details = Substitute(ERROR_MEM_LIMIT_EXCEEDED, "GetNext", + tuple_buffer_size, "tuple"); + return tuple_pool->mem_tracker()->MemLimitExceeded(state, details, tuple_buffer_size); + } tuple_ = reinterpret_cast<Tuple*>(tuple_buffer); ExprContext** ctxs = &conjunct_ctxs_[0]; int num_ctxs = conjunct_ctxs_.size(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hbase-table-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-scanner.cc b/be/src/exec/hbase-table-scanner.cc index 93cad54..cd622bc 100644 --- 
a/be/src/exec/hbase-table-scanner.cc +++ b/be/src/exec/hbase-table-scanner.cc @@ -28,6 +28,7 @@ #include "common/names.h" using namespace impala; +using namespace strings; jclass HBaseTableScanner::scan_cl_ = NULL; jclass HBaseTableScanner::resultscanner_cl_ = NULL; @@ -70,6 +71,9 @@ jobject HBaseTableScanner::empty_row_ = NULL; jobject HBaseTableScanner::must_pass_all_op_ = NULL; jobjectArray HBaseTableScanner::compare_ops_ = NULL; +const string HBASE_MEM_LIMIT_EXCEEDED = "HBaseTableScanner::$0() failed to " + "allocate $1 bytes for $2."; + void HBaseTableScanner::ScanRange::DebugString(int indentation_level, stringstream* out) { *out << string(indentation_level * 2, ' '); @@ -573,53 +577,77 @@ inline void HBaseTableScanner::WriteTupleSlot(const SlotDescriptor* slot_desc, BitUtil::ByteSwap(slot, data, slot_desc->type().GetByteSize()); } -inline void HBaseTableScanner::GetRowKey(JNIEnv* env, jobject cell, +inline Status HBaseTableScanner::GetRowKey(JNIEnv* env, jobject cell, void** data, int* length) { int offset = env->CallIntMethod(cell, cell_get_row_offset_id_); *length = env->CallShortMethod(cell, cell_get_row_length_id_); jbyteArray jdata = (jbyteArray) env->CallObjectMethod(cell, cell_get_row_array_); - *data = value_pool_->Allocate(*length); + *data = value_pool_->TryAllocate(*length); + if (UNLIKELY(*data == NULL)) { + string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetRowKey", + *length, "row array"); + return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length); + } env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data)); COUNTER_ADD(scan_node_->bytes_read_counter(), *length); + return Status::OK(); } -inline void HBaseTableScanner::GetFamily(JNIEnv* env, jobject cell, +inline Status HBaseTableScanner::GetFamily(JNIEnv* env, jobject cell, void** data, int* length) { int offset = env->CallIntMethod(cell, cell_get_family_offset_id_); *length = env->CallShortMethod(cell, cell_get_family_length_id_); 
jbyteArray jdata = (jbyteArray) env->CallObjectMethod(cell, cell_get_family_array_); - *data = value_pool_->Allocate(*length); + *data = value_pool_->TryAllocate(*length); + if (UNLIKELY(*data == NULL)) { + string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetFamily", + *length, "family array"); + return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length); + } env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data)); COUNTER_ADD(scan_node_->bytes_read_counter(), *length); + return Status::OK(); } -inline void HBaseTableScanner::GetQualifier(JNIEnv* env, jobject cell, +inline Status HBaseTableScanner::GetQualifier(JNIEnv* env, jobject cell, void** data, int* length) { int offset = env->CallIntMethod(cell, cell_get_qualifier_offset_id_); *length = env->CallIntMethod(cell, cell_get_qualifier_length_id_); jbyteArray jdata = (jbyteArray) env->CallObjectMethod(cell, cell_get_qualifier_array_); - *data = value_pool_->Allocate(*length); + *data = value_pool_->TryAllocate(*length); + if (UNLIKELY(*data == NULL)) { + string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetQualifier", + *length, "qualifier array"); + return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length); + } env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data)); COUNTER_ADD(scan_node_->bytes_read_counter(), *length); + return Status::OK(); } -inline void HBaseTableScanner::GetValue(JNIEnv* env, jobject cell, +inline Status HBaseTableScanner::GetValue(JNIEnv* env, jobject cell, void** data, int* length) { int offset = env->CallIntMethod(cell, cell_get_value_offset_id_); *length = env->CallIntMethod(cell, cell_get_value_length_id_); jbyteArray jdata = (jbyteArray) env->CallObjectMethod(cell, cell_get_value_array_); - *data = value_pool_->Allocate(*length); + *data = value_pool_->TryAllocate(*length); + if (UNLIKELY(*data == NULL)) { + string details = Substitute(HBASE_MEM_LIMIT_EXCEEDED, "GetValue", + *length, 
"value array"); + return value_pool_->mem_tracker()->MemLimitExceeded(state_, details, *length); + } env->GetByteArrayRegion(jdata, offset, *length, reinterpret_cast<jbyte*>(*data)); COUNTER_ADD(scan_node_->bytes_read_counter(), *length); + return Status::OK(); } Status HBaseTableScanner::GetRowKey(JNIEnv* env, void** key, int* key_length) { jobject cell = env->GetObjectArrayElement(cells_, 0); - GetRowKey(env, cell, key, key_length); + RETURN_IF_ERROR(GetRowKey(env, cell, key, key_length)); RETURN_ERROR_IF_EXC(env); return Status::OK(); } @@ -650,7 +678,7 @@ Status HBaseTableScanner::GetCurrentValue(JNIEnv* env, const string& family, // Check family. If it doesn't match, we have a NULL value. void* family_data; int family_length; - GetFamily(env, cell, &family_data, &family_length); + RETURN_IF_ERROR(GetFamily(env, cell, &family_data, &family_length)); if (CompareStrings(family, family_data, family_length) != 0) { *is_null = true; return Status::OK(); @@ -659,13 +687,13 @@ Status HBaseTableScanner::GetCurrentValue(JNIEnv* env, const string& family, // Check qualifier. If it doesn't match, we have a NULL value. 
void* qualifier_data; int qualifier_length; - GetQualifier(env, cell, &qualifier_data, &qualifier_length); + RETURN_IF_ERROR(GetQualifier(env, cell, &qualifier_data, &qualifier_length)); if (CompareStrings(qualifier, qualifier_data, qualifier_length) != 0) { *is_null = true; return Status::OK(); } } - GetValue(env, cell, data, length); + RETURN_IF_ERROR(GetValue(env, cell, data, length)); *is_null = false; return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hbase-table-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-scanner.h b/be/src/exec/hbase-table-scanner.h index 994a036..52e5da7 100644 --- a/be/src/exec/hbase-table-scanner.h +++ b/be/src/exec/hbase-table-scanner.h @@ -271,18 +271,20 @@ class HBaseTableScanner { Status InitScanRange(JNIEnv* env, jbyteArray start_bytes, jbyteArray end_bytes); /// Copies the row key of cell into value_pool_ and returns it via *data and *length. - inline void GetRowKey(JNIEnv* env, jobject cell, void** data, int* length); + /// Returns error status if memory limit is exceeded. + inline Status GetRowKey(JNIEnv* env, jobject cell, void** data, int* length); /// Copies the column family of cell into value_pool_ and returns it - /// via *data and *length. - inline void GetFamily(JNIEnv* env, jobject cell, void** data, int* length); + /// via *data and *length. Returns error status if memory limit is exceeded. + inline Status GetFamily(JNIEnv* env, jobject cell, void** data, int* length); /// Copies the column qualifier of cell into value_pool_ and returns it - /// via *data and *length. - inline void GetQualifier(JNIEnv* env, jobject cell, void** data, int* length); + /// via *data and *length. Returns error status if memory limit is exceeded. + inline Status GetQualifier(JNIEnv* env, jobject cell, void** data, int* length); /// Copies the value of cell into value_pool_ and returns it via *data and *length. 
- inline void GetValue(JNIEnv* env, jobject cell, void** data, int* length); + /// Returns error status if memory limit is exceeded. + inline Status GetValue(JNIEnv* env, jobject cell, void** data, int* length); /// Returns the current value of cells_[cell_index_] in *data and *length /// if its family/qualifier match the given family/qualifier. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-avro-scanner-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc b/be/src/exec/hdfs-avro-scanner-ir.cc index a84fe7b..f3ebb97 100644 --- a/be/src/exec/hdfs-avro-scanner-ir.cc +++ b/be/src/exec/hdfs-avro-scanner-ir.cc @@ -19,15 +19,18 @@ #include "runtime/string-value.inline.h" using namespace impala; +using namespace strings; // Functions in this file are cross-compiled to IR with clang. int HdfsAvroScanner::DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data, - Tuple* tuple, TupleRow* tuple_row) { + Tuple* tuple, TupleRow* tuple_row) { int num_to_commit = 0; for (int i = 0; i < max_tuples; ++i) { InitTuple(template_tuple_, tuple); - MaterializeTuple(*avro_header_->schema.get(), pool, data, tuple); + if (UNLIKELY(!MaterializeTuple(*avro_header_->schema.get(), pool, data, tuple))) { + return 0; + } tuple_row->SetTuple(scan_node_->tuple_idx(), tuple); if (EvalConjuncts(tuple_row)) { ++num_to_commit; @@ -50,7 +53,7 @@ bool HdfsAvroScanner::ReadUnionType(int null_union_position, uint8_t** data) { } void HdfsAvroScanner::ReadAvroBoolean(PrimitiveType type, uint8_t** data, bool write_slot, - void* slot, MemPool* pool) { + void* slot, MemPool* pool) { if (write_slot) { DCHECK_EQ(type, TYPE_BOOLEAN); *reinterpret_cast<bool*>(slot) = *reinterpret_cast<bool*>(*data); @@ -59,7 +62,7 @@ void HdfsAvroScanner::ReadAvroBoolean(PrimitiveType type, uint8_t** data, bool w } void HdfsAvroScanner::ReadAvroInt32(PrimitiveType type, uint8_t** data, bool write_slot, - void* slot, 
MemPool* pool) { + void* slot, MemPool* pool) { int32_t val = ReadWriteUtil::ReadZInt(data); if (write_slot) { if (type == TYPE_INT) { @@ -77,7 +80,7 @@ void HdfsAvroScanner::ReadAvroInt32(PrimitiveType type, uint8_t** data, bool wri } void HdfsAvroScanner::ReadAvroInt64(PrimitiveType type, uint8_t** data, bool write_slot, - void* slot, MemPool* pool) { + void* slot, MemPool* pool) { int64_t val = ReadWriteUtil::ReadZLong(data); if (write_slot) { if (type == TYPE_BIGINT) { @@ -93,7 +96,7 @@ void HdfsAvroScanner::ReadAvroInt64(PrimitiveType type, uint8_t** data, bool wri } void HdfsAvroScanner::ReadAvroFloat(PrimitiveType type, uint8_t** data, bool write_slot, - void* slot, MemPool* pool) { + void* slot, MemPool* pool) { if (write_slot) { float val = *reinterpret_cast<float*>(*data); if (type == TYPE_FLOAT) { @@ -108,7 +111,7 @@ void HdfsAvroScanner::ReadAvroFloat(PrimitiveType type, uint8_t** data, bool wri } void HdfsAvroScanner::ReadAvroDouble(PrimitiveType type, uint8_t** data, bool write_slot, - void* slot, MemPool* pool) { + void* slot, MemPool* pool) { if (write_slot) { DCHECK_EQ(type, TYPE_DOUBLE); *reinterpret_cast<double*>(slot) = *reinterpret_cast<double*>(*data); @@ -117,7 +120,7 @@ void HdfsAvroScanner::ReadAvroDouble(PrimitiveType type, uint8_t** data, bool wr } void HdfsAvroScanner::ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t** data, - bool write_slot, void* slot, MemPool* pool) { + bool write_slot, void* slot, MemPool* pool) { int64_t len = ReadWriteUtil::ReadZLong(data); if (write_slot) { DCHECK(type == TYPE_VARCHAR); @@ -130,8 +133,8 @@ void HdfsAvroScanner::ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t** *data += len; } -void HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** data, - bool write_slot, void* slot, MemPool* pool) { +bool HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** data, + bool write_slot, void* slot, MemPool* pool) { int64_t len = 
ReadWriteUtil::ReadZLong(data); if (write_slot) { DCHECK(type == TYPE_CHAR); @@ -139,7 +142,13 @@ void HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** da int str_len = std::min(static_cast<int>(len), max_len); if (ctype.IsVarLenStringType()) { StringValue* sv = reinterpret_cast<StringValue*>(slot); - sv->ptr = reinterpret_cast<char*>(pool->Allocate(max_len)); + sv->ptr = reinterpret_cast<char*>(pool->TryAllocate(max_len)); + if (UNLIKELY(sv->ptr == NULL)) { + string details = Substitute("HdfsAvroScanner::ReadAvroChar() failed to allocate " + "$0 bytes for char slot.", max_len); + parse_status_ = pool->mem_tracker()->MemLimitExceeded(state_, details, max_len); + return false; + } sv->len = max_len; memcpy(sv->ptr, *data, str_len); StringValue::PadWithSpaces(sv->ptr, max_len, str_len); @@ -149,10 +158,11 @@ void HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** da } } *data += len; + return true; } void HdfsAvroScanner::ReadAvroString(PrimitiveType type, uint8_t** data, - bool write_slot, void* slot, MemPool* pool) { + bool write_slot, void* slot, MemPool* pool) { int64_t len = ReadWriteUtil::ReadZLong(data); if (write_slot) { DCHECK(type == TYPE_STRING); @@ -164,7 +174,7 @@ void HdfsAvroScanner::ReadAvroString(PrimitiveType type, uint8_t** data, } void HdfsAvroScanner::ReadAvroDecimal(int slot_byte_size, uint8_t** data, - bool write_slot, void* slot, MemPool* pool) { + bool write_slot, void* slot, MemPool* pool) { int64_t len = ReadWriteUtil::ReadZLong(data); if (write_slot) { // Decimals are encoded as big-endian integers. 
Copy the decimal into the most http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-avro-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc index ddcec6b..060ec05 100644 --- a/be/src/exec/hdfs-avro-scanner.cc +++ b/be/src/exec/hdfs-avro-scanner.cc @@ -47,6 +47,9 @@ const string HdfsAvroScanner::AVRO_NULL_CODEC("null"); const string HdfsAvroScanner::AVRO_SNAPPY_CODEC("snappy"); const string HdfsAvroScanner::AVRO_DEFLATE_CODEC("deflate"); +const string AVRO_MEM_LIMIT_EXCEEDED = "HdfsAvroScanner::$0() failed to allocate " + "$1 bytes for $2."; + #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_ HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state) @@ -496,6 +499,7 @@ Status HdfsAvroScanner::ProcessRange() { num_to_commit = DecodeAvroData(max_tuples, pool, &data, tuple, tuple_row); } } + RETURN_IF_ERROR(parse_status_); RETURN_IF_ERROR(CommitRows(num_to_commit)); num_records -= max_tuples; COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples); @@ -512,7 +516,7 @@ Status HdfsAvroScanner::ProcessRange() { return Status::OK(); } -void HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema, +bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema, MemPool* pool, uint8_t** data, Tuple* tuple) { DCHECK_EQ(record_schema.schema->type, AVRO_RECORD); BOOST_FOREACH(const AvroSchemaElement& element, record_schema.children) { @@ -555,7 +559,11 @@ void HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema, if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) { ReadAvroVarchar(slot_type, slot_desc->type().len, data, write_slot, slot, pool); } else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) { - ReadAvroChar(slot_type, slot_desc->type().len, data, write_slot, slot, pool); + if 
(UNLIKELY(!ReadAvroChar(slot_type, slot_desc->type().len, data, write_slot, + slot, pool))) { + DCHECK(!parse_status_.ok()); + return false; + } } else { ReadAvroString(slot_type, data, write_slot, slot, pool); } @@ -576,53 +584,86 @@ void HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema, DCHECK(false) << "Unsupported SchemaElement: " << type; } } + return true; } // This function produces a codegen'd function equivalent to MaterializeTuple() but // optimized for the table schema. Via helper functions CodegenReadRecord() and // CodegenReadScalar(), it eliminates the conditionals necessary when interpreting the // type of each element in the schema, instead generating code to handle each element in -// the schema. Example output: +// the schema. Example output with tpch.region: // -// define void @MaterializeTuple(%"class.impala::HdfsAvroScanner"* %this, +// define i1 @MaterializeTuple(%"class.impala::HdfsAvroScanner"* %this, +// %"struct.impala::AvroSchemaElement"* %record_schema, // %"class.impala::MemPool"* %pool, i8** %data, %"class.impala::Tuple"* %tuple) { // entry: -// %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to { i8, i32 }* +// %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to { i8, i32, +// %"struct.impala::StringValue", %"struct.impala::StringValue" }* // %is_not_null = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh( // %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data) // br i1 %is_not_null, label %read_field, label %null_field // // read_field: ; preds = %entry -// %slot = getelementptr inbounds { i8, i32 }* %tuple_ptr, i32 0, i32 1 +// %slot = getelementptr inbounds { i8, i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 1 // %opaque_slot = bitcast i32* %slot to i8* // call void -// @_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhPvPNS_7MemPoolE( -// %"class.impala::HdfsAvroScanner"* %this, i32 5, i8** %data, +// 
@_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE( +// %"class.impala::HdfsAvroScanner"* %this, i32 5, i8** %data, i1 true, // i8* %opaque_slot, %"class.impala::MemPool"* %pool) -// br label %endif +// br label %end_field // // null_field: ; preds = %entry -// call void @SetNull({ i8, i32 }* %tuple_ptr) -// br label %endif +// call void @SetNull({ i8, i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }* %tuple_ptr) +// br label %end_field +// +// end_field: ; preds = %read_field, %null_field +// %is_not_null4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh( +// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data) +// br i1 %is_not_null4, label %read_field1, label %null_field3 // -// endif: ; preds = %null_field, %read_field -// %is_not_null4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh( +// read_field1: ; preds = %end_field +// %slot5 = getelementptr inbounds { i8, i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 2 +// %opaque_slot6 = bitcast %"struct.impala::StringValue"* %slot5 to i8* +// call void +// @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE( +// %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i1 true, +// i8* %opaque_slot6, %"class.impala::MemPool"* %pool) +// br label %end_field2 +// +// null_field3: ; preds = %end_field +// call void @SetNull1({ i8, i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }* %tuple_ptr) +// br label %end_field2 +// +// end_field2: ; preds = %read_field1, %null_field3 +// %is_not_null10 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh( // %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data) -// br i1 %is_not_null4, label %read_field1, label %null_field2 +// br i1 %is_not_null10, label %read_field7, label %null_field9 // -// read_field1: ; preds = %endif +// read_field7: ; preds = %end_field2 +// %slot11 = 
getelementptr inbounds { i8, i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 3 +// %opaque_slot12 = bitcast %"struct.impala::StringValue"* %slot11 to i8* // call void -// @_ZN6impala15HdfsAvroScanner15ReadAvroBooleanENS_13PrimitiveTypeEPPhPvPNS_7MemPoolE( -// %"class.impala::HdfsAvroScanner"* %this, i32 0, i8** %data, -// i8* null, %"class.impala::MemPool"* %pool) -// br label %endif3 +// @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE( +// %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i1 true, +// i8* %opaque_slot12, %"class.impala::MemPool"* %pool) +// br label %end_field8 +// +// null_field9: ; preds = %end_field2 +// call void @SetNull2({ i8, i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }* %tuple_ptr) +// br label %end_field8 // -// null_field2: ; preds = %endif -// br label %endif3 +// end_field8: ; preds = %read_field7, %null_field9 +// ret i1 true // -// endif3: ; preds = %null_field2, %read_field1 -// ret void -// } +// bail_out: ; No predecessors! +// ret i1 false // used only if there is CHAR. 
+//} Function* HdfsAvroScanner::CodegenMaterializeTuple( HdfsScanNode* node, LlvmCodeGen* codegen) { LLVMContext& context = codegen->context(); @@ -644,7 +685,7 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple( Type* mempool_type = PointerType::get(codegen->GetType(MemPool::LLVM_CLASS_NAME), 0); Type* schema_element_type = codegen->GetPtrType(AvroSchemaElement::LLVM_CLASS_NAME); - LlvmCodeGen::FnPrototype prototype(codegen, "MaterializeTuple", codegen->void_type()); + LlvmCodeGen::FnPrototype prototype(codegen, "MaterializeTuple", codegen->boolean_type()); prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_ptr_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("record_schema", schema_element_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mempool_type)); @@ -661,24 +702,33 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple( Value* tuple_val = builder.CreateBitCast(opaque_tuple_val, tuple_ptr_type, "tuple_ptr"); + // Create a bail out block to handle decoding failures. + BasicBlock* bail_out_block = BasicBlock::Create(context, "bail_out", fn, NULL); + Status status = CodegenReadRecord( - SchemaPath(), node->avro_schema(), node, codegen, &builder, fn, NULL, this_val, - pool_val, tuple_val, data_val); + SchemaPath(), node->avro_schema(), node, codegen, &builder, fn, bail_out_block, + bail_out_block, this_val, pool_val, tuple_val, data_val); if (!status.ok()) { VLOG_QUERY << status.GetDetail(); fn->eraseFromParent(); return NULL; } - builder.SetInsertPoint(&fn->back()); - builder.CreateRetVoid(); + // Returns true on successful decoding. + builder.CreateRet(codegen->true_value()); + + // Returns false on decoding errors. 
+ builder.SetInsertPoint(bail_out_block); + builder.CreateRet(codegen->false_value()); + return codegen->FinalizeFunction(fn); } Status HdfsAvroScanner::CodegenReadRecord( const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNode* node, LlvmCodeGen* codegen, void* void_builder, Function* fn, BasicBlock* insert_before, - Value* this_val, Value* pool_val, Value* tuple_val, Value* data_val) { + BasicBlock* bail_out, Value* this_val, Value* pool_val, Value* tuple_val, + Value* data_val) { DCHECK_EQ(record.schema->type, AVRO_RECORD); LLVMContext& context = codegen->context(); LlvmCodeGen::LlvmBuilder* builder = @@ -742,21 +792,22 @@ Status HdfsAvroScanner::CodegenReadRecord( BasicBlock* insert_before_block = (null_block != NULL) ? null_block : end_field_block; RETURN_IF_ERROR(CodegenReadRecord(new_path, *field, node, codegen, builder, fn, - insert_before_block, this_val, pool_val, tuple_val, data_val)); + insert_before_block, bail_out, this_val, pool_val, tuple_val, data_val)); } else { - RETURN_IF_ERROR(CodegenReadScalar( - *field, slot_desc, codegen, builder, this_val, pool_val, tuple_val, data_val)); + RETURN_IF_ERROR(CodegenReadScalar(*field, slot_desc, codegen, builder, + end_field_block, bail_out, this_val, pool_val, tuple_val, data_val)); } builder->CreateBr(end_field_block); - // Set insertion point for next field + // Set insertion point for next field. 
builder->SetInsertPoint(end_field_block); } return Status::OK(); } Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element, - SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder, Value* this_val, + SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder, + BasicBlock* end_field_block, BasicBlock* bail_out_block, Value* this_val, Value* pool_val, Value* tuple_val, Value* data_val) { LlvmCodeGen::LlvmBuilder* builder = reinterpret_cast<LlvmCodeGen::LlvmBuilder*>(void_builder); @@ -781,10 +832,13 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element, case AVRO_BYTES: if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) { read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_VARCHAR, false); + } else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) { + read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_CHAR, false); } else { read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_STRING, false); } break; + // TODO: Add AVRO_DECIMAL here. default: return Status(Substitute( "Failed to codegen MaterializeTuple() due to unsupported type: $0", @@ -811,13 +865,19 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element, } // NOTE: ReadAvroVarchar/Char has different signature than rest of read functions - if ((slot_desc != NULL) && + if (slot_desc != NULL && (slot_desc->type().type == TYPE_VARCHAR || slot_desc->type().type == TYPE_CHAR)) { - // Need to pass an extra argument (the length) to the codegen function + // Need to pass an extra argument (the length) to the codegen function. 
Value* fixed_len = builder->getInt32(slot_desc->type().len); Value* read_field_args[] = {this_val, slot_type_val, fixed_len, data_val, write_slot_val, opaque_slot_val, pool_val}; - builder->CreateCall(read_field_fn, read_field_args); + if (slot_desc->type().type == TYPE_VARCHAR) { + builder->CreateCall(read_field_fn, read_field_args); + } else { + // ReadAvroChar() returns false if allocation from MemPool fails. + Value* ret_val = builder->CreateCall(read_field_fn, read_field_args); + builder->CreateCondBr(ret_val, end_field_block, bail_out_block); + } } else { Value* read_field_args[] = {this_val, slot_type_val, data_val, write_slot_val, opaque_slot_val, pool_val}; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-avro-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h index 15e046f..682ba04 100644 --- a/be/src/exec/hdfs-avro-scanner.h +++ b/be/src/exec/hdfs-avro-scanner.h @@ -176,8 +176,10 @@ class HdfsAvroScanner : public BaseSequenceScanner { int DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data, Tuple* tuple, TupleRow* tuple_row); - /// Materializes a single tuple from serialized record data. - void MaterializeTuple(const AvroSchemaElement& record_schema, MemPool* pool, + /// Materializes a single tuple from serialized record data. Will return false and set + /// error in parse_status_ if memory limit is exceeded when allocating new char buffer. + /// See comments below for ReadAvroChar(). + bool MaterializeTuple(const AvroSchemaElement& record_schema, MemPool* pool, uint8_t** data, Tuple* tuple); /// Produces a version of DecodeAvroData that uses codegen'd instead of interpreted @@ -190,7 +192,7 @@ class HdfsAvroScanner : public BaseSequenceScanner { /// schema. /// TODO: Codegen a function for each unique file schema. 
static llvm::Function* CodegenMaterializeTuple(HdfsScanNode* node, - LlvmCodeGen* codegen); + LlvmCodeGen* codegen); /// Used by CodegenMaterializeTuple to recursively create the IR for reading an Avro /// record. @@ -200,21 +202,23 @@ class HdfsAvroScanner : public BaseSequenceScanner { /// - builder: used to insert the IR, starting at the current insert point. The insert /// point will be left at the end of the record but before the 'insert_before' /// block. - /// - insert_before: the block to insert any new blocks directly before. NULL if blocks - /// should be inserted at the end of fn. (This could theoretically be inferred from - /// builder's insert point, but I can't figure out how to get the successor to a - /// basic block.) + /// - insert_before: the block to insert any new blocks directly before. This is either + /// the bail_out block or some basic blocks before that. + /// - bail_out: the block to jump to if anything fails. This is used in particular by + /// ReadAvroChar() which can exceed memory limit during allocation from MemPool. /// - this_val, pool_val, tuple_val, data_val: arguments to MaterializeTuple() static Status CodegenReadRecord( const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNode* node, LlvmCodeGen* codegen, void* builder, llvm::Function* fn, - llvm::BasicBlock* insert_before, llvm::Value* this_val, llvm::Value* pool_val, - llvm::Value* tuple_val, llvm::Value* data_val); + llvm::BasicBlock* insert_before, llvm::BasicBlock* bail_out, llvm::Value* this_val, + llvm::Value* pool_val, llvm::Value* tuple_val, llvm::Value* data_val); /// Creates the IR for reading an Avro scalar at builder's current insert point. 
static Status CodegenReadScalar(const AvroSchemaElement& element, - SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* builder, llvm::Value* this_val, - llvm::Value* pool_val, llvm::Value* tuple_val, llvm::Value* data_val); + SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder, + llvm::BasicBlock* end_field_block, llvm::BasicBlock* bail_out_block, + llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val, + llvm::Value* data_val); /// The following are cross-compiled functions for parsing a serialized Avro primitive /// type and writing it to a slot. They can also be used for skipping a field without @@ -225,6 +229,10 @@ class HdfsAvroScanner : public BaseSequenceScanner { /// - type: The type of the slot. (This is necessary because there is not a 1:1 mapping /// between Avro types and Impala's primitive types.) /// - pool: MemPool for string data. + /// + /// ReadAvroChar() will return false and set error in parse_status_ if memory limit + /// is exceeded when allocating the new char buffer. It returns true otherwise. + /// void ReadAvroBoolean( PrimitiveType type, uint8_t** data, bool write_slot, void* slot, MemPool* pool); void ReadAvroInt32( @@ -238,11 +246,11 @@ class HdfsAvroScanner : public BaseSequenceScanner { void ReadAvroVarchar( PrimitiveType type, int max_len, uint8_t** data, bool write_slot, void* slot, MemPool* pool); - void ReadAvroChar( + bool ReadAvroChar( PrimitiveType type, int max_len, uint8_t** data, bool write_slot, void* slot, MemPool* pool); - void ReadAvroString( PrimitiveType type, uint8_t** data, bool write_slot, void* slot, - MemPool* pool); + void ReadAvroString( + PrimitiveType type, uint8_t** data, bool write_slot, void* slot, MemPool* pool); /// Same as the above functions, except takes the size of the decimal slot (i.e. 4, 8, or /// 16) instead of the type (which should be TYPE_DECIMAL). 
The slot size is passed http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index ad8e360..a910243 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -176,7 +176,8 @@ DiskIoMgr::ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) { namespace impala { -const string PARQUET_MEM_LIMIT_EXCEEDED = "$0 failed to allocate $1 bytes for $2."; +const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate " + "$1 bytes for $2."; HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state) : HdfsScanner(scan_node, state), @@ -750,7 +751,7 @@ bool HdfsParquetScanner::ScalarColumnReader<StringValue, true>::ConvertSlot( if (slot_desc()->type().IsVarLenStringType()) { sv.ptr = reinterpret_cast<char*>(pool->TryAllocate(len)); if (UNLIKELY(sv.ptr == NULL)) { - string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot()", + string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot", len, "StringValue"); parent_->parse_status_ = pool->mem_tracker()->MemLimitExceeded(parent_->state_, details, len); @@ -1132,7 +1133,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() { if (decompressor_.get() != NULL) { dict_values = parent_->dictionary_pool_->TryAllocate(uncompressed_size); if (UNLIKELY(dict_values == NULL)) { - string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage()", + string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage", uncompressed_size, "dictionary"); return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded( parent_->state_, details, uncompressed_size); @@ -1147,7 +1148,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() { // more data) to a new buffer dict_values 
= parent_->dictionary_pool_->TryAllocate(data_size); if (UNLIKELY(dict_values == NULL)) { - string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage()", + string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage", data_size, "dictionary"); return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded( parent_->state_, details, data_size); @@ -1184,7 +1185,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() { uint8_t* decompressed_buffer = decompressed_data_pool_->TryAllocate(uncompressed_size); if (UNLIKELY(decompressed_buffer == NULL)) { - string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage()", + string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage", uncompressed_size, "decompressed data"); return decompressed_data_pool_->mem_tracker()->MemLimitExceeded( parent_->state_, details, uncompressed_size); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index 7c0fd9a..a26a575 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -115,8 +115,8 @@ Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition, THdfsFileFormat::type type, const string& scanner_name) { if (!scan_node_->tuple_desc()->string_slots().empty() && partition->escape_char() != '\0') { - // Cannot use codegen if there are strings slots and we need to - // compact (i.e. copy) the data. + // Codegen currently doesn't emit call to MemPool::TryAllocate() so skip codegen if + // there are strings slots and we need to compact (i.e. copy) the data. 
scan_node_->IncNumScannersCodegenDisabled(); return Status::OK(); } @@ -181,8 +181,8 @@ Status HdfsScanner::CommitRows(int num_rows) { RETURN_IF_ERROR(StartNewRowBatch()); } if (context_->cancelled()) return Status::CANCELLED; - // TODO: Replace with GetQueryStatus(). - RETURN_IF_ERROR(state_->CheckQueryState()); + // Check for UDF errors. + RETURN_IF_ERROR(state_->GetQueryStatus()); // Free local expr allocations for this thread HdfsScanNode::ConjunctsMap::const_iterator iter = scanner_conjuncts_map_.begin(); for (; iter != scanner_conjuncts_map_.end(); ++iter) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc index 3bed4e3..168cdd1 100644 --- a/be/src/exec/hdfs-sequence-scanner.cc +++ b/be/src/exec/hdfs-sequence-scanner.cc @@ -252,6 +252,10 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() { // Call jitted function if possible int tuples_returned; if (write_tuples_fn_ != NULL) { + // HdfsScanner::InitializeWriteTuplesFn() will skip codegen if there are string slots + // and escape characters. TextConverter::WriteSlot() will be used instead. 
+ DCHECK(scan_node_->tuple_desc()->string_slots().empty() || + delimited_text_parser_->escape_char() == '\0'); // last argument: seq always starts at record_location[0] tuples_returned = write_tuples_fn_(this, pool, tuple_row, batch_->row_byte_size(), &field_locations_[0], num_to_process, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index 69abebc..9572260 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -35,6 +35,7 @@ using boost::algorithm::ends_with; using boost::algorithm::to_lower; using namespace impala; using namespace llvm; +using namespace strings; const char* HdfsTextScanner::LLVM_CLASS_NAME = "class.impala::HdfsTextScanner"; @@ -359,7 +360,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) { // There can be one partial tuple which returned no more fields from this buffer. 
DCHECK_LE(*num_tuples, num_fields + 1); if (!boundary_column_.Empty()) { - CopyBoundaryField(&field_locations_[0], pool); + RETURN_IF_ERROR(CopyBoundaryField(&field_locations_[0], pool)); boundary_column_.Clear(); } num_tuples_materialized = WriteFields(pool, tuple_row_mem, num_fields, *num_tuples); @@ -379,14 +380,14 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) { // Save contents that are split across buffers if we are going to return this column if (col_start != byte_buffer_ptr_ && delimited_text_parser_->ReturnCurrentColumn()) { DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_); - boundary_column_.Append(col_start, byte_buffer_ptr_ - col_start); + RETURN_IF_ERROR(boundary_column_.Append(col_start, byte_buffer_ptr_ - col_start)); char* last_row = NULL; if (*num_tuples == 0) { last_row = batch_start_ptr_; } else { last_row = row_end_locations_[*num_tuples - 1] + 1; } - boundary_row_.Append(last_row, byte_buffer_ptr_ - last_row); + RETURN_IF_ERROR(boundary_row_.Append(last_row, byte_buffer_ptr_ - last_row)); } COUNTER_ADD(scan_node_->rows_read_counter(), *num_tuples); @@ -718,6 +719,10 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row, int tuples_returned = 0; // Call jitted function if possible if (write_tuples_fn_ != NULL) { + // HdfsScanner::InitializeWriteTuplesFn() will skip codegen if there are string + // slots and escape characters. TextConverter::WriteSlot() will be used instead. 
+ DCHECK(scan_node_->tuple_desc()->string_slots().empty() || + delimited_text_parser_->escape_char() == '\0'); tuples_returned = write_tuples_fn_(this, pool, tuple_row, batch_->row_byte_size(), fields, num_tuples, max_added_tuples, scan_node_->materialized_slots().size(), num_tuples_processed); @@ -751,15 +756,21 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row, return num_tuples_materialized; } -void HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) { +Status HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) { bool needs_escape = data->len < 0; int copy_len = needs_escape ? -data->len : data->len; int total_len = copy_len + boundary_column_.Size(); - char* str_data = reinterpret_cast<char*>(pool->Allocate(total_len)); + char* str_data = reinterpret_cast<char*>(pool->TryAllocate(total_len)); + if (UNLIKELY(str_data == NULL)) { + string details = Substitute("HdfsTextScanner::CopyBoundaryField() failed to allocate " + "$0 bytes.", total_len); + return pool->mem_tracker()->MemLimitExceeded(state_, details, total_len); + } memcpy(str_data, boundary_column_.str().ptr, boundary_column_.Size()); memcpy(str_data + boundary_column_.Size(), data->start, copy_len); data->start = str_data; data->len = needs_escape ? -total_len : total_len; + return Status::OK(); } int HdfsTextScanner::WritePartialTuple(FieldLocation* fields, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/hdfs-text-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h index 50758ae..804c9c3 100644 --- a/be/src/exec/hdfs-text-scanner.h +++ b/be/src/exec/hdfs-text-scanner.h @@ -116,11 +116,12 @@ class HdfsTextScanner : public HdfsScanner { int64_t* decompressed_len, bool *eosr); /// Prepends field data that was from the previous file buffer (This field straddled two - /// file buffers). 
'data' already contains the pointer/len from the current file buffer, - /// boundary_column_ contains the beginning of the data from the previous file - /// buffer. This function will allocate a new string from the tuple pool, concatenate the - /// two pieces and update 'data' to contain the new pointer/len. - void CopyBoundaryField(FieldLocation* data, MemPool* pool); + /// file buffers). 'data' already contains the pointer/len from the current file buffer, + /// boundary_column_ contains the beginning of the data from the previous file buffer. + /// This function will allocate a new string from the tuple pool, concatenate the + /// two pieces and update 'data' to contain the new pointer/len. Return error status if + /// memory limit is exceeded when allocating a new string. + Status CopyBoundaryField(FieldLocation* data, MemPool* pool); /// Writes the intermediate parsed data into slots, outputting /// tuples to row_batch as they complete. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index 040362b..897f8c5 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -118,7 +118,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool don } Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { - if (parent_->cancelled()) return Status::CANCELLED; + if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED; // io_buffer_ should only be null the first time this is called DCHECK(io_buffer_ != NULL || @@ -178,7 +178,7 @@ Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_ *len = 0; if (eosr()) return Status::OK(); - if (parent_->cancelled()) { + if (UNLIKELY(parent_->cancelled())) { DCHECK(*out_buffer == NULL); return Status::CANCELLED; } @@ -243,11 +243,11 @@ Status 
ScannerContext::Stream::GetBytesInternal(int64_t requested_len, while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) { // We need to fetch more bytes. Copy the end of the current buffer and fetch the next // one. - boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_); + RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_)); boundary_buffer_bytes_left_ += io_buffer_bytes_left_; RETURN_IF_ERROR(GetNextBuffer()); - if (parent_->cancelled()) return Status::CANCELLED; + if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED; if (io_buffer_bytes_left_ == 0) { // No more bytes (i.e. EOF) @@ -267,7 +267,7 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len, output_buffer_pos_ = &io_buffer_pos_; output_buffer_bytes_left_ = &io_buffer_bytes_left_; } else { - boundary_buffer_->Append(io_buffer_pos_, num_bytes); + RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, num_bytes)); boundary_buffer_bytes_left_ += num_bytes; boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->str().ptr) + boundary_buffer_->Size() - boundary_buffer_bytes_left_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/text-converter.h ---------------------------------------------------------------------- diff --git a/be/src/exec/text-converter.h b/be/src/exec/text-converter.h index 266e74d..3651146 100644 --- a/be/src/exec/text-converter.h +++ b/be/src/exec/text-converter.h @@ -49,7 +49,7 @@ class TextConverter { /// and writes the result into the tuples's slot. /// copy_string indicates whether we need to make a separate copy of the string data: /// For regular unescaped strings, we point to the original data in the file_buf_. - /// For regular escaped strings, we copy an its unescaped string into a separate buffer + /// For regular escaped strings, we copy its unescaped string into a separate buffer /// and point to it. 
/// If the string needs to be copied, the memory is allocated from 'pool', otherwise /// 'pool' is unused. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/exec/text-converter.inline.h ---------------------------------------------------------------------- diff --git a/be/src/exec/text-converter.inline.h b/be/src/exec/text-converter.inline.h index 7b429fa..d50b66f 100644 --- a/be/src/exec/text-converter.inline.h +++ b/be/src/exec/text-converter.inline.h @@ -33,7 +33,7 @@ namespace impala { /// Note: this function has a codegen'd version. Changing this function requires -/// corresponding changes to CodegenWriteSlot. +/// corresponding changes to CodegenWriteSlot(). inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tuple, const char* data, int len, bool copy_string, bool need_escape, MemPool* pool) { if ((len == 0 && !slot_desc->type().IsStringType()) || data == NULL) { @@ -59,7 +59,7 @@ inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tup if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR) buffer_len = type.len; bool reuse_data = type.IsVarLenStringType() && - !(len != 0 && (copy_string || need_escape)); + !(len != 0 && (copy_string || need_escape)); if (type.type == TYPE_CHAR) reuse_data &= (buffer_len <= len); StringValue str; @@ -67,9 +67,18 @@ inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tup if (reuse_data) { str.ptr = const_cast<char*>(data); } else { + // The codegen version of this code (generated by CodegenWriteSlot()) doesn't + // include this path. In other words, 'reuse_data' will always be true in the + // codegen version: + // 1. CodegenWriteSlot() doesn't yet support slot of TYPE_CHAR + // 2. HdfsScanner::InitializeWriteTuplesFn() will not codegen if there is + // any escape character. + // 3. HdfsScanner::WriteCompleteTuple() always calls this function with + // 'copy_string' == false. str.ptr = type.IsVarLenStringType() ? 
- reinterpret_cast<char*>(pool->Allocate(buffer_len)) : + reinterpret_cast<char*>(pool->TryAllocate(buffer_len)) : reinterpret_cast<char*>(slot); + if (UNLIKELY(str.ptr == NULL)) return false; if (need_escape) { UnescapeString(data, str.ptr, &str.len, buffer_len); } else { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/70502942/be/src/runtime/string-buffer.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h index 1787b12..c8f79df 100644 --- a/be/src/runtime/string-buffer.h +++ b/be/src/runtime/string-buffer.h @@ -16,9 +16,12 @@ #ifndef IMPALA_RUNTIME_STRING_BUFFER_H #define IMPALA_RUNTIME_STRING_BUFFER_H +#include "common/status.h" #include "runtime/mem-pool.h" #include "runtime/string-value.h" +using namespace strings; + namespace impala { /// Dynamic-sizable string (similar to std::string) but without as many @@ -30,7 +33,7 @@ namespace impala { class StringBuffer { public: /// C'tor for StringBuffer. Memory backing the string will be allocated from - /// the pool as necessary. Can optionally be initialized from a StringValue. + /// the pool as necessary. Can optionally be initialized from a StringValue. StringBuffer(MemPool* pool, StringValue* str = NULL) : pool_(pool), buffer_size_(0) { DCHECK(pool_ != NULL); @@ -41,24 +44,24 @@ class StringBuffer { } /// Append 'str' to the current string, allocating a new buffer as necessary. - void Append(const char* str, int len) { + /// Return error status if memory limit is exceeded. + Status Append(const char* str, int len) { int new_len = len + string_value_.len; - if (new_len > buffer_size_) { - GrowBuffer(new_len); - } + if (new_len > buffer_size_) RETURN_IF_ERROR(GrowBuffer(new_len)); memcpy(string_value_.ptr + string_value_.len, str, len); string_value_.len = new_len; + return Status::OK(); } /// TODO: switch everything to uint8_t? 
- void Append(const uint8_t* str, int len) { - Append(reinterpret_cast<const char*>(str), len); + Status Append(const uint8_t* str, int len) { + return Append(reinterpret_cast<const char*>(str), len); } - /// Assigns contents to StringBuffer - void Assign(const char* str, int len) { + /// Assigns contents to StringBuffer. Return error status if memory limit is exceeded. + Status Assign(const char* str, int len) { Clear(); - Append(str, len); + return Append(str, len); } /// Clear the underlying StringValue. The allocated buffer can be reused. @@ -94,16 +97,23 @@ class StringBuffer { private: /// Grows the buffer backing the string to be at least new_size, copying over the - /// previous string data into the new buffer. - void GrowBuffer(int new_len) { + /// previous string data into the new buffer. Return error status if memory limit + /// is exceeded. + Status GrowBuffer(int new_len) { // TODO: Release/reuse old buffers somehow buffer_size_ = std::max(buffer_size_ * 2, new_len); DCHECK_LE(buffer_size_, StringValue::MAX_LENGTH); - char* new_buffer = reinterpret_cast<char*>(pool_->Allocate(buffer_size_)); - if (string_value_.len > 0) { + char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_)); + if (UNLIKELY(new_buffer == NULL)) { + string details = Substitute("StringBuffer failed to grow buffer by $0 bytes.", + buffer_size_); + return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_); + } + if (LIKELY(string_value_.len > 0)) { memcpy(new_buffer, string_value_.ptr, string_value_.len); } string_value_.ptr = new_buffer; + return Status::OK(); } MemPool* pool_;
