IMPALA-3441, IMPALA-3659: check for malformed Avro data

This patch adds error checking to the Avro scanner (both the codegen'd and
interpreted paths), including out-of-bounds checks and data validity checks.
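As a rough sketch of the pattern the diff below follows -- each ReadAvro* routine now
takes an explicit data_end pointer, checks the remaining bytes and the decoded value
before writing a slot, and reports malformed input through parse_status_ rather than
DCHECKing -- here is a minimal, standalone C++ example. The names DecodeBool and
DecodeStatus are hypothetical stand-ins and are not part of this patch; the real
routines live in be/src/exec/hdfs-avro-scanner-ir.cc.

#include <cstdint>
#include <iostream>

// Hypothetical stand-ins for Impala's TErrorCode values; not part of the patch.
enum class DecodeStatus { OK, TRUNCATED_BLOCK, INVALID_VALUE };

// Bounds- and validity-checked decode of a one-byte Avro boolean, mirroring the
// shape of the patched ReadAvro* routines: check the remaining bytes against an
// explicit 'data_end', validate the decoded value, then advance the cursor.
bool DecodeBool(const uint8_t** data, const uint8_t* data_end, bool* out,
                DecodeStatus* status) {
  if (*data == data_end) {           // out-of-bounds check
    *status = DecodeStatus::TRUNCATED_BLOCK;
    return false;
  }
  if (**data != 0 && **data != 1) {  // data validity check
    *status = DecodeStatus::INVALID_VALUE;
    return false;
  }
  *out = (**data == 1);
  *data += 1;
  return true;
}

int main() {
  const uint8_t buf[] = {1, 7};      // second byte is not a legal Avro boolean
  const uint8_t* p = buf;
  const uint8_t* end = buf + sizeof(buf);
  bool value = false;
  DecodeStatus status = DecodeStatus::OK;
  std::cout << DecodeBool(&p, end, &value, &status) << " " << value << std::endl;  // 1 1
  std::cout << DecodeBool(&p, end, &value, &status) << std::endl;                  // 0
  return 0;
}

Analogous checks in the actual patch guard the fixed-size float/double reads and the
varlen string/decimal lengths.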
I ran a local benchmark using the following queries:

set num_scanner_threads=1;
select count(i) from default.avro_bigints_big; # file contains only longs
select max(l_orderkey) from biglineitem_avro; # file has tpch.lineitem schema

Both benchmark queries see negligible or no performance impact.

This patch adds a new Avro scanner unit test and an end-to-end test that queries
several corrupted files, and updates the zig-zag varlen int unit test.

Change-Id: I801a11c496a128e02c564c2a9c44baa5a97be132
Reviewed-on: http://gerrit.cloudera.org:8080/3072
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/01287a3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/01287a3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/01287a3b

Branch: refs/heads/master
Commit: 01287a3ba909f93a5bbe72081d5b9ec67db70257
Parents: c076f09
Author: Skye Wanderman-Milne <[email protected]>
Authored: Thu Jun 9 15:09:43 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Mon Jun 13 18:32:32 2016 -0700

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt | 1 +
 be/src/exec/base-sequence-scanner.cc | 10 +
 be/src/exec/base-sequence-scanner.h | 3 +
 be/src/exec/hdfs-avro-scanner-ir.cc | 177 +++++---
 be/src/exec/hdfs-avro-scanner-test.cc | 401 +++++++++++++++++++
 be/src/exec/hdfs-avro-scanner.cc | 307 +++++++++----
 be/src/exec/hdfs-avro-scanner.h | 86 ++--
 be/src/exec/hdfs-avro-table-writer.cc | 3 +-
 be/src/exec/hdfs-scanner.cc | 21 +
 be/src/exec/hdfs-scanner.h | 3 +
 be/src/exec/read-write-util.cc | 67 +++-
 be/src/exec/read-write-util.h | 49 ++-
 be/src/exec/scanner-context.cc | 15 +-
 be/src/exec/scanner-context.h | 1 +
 be/src/exec/scanner-context.inline.h | 24 +-
 be/src/exec/zigzag-test.cc | 107 ++++-
 common/thrift/generate_error_codes.py | 20 +
 testdata/bad_avro_snap/README | 19 +
 testdata/bad_avro_snap/invalid_union.avro | Bin 0 -> 191 bytes
 testdata/bad_avro_snap/negative_string_len.avro | Bin 0 -> 180 bytes
 testdata/bad_avro_snap/truncated_float.avro | Bin 0 -> 175 bytes
 testdata/bad_avro_snap/truncated_string.avro | Bin 0 -> 171 bytes
 .../functional/functional_schema_template.sql | 20 +
 .../datasets/functional/schema_constraints.csv | 2 +
 .../queries/DataErrorsTest/avro-errors.test | 24 ++
 tests/common/test_result_verifier.py | 13 +-
 tests/data_errors/test_data_errors.py | 12 +
 27 files changed, 1143 insertions(+), 242 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index c3208fc..876fc7e 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -98,3 +98,4 @@ ADD_BE_TEST(row-batch-list-test) ADD_BE_TEST(incr-stats-util-test) ADD_BE_TEST(kudu-scan-node-test) ADD_BE_TEST(kudu-table-sink-test) +ADD_BE_TEST(hdfs-avro-scanner-test) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/base-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc index 4e0ac59..d235b77 100644 --- a/be/src/exec/base-sequence-scanner.cc +++
b/be/src/exec/base-sequence-scanner.cc @@ -22,6 +22,7 @@ #include "runtime/string-search.h" #include "util/codec.h" #include "util/runtime-profile-counters.h" +#include "util/test-info.h" #include "common/names.h" @@ -71,6 +72,15 @@ BaseSequenceScanner::BaseSequenceScanner(HdfsScanNode* node, RuntimeState* state num_syncs_(0) { } +BaseSequenceScanner::BaseSequenceScanner() + : HdfsScanner(), + header_(NULL), + block_start_(0), + total_block_size_(0), + num_syncs_(0) { + DCHECK(TestInfo::is_test()); +} + BaseSequenceScanner::~BaseSequenceScanner() { } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/base-sequence-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/base-sequence-scanner.h b/be/src/exec/base-sequence-scanner.h index 555444a..ea7ad78 100644 --- a/be/src/exec/base-sequence-scanner.h +++ b/be/src/exec/base-sequence-scanner.h @@ -129,6 +129,9 @@ class BaseSequenceScanner : public HdfsScanner { /// If true, this scanner object is only for processing the header. bool only_parsing_header_; + /// Unit test constructor + BaseSequenceScanner(); + private: /// Set to true when this scanner has processed all the bytes it is responsible /// for, i.e., when it reads a sync occurring completely in the next scan http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-avro-scanner-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc b/be/src/exec/hdfs-avro-scanner-ir.cc index 84ca502..6839fe1 100644 --- a/be/src/exec/hdfs-avro-scanner-ir.cc +++ b/be/src/exec/hdfs-avro-scanner-ir.cc @@ -24,12 +24,16 @@ using namespace strings; // Functions in this file are cross-compiled to IR with clang. 
+const int AVRO_FLOAT_SIZE = 4; +const int AVRO_DOUBLE_SIZE = 8; + int HdfsAvroScanner::DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data, - Tuple* tuple, TupleRow* tuple_row) { + uint8_t* data_end, Tuple* tuple, TupleRow* tuple_row) { int num_to_commit = 0; for (int i = 0; i < max_tuples; ++i) { InitTuple(template_tuple_, tuple); - if (UNLIKELY(!MaterializeTuple(*avro_header_->schema.get(), pool, data, tuple))) { + if (UNLIKELY(!MaterializeTuple(*avro_header_->schema.get(), pool, data, data_end, + tuple))) { return 0; } tuple_row->SetTuple(scan_node_->tuple_idx(), tuple); @@ -42,105 +46,160 @@ int HdfsAvroScanner::DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** dat return num_to_commit; } -bool HdfsAvroScanner::ReadUnionType(int null_union_position, uint8_t** data) { +bool HdfsAvroScanner::ReadUnionType(int null_union_position, uint8_t** data, + uint8_t* data_end, bool* is_null) { DCHECK(null_union_position == 0 || null_union_position == 1); + if (UNLIKELY(*data == data_end)) { + SetStatusCorruptData(TErrorCode::AVRO_TRUNCATED_BLOCK); + return false; + } int8_t union_position = **data; // Union position is varlen zig-zag encoded - DCHECK(union_position == 0 || union_position == 2); + if (UNLIKELY(union_position != 0 && union_position != 2)) { + SetStatusInvalidValue(TErrorCode::AVRO_INVALID_UNION, union_position); + return false; + } // "Decode" zig-zag encoding if (union_position == 2) union_position = 1; *data += 1; - return union_position != null_union_position; + *is_null = union_position == null_union_position; + return true; } -void HdfsAvroScanner::ReadAvroBoolean(PrimitiveType type, uint8_t** data, bool write_slot, - void* slot, MemPool* pool) { +bool HdfsAvroScanner::ReadAvroBoolean(PrimitiveType type, uint8_t** data, + uint8_t* data_end, bool write_slot, void* slot, MemPool* pool) { + if (UNLIKELY(*data == data_end)) { + SetStatusCorruptData(TErrorCode::AVRO_TRUNCATED_BLOCK); + return false; + } if (write_slot) { DCHECK_EQ(type, TYPE_BOOLEAN); + if (UNLIKELY(**data != 0 && **data != 1)) { + SetStatusInvalidValue(TErrorCode::AVRO_INVALID_BOOLEAN, **data); + return false; + } *reinterpret_cast<bool*>(slot) = *reinterpret_cast<bool*>(*data); } *data += 1; + return true; } -void HdfsAvroScanner::ReadAvroInt32(PrimitiveType type, uint8_t** data, bool write_slot, - void* slot, MemPool* pool) { - int32_t val = ReadWriteUtil::ReadZInt(data); +bool HdfsAvroScanner::ReadAvroInt32(PrimitiveType type, uint8_t** data, uint8_t* data_end, + bool write_slot, void* slot, MemPool* pool) { + ReadWriteUtil::ZIntResult r = ReadWriteUtil::ReadZInt(data, data_end); + if (UNLIKELY(!r.ok)) { + SetStatusCorruptData(TErrorCode::SCANNER_INVALID_INT); + return false; + } if (write_slot) { if (type == TYPE_INT) { - *reinterpret_cast<int32_t*>(slot) = val; + *reinterpret_cast<int32_t*>(slot) = r.val; } else if (type == TYPE_BIGINT) { - *reinterpret_cast<int64_t*>(slot) = val; + *reinterpret_cast<int64_t*>(slot) = r.val; } else if (type == TYPE_FLOAT) { - *reinterpret_cast<float*>(slot) = val; - } else if (type == TYPE_DOUBLE) { - *reinterpret_cast<double*>(slot) = val; + *reinterpret_cast<float*>(slot) = r.val; } else { - DCHECK(false); + DCHECK_EQ(type, TYPE_DOUBLE); + *reinterpret_cast<double*>(slot) = r.val; } } + return true; } -void HdfsAvroScanner::ReadAvroInt64(PrimitiveType type, uint8_t** data, bool write_slot, - void* slot, MemPool* pool) { - int64_t val = ReadWriteUtil::ReadZLong(data); +bool HdfsAvroScanner::ReadAvroInt64(PrimitiveType type, uint8_t** data, uint8_t* data_end, + 
bool write_slot, void* slot, MemPool* pool) { + ReadWriteUtil::ZLongResult r = ReadWriteUtil::ReadZLong(data, data_end); + if (UNLIKELY(!r.ok)) { + SetStatusCorruptData(TErrorCode::SCANNER_INVALID_INT); + return false; + } if (write_slot) { if (type == TYPE_BIGINT) { - *reinterpret_cast<int64_t*>(slot) = val; + *reinterpret_cast<int64_t*>(slot) = r.val; } else if (type == TYPE_FLOAT) { - *reinterpret_cast<float*>(slot) = val; - } else if (type == TYPE_DOUBLE) { - *reinterpret_cast<double*>(slot) = val; + *reinterpret_cast<float*>(slot) = r.val; } else { - DCHECK(false); + DCHECK_EQ(type, TYPE_DOUBLE); + *reinterpret_cast<double*>(slot) = r.val; } } + return true; } -void HdfsAvroScanner::ReadAvroFloat(PrimitiveType type, uint8_t** data, bool write_slot, - void* slot, MemPool* pool) { +bool HdfsAvroScanner::ReadAvroFloat(PrimitiveType type, uint8_t** data, uint8_t* data_end, + bool write_slot, void* slot, MemPool* pool) { + if (UNLIKELY(data_end - *data < AVRO_FLOAT_SIZE)) { + SetStatusCorruptData(TErrorCode::AVRO_TRUNCATED_BLOCK); + return false; + } if (write_slot) { float val = *reinterpret_cast<float*>(*data); if (type == TYPE_FLOAT) { *reinterpret_cast<float*>(slot) = val; - } else if (type == TYPE_DOUBLE) { - *reinterpret_cast<double*>(slot) = val; } else { - DCHECK(false); + DCHECK_EQ(type, TYPE_DOUBLE); + *reinterpret_cast<double*>(slot) = val; } } - *data += 4; + *data += AVRO_FLOAT_SIZE; + return true; } -void HdfsAvroScanner::ReadAvroDouble(PrimitiveType type, uint8_t** data, bool write_slot, - void* slot, MemPool* pool) { +bool HdfsAvroScanner::ReadAvroDouble(PrimitiveType type, uint8_t** data, uint8_t* data_end, + bool write_slot, void* slot, MemPool* pool) { + if (UNLIKELY(data_end - *data < AVRO_DOUBLE_SIZE)) { + SetStatusCorruptData(TErrorCode::AVRO_TRUNCATED_BLOCK); + return false; + } if (write_slot) { DCHECK_EQ(type, TYPE_DOUBLE); *reinterpret_cast<double*>(slot) = *reinterpret_cast<double*>(*data); } - *data += 8; + *data += AVRO_DOUBLE_SIZE; + return true; } -void HdfsAvroScanner::ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t** data, - bool write_slot, void* slot, MemPool* pool) { - int64_t len = ReadWriteUtil::ReadZLong(data); +ReadWriteUtil::ZLongResult HdfsAvroScanner::ReadFieldLen(uint8_t** data, uint8_t* data_end) { + ReadWriteUtil::ZLongResult r = ReadWriteUtil::ReadZLong(data, data_end); + if (UNLIKELY(!r.ok)) { + SetStatusCorruptData(TErrorCode::SCANNER_INVALID_INT); + return ReadWriteUtil::ZLongResult::error(); + } + if (UNLIKELY(r.val < 0)) { + SetStatusInvalidValue(TErrorCode::AVRO_INVALID_LENGTH, r.val); + return ReadWriteUtil::ZLongResult::error(); + } + if (UNLIKELY(data_end - *data < r.val)) { + SetStatusCorruptData(TErrorCode::AVRO_TRUNCATED_BLOCK); + return ReadWriteUtil::ZLongResult::error(); + } + return r; +} + +bool HdfsAvroScanner::ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t** data, + uint8_t* data_end, bool write_slot, void* slot, MemPool* pool) { + ReadWriteUtil::ZLongResult len = ReadFieldLen(data, data_end); + if (UNLIKELY(!len.ok)) return false; if (write_slot) { DCHECK(type == TYPE_VARCHAR); StringValue* sv = reinterpret_cast<StringValue*>(slot); - int str_len = std::min(static_cast<int>(len), max_len); + int str_len = std::min(static_cast<int>(len.val), max_len); DCHECK(str_len >= 0); sv->len = str_len; sv->ptr = reinterpret_cast<char*>(*data); } - *data += len; + *data += len.val; + return true; } bool HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** data, - bool write_slot, void* slot, 
MemPool* pool) { - int64_t len = ReadWriteUtil::ReadZLong(data); + uint8_t* data_end, bool write_slot, void* slot, MemPool* pool) { + ReadWriteUtil::ZLongResult len = ReadFieldLen(data, data_end); + if (UNLIKELY(!len.ok)) return false; if (write_slot) { DCHECK(type == TYPE_CHAR); ColumnType ctype = ColumnType::CreateCharType(max_len); - int str_len = std::min(static_cast<int>(len), max_len); + int str_len = std::min(static_cast<int>(len.val), max_len); if (ctype.IsVarLenStringType()) { StringValue* sv = reinterpret_cast<StringValue*>(slot); sv->ptr = reinterpret_cast<char*>(pool->TryAllocate(max_len)); @@ -158,35 +217,42 @@ bool HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** da StringValue::PadWithSpaces(reinterpret_cast<char*>(slot), max_len, str_len); } } - *data += len; + *data += len.val; return true; } -void HdfsAvroScanner::ReadAvroString(PrimitiveType type, uint8_t** data, - bool write_slot, void* slot, MemPool* pool) { - int64_t len = ReadWriteUtil::ReadZLong(data); +bool HdfsAvroScanner::ReadAvroString(PrimitiveType type, uint8_t** data, + uint8_t* data_end, bool write_slot, void* slot, MemPool* pool) { + ReadWriteUtil::ZLongResult len = ReadFieldLen(data, data_end); + if (UNLIKELY(!len.ok)) return false; if (write_slot) { DCHECK(type == TYPE_STRING); StringValue* sv = reinterpret_cast<StringValue*>(slot); - sv->len = len; + sv->len = len.val; sv->ptr = reinterpret_cast<char*>(*data); } - *data += len; + *data += len.val; + return true; } -void HdfsAvroScanner::ReadAvroDecimal(int slot_byte_size, uint8_t** data, - bool write_slot, void* slot, MemPool* pool) { - int64_t len = ReadWriteUtil::ReadZLong(data); +bool HdfsAvroScanner::ReadAvroDecimal(int slot_byte_size, uint8_t** data, + uint8_t* data_end, bool write_slot, void* slot, MemPool* pool) { + ReadWriteUtil::ZLongResult len = ReadFieldLen(data, data_end); + if (UNLIKELY(!len.ok)) return false; if (write_slot) { + DCHECK_GE(len.val, 0); + if (UNLIKELY(len.val > slot_byte_size)) { + SetStatusInvalidValue(TErrorCode::AVRO_INVALID_LENGTH, len.val); + return false; + } // Decimals are encoded as big-endian integers. Copy the decimal into the most // significant bytes and then shift down to the correct position to sign-extend the // decimal. - DCHECK_LE(len, slot_byte_size); - int bytes_to_fill = slot_byte_size - len; + int bytes_to_fill = slot_byte_size - len.val; #if __BYTE_ORDER == __LITTLE_ENDIAN - BitUtil::ByteSwap(reinterpret_cast<uint8_t*>(slot) + bytes_to_fill, *data, len); + BitUtil::ByteSwap(reinterpret_cast<uint8_t*>(slot) + bytes_to_fill, *data, len.val); #else - memcpy(slot, *data, len); + memcpy(slot, *data, len.val); #endif switch (slot_byte_size) { case 4: { @@ -208,5 +274,6 @@ void HdfsAvroScanner::ReadAvroDecimal(int slot_byte_size, uint8_t** data, DCHECK(false) << "Decimal slots can't be this size: " << slot_byte_size; } } - *data += len; + *data += len.val; + return true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-avro-scanner-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner-test.cc b/be/src/exec/hdfs-avro-scanner-test.cc new file mode 100644 index 0000000..e5f77f3 --- /dev/null +++ b/be/src/exec/hdfs-avro-scanner-test.cc @@ -0,0 +1,401 @@ +// Copyright 2016 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exec/hdfs-avro-scanner.h" + +#include <gtest/gtest.h> +#include <limits.h> + +#include "common/init.h" +#include "exec/read-write-util.h" +#include "runtime/decimal-value.inline.h" +#include "runtime/runtime-state.h" +#include "runtime/string-value.inline.h" + +// TODO: CHAR, VARCHAR (IMPALA-3658) + +namespace impala { + +class HdfsAvroScannerTest : public testing::Test { + public: + void TestReadUnionType(int null_union_position, uint8_t* data, int64_t data_len, + bool expected_is_null, TErrorCode::type expected_error = TErrorCode::OK) { + // Reset parse_status_ + scanner_.parse_status_ = Status::OK(); + + uint8_t* new_data = data; + bool is_null = -1; + bool expect_success = expected_error == TErrorCode::OK; + + bool success = scanner_.ReadUnionType(null_union_position, &new_data, data + data_len, + &is_null); + EXPECT_EQ(success, expect_success); + + if (success) { + EXPECT_TRUE(scanner_.parse_status_.ok()); + EXPECT_EQ(is_null, expected_is_null); + EXPECT_EQ(new_data, data + 1); + } else { + EXPECT_EQ(scanner_.parse_status_.code(), expected_error); + } + } + + // Templated function for calling different ReadAvro* functions. + // + // PrimitiveType is a template parameter so we can pass in int 'slot_byte_size' to + // ReadAvroDecimal, but otherwise this argument is always the PrimitiveType 'type' + // argument. + template<typename T, typename ReadAvroTypeFn, typename PrimitiveType> + void TestReadAvroType(ReadAvroTypeFn read_fn, PrimitiveType type, uint8_t* data, + int64_t data_len, T expected_val, int expected_encoded_len, + TErrorCode::type expected_error = TErrorCode::OK) { + // Reset parse_status_ + scanner_.parse_status_ = Status::OK(); + + uint8_t* new_data = data; + T slot; + bool expect_success = expected_error == TErrorCode::OK; + + bool success = (scanner_.*read_fn)(type, &new_data, data + data_len, true, &slot, + NULL); + EXPECT_EQ(success, expect_success); + + if (success) { + EXPECT_TRUE(scanner_.parse_status_.ok()); + EXPECT_EQ(slot, expected_val); + EXPECT_EQ(new_data, data + expected_encoded_len); + } else { + EXPECT_EQ(scanner_.parse_status_.code(), expected_error); + } + } + + void TestReadAvroBoolean(uint8_t* data, int64_t data_len, bool expected_val, + TErrorCode::type expected_error = TErrorCode::OK) { + TestReadAvroType(&HdfsAvroScanner::ReadAvroBoolean, TYPE_BOOLEAN, data, data_len, + expected_val, 1, expected_error); + } + + void TestReadAvroInt32(uint8_t* data, int64_t data_len, int32_t expected_val, + int expected_encoded_len, TErrorCode::type expected_error = TErrorCode::OK) { + TestReadAvroType(&HdfsAvroScanner::ReadAvroInt32, TYPE_INT, data, data_len, + expected_val, expected_encoded_len, expected_error); + // Test type promotion to long, float, and double + int64_t expected_bigint = expected_val; + TestReadAvroType(&HdfsAvroScanner::ReadAvroInt32, TYPE_BIGINT, data, data_len, + expected_bigint, expected_encoded_len, expected_error); + float expected_float = expected_val; + TestReadAvroType(&HdfsAvroScanner::ReadAvroInt32, TYPE_FLOAT, data, data_len, + expected_float, expected_encoded_len, expected_error); + double 
expected_double = expected_val; + TestReadAvroType(&HdfsAvroScanner::ReadAvroInt32, TYPE_DOUBLE, data, data_len, + expected_double, expected_encoded_len, expected_error); + } + + void TestReadAvroInt64(uint8_t* data, int64_t data_len, int64_t expected_val, + int expected_encoded_len, TErrorCode::type expected_error = TErrorCode::OK) { + TestReadAvroType(&HdfsAvroScanner::ReadAvroInt64, TYPE_BIGINT, data, data_len, + expected_val, expected_encoded_len, expected_error); + // Test type promotion to float and double + float expected_float = expected_val; + TestReadAvroType(&HdfsAvroScanner::ReadAvroInt64, TYPE_FLOAT, data, data_len, + expected_float, expected_encoded_len, expected_error); + double expected_double = expected_val; + TestReadAvroType(&HdfsAvroScanner::ReadAvroInt64, TYPE_DOUBLE, data, data_len, + expected_double, expected_encoded_len, expected_error); + } + + void TestReadAvroFloat(uint8_t* data, int64_t data_len, float expected_val, + TErrorCode::type expected_error = TErrorCode::OK) { + TestReadAvroType(&HdfsAvroScanner::ReadAvroFloat, TYPE_FLOAT, data, data_len, + expected_val, 4, expected_error); + // Test type promotion to double + double expected_double = expected_val; + TestReadAvroType(&HdfsAvroScanner::ReadAvroFloat, TYPE_DOUBLE, data, data_len, + expected_double, 4, expected_error); + } + + void TestReadAvroDouble(uint8_t* data, int64_t data_len, double expected_val, + TErrorCode::type expected_error = TErrorCode::OK) { + TestReadAvroType(&HdfsAvroScanner::ReadAvroDouble, TYPE_DOUBLE, data, data_len, + expected_val, 8, expected_error); + } + + void TestReadAvroString(uint8_t* data, int64_t data_len, StringValue expected_val, + int expected_encoded_len, TErrorCode::type expected_error = TErrorCode::OK) { + TestReadAvroType(&HdfsAvroScanner::ReadAvroString, TYPE_STRING, data, data_len, + expected_val, expected_encoded_len, expected_error); + } + + template<typename T> + void TestReadAvroDecimal(uint8_t* data, int64_t data_len, DecimalValue<T> expected_val, + int expected_encoded_len, TErrorCode::type expected_error = TErrorCode::OK) { + TestReadAvroType(&HdfsAvroScanner::ReadAvroDecimal, sizeof(expected_val), data, + data_len, expected_val, expected_encoded_len, expected_error); + } + + void TestInt64Val(int64_t val) { + uint8_t data[100]; + int len = ReadWriteUtil::PutZLong(val, data); + DCHECK_GT(len, 0); + TestReadAvroInt64(data, len, val, len); + TestReadAvroInt64(data, len + 1, val, len); + TestReadAvroInt64(data, len - 1, -1, -1, TErrorCode::SCANNER_INVALID_INT); + } + + protected: + HdfsAvroScanner scanner_; +}; + +// Tests reading a [<some type>, "null"] union. 
+TEST_F(HdfsAvroScannerTest, NullUnionTest) { + uint8_t data[100]; + data[0] = 0; + TestReadUnionType(0, data, 1, true); + TestReadUnionType(1, data, 1, false); + TestReadUnionType(0, data, 10, true); + TestReadUnionType(1, data, 10, false); + TestReadUnionType(0, data, 0, false, TErrorCode::AVRO_TRUNCATED_BLOCK); + + data[0] = 2; + TestReadUnionType(0, data, 1, false); + TestReadUnionType(1, data, 1, true); + TestReadUnionType(0, data, 10, false); + TestReadUnionType(1, data, 10, true); + TestReadUnionType(0, data, 0, false, TErrorCode::AVRO_TRUNCATED_BLOCK); + + data[0] = 1; + TestReadUnionType(0, data, 0, false, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadUnionType(0, data, 1, false, TErrorCode::AVRO_INVALID_UNION); + + data[0] = -1; + TestReadUnionType(0, data, 1, false, TErrorCode::AVRO_INVALID_UNION); +} + +TEST_F(HdfsAvroScannerTest, BooleanTest) { + uint8_t data[100]; + data[0] = 0; + TestReadAvroBoolean(data, 1, false); + TestReadAvroBoolean(data, 10, false); + TestReadAvroBoolean(data, 0, false, TErrorCode::AVRO_TRUNCATED_BLOCK); + + data[0] = 1; + TestReadAvroBoolean(data, 1, true); + TestReadAvroBoolean(data, 10, true); + TestReadAvroBoolean(data, 0, false, TErrorCode::AVRO_TRUNCATED_BLOCK); + + data[0] = 2; + TestReadAvroBoolean(data, 1, false, TErrorCode::AVRO_INVALID_BOOLEAN); +} + +TEST_F(HdfsAvroScannerTest, Int32Test) { + uint8_t data[100]; + data[0] = 1; // decodes to -1 + TestReadAvroInt32(data, 1, -1, 1); + TestReadAvroInt32(data, 10, -1, 1); + TestReadAvroInt32(data, 0, -1, -1, TErrorCode::SCANNER_INVALID_INT); + + data[0] = 2; // decodes to 1 + TestReadAvroInt32(data, 1, 1, 1); + TestReadAvroInt32(data, 10, 1, 1); + TestReadAvroInt32(data, 0, -1, -1, TErrorCode::SCANNER_INVALID_INT); + + data[0] = 0x80; // decodes to 64 + data[1] = 0x01; + TestReadAvroInt32(data, 2, 64, 2); + TestReadAvroInt32(data, 10, 64, 2); + TestReadAvroInt32(data, 0, -1, -1, TErrorCode::SCANNER_INVALID_INT); + TestReadAvroInt32(data, 1, -1, -1, TErrorCode::SCANNER_INVALID_INT); + + int len = ReadWriteUtil::PutZInt(INT_MAX, data); + TestReadAvroInt32(data, len, INT_MAX, len); + TestReadAvroInt32(data, len + 1, INT_MAX, len); + TestReadAvroInt32(data, len - 1, -1, -1, TErrorCode::SCANNER_INVALID_INT); + + len = ReadWriteUtil::PutZInt(INT_MIN, data); + TestReadAvroInt32(data, len, INT_MIN, len); + TestReadAvroInt32(data, len + 1, INT_MIN, len); + TestReadAvroInt32(data, len - 1, -1, -1, TErrorCode::SCANNER_INVALID_INT); + + // TODO: we don't handle invalid values (e.g. overflow) (IMPALA-3659) +} + +TEST_F(HdfsAvroScannerTest, Int64Test) { + uint8_t data[100]; + data[0] = 1; // decodes to -1 + TestReadAvroInt64(data, 1, -1, 1); + TestReadAvroInt64(data, 10, -1, 1); + TestReadAvroInt64(data, 0, -1, -1, TErrorCode::SCANNER_INVALID_INT); + + data[0] = 2; // decodes to 1 + TestReadAvroInt64(data, 1, 1, 1); + TestReadAvroInt64(data, 10, 1, 1); + TestReadAvroInt64(data, 0, -1, -1, TErrorCode::SCANNER_INVALID_INT); + + data[0] = 0x80; // decodes to 64 + data[1] = 0x01; + TestReadAvroInt64(data, 2, 64, 2); + TestReadAvroInt64(data, 10, 64, 2); + TestReadAvroInt64(data, 0, -1, -1, TErrorCode::SCANNER_INVALID_INT); + TestReadAvroInt64(data, 1, -1, -1, TErrorCode::SCANNER_INVALID_INT); + + TestInt64Val(INT_MAX); + TestInt64Val(INT_MIN); + TestInt64Val(LLONG_MAX); + TestInt64Val(LLONG_MIN); + + // TODO: we don't handle invalid values (e.g. 
overflow) (IMPALA-3659) +} + +TEST_F(HdfsAvroScannerTest, FloatTest) { + uint8_t data[100]; + float f = 1.23456789; + memcpy(data, &f, sizeof(float)); + TestReadAvroFloat(data, 4, f); + TestReadAvroFloat(data, 10, f); + TestReadAvroFloat(data, 0, f, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadAvroFloat(data, 1, f, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadAvroFloat(data, 2, f, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadAvroFloat(data, 3, f, TErrorCode::AVRO_TRUNCATED_BLOCK); +} + +TEST_F(HdfsAvroScannerTest, DoubleTest) { + uint8_t data[100]; + double d = 1.23456789012345; + memcpy(data, &d, sizeof(double)); + TestReadAvroDouble(data, 8, d); + TestReadAvroDouble(data, 10, d); + TestReadAvroDouble(data, 0, d, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadAvroDouble(data, 1, d, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadAvroDouble(data, 2, d, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadAvroDouble(data, 3, d, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadAvroDouble(data, 7, d, TErrorCode::AVRO_TRUNCATED_BLOCK); +} + +TEST_F(HdfsAvroScannerTest, StringTest) { + uint8_t data[100]; + const char* s = "hello"; + DCHECK_EQ(strlen(s), 5); + data[0] = 10; // decodes to 5 + memcpy(&data[1], s, 5); + StringValue sv(s); + TestReadAvroString(data, 6, sv, 6); + TestReadAvroString(data, 10, sv, 6); + TestReadAvroString(data, 0, sv, -1, TErrorCode::SCANNER_INVALID_INT); + TestReadAvroString(data, 1, sv, -1, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadAvroString(data, 5, sv, -1, TErrorCode::AVRO_TRUNCATED_BLOCK); + + data[0] = 1; // decodes to -1 + TestReadAvroString(data, 10, sv, -1, TErrorCode::AVRO_INVALID_LENGTH); + + data[0] = 0; // decodes to 0 + sv.len = 0; + TestReadAvroString(data, 1, sv, 1); + TestReadAvroString(data, 10, sv, 1); +} + +TEST_F(HdfsAvroScannerTest, DecimalTest) { + uint8_t data[100]; + Decimal4Value d4v(123); + // Unscaled value (123) can be stored in 1 byte + data[0] = 2; // decodes to 1 + data[1] = 123; + TestReadAvroDecimal(data, 2, d4v, 2); + TestReadAvroDecimal(data, 10, d4v, 2); + TestReadAvroDecimal(data, 0, d4v, -1, TErrorCode::SCANNER_INVALID_INT); + TestReadAvroDecimal(data, 1, d4v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK); + + data[0] = 10; // decodes to 5 + TestReadAvroDecimal(data, 10, d4v, -1, TErrorCode::AVRO_INVALID_LENGTH); + + data[0] = 1; // decodes to -1 + TestReadAvroDecimal(data, 10, d4v, -1, TErrorCode::AVRO_INVALID_LENGTH); + + data[0] = 0; // decodes to 0 + TestReadAvroDecimal(data, 10, Decimal4Value(0), 1); + + data[0] = 0x80; // decodes to 64 + data[1] = 0x01; + TestReadAvroDecimal(data, 100, d4v, -1, TErrorCode::AVRO_INVALID_LENGTH); + + d4v = Decimal4Value(123456789); + // Unscaled value can be stored in 4 bytes + data[0] = 8; // decodes to 4 +#if __BYTE_ORDER == __LITTLE_ENDIAN + BitUtil::ByteSwap(&data[1], &d4v.value(), 4); +#else + memcpy(&data[1], &d4v.value(), 4); +#endif + TestReadAvroDecimal(data, 5, d4v, 5); + TestReadAvroDecimal(data, 10, d4v, 5); + TestReadAvroDecimal(data, 0, d4v, -1, TErrorCode::SCANNER_INVALID_INT); + TestReadAvroDecimal(data, 1, d4v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadAvroDecimal(data, 4, d4v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK); + + Decimal8Value d8v(1); + data[0] = 2; // decodes to 1 + data[1] = 1; + TestReadAvroDecimal(data, 2, d8v, 2); + TestReadAvroDecimal(data, 10, d8v, 2); + TestReadAvroDecimal(data, 0, d8v, -1, TErrorCode::SCANNER_INVALID_INT); + TestReadAvroDecimal(data, 1, d8v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK); + + d8v = Decimal8Value(123456789012345678); + data[0] = 16; // decodes to 8 
+#if __BYTE_ORDER == __LITTLE_ENDIAN + BitUtil::ByteSwap(&data[1], &d8v.value(), 8); +#else + memcpy(&data[1], &d8v.value(), 8); +#endif + TestReadAvroDecimal(data, 9, d8v, 9); + TestReadAvroDecimal(data, 10, d8v, 9); + TestReadAvroDecimal(data, 0, d8v, -1, TErrorCode::SCANNER_INVALID_INT); + TestReadAvroDecimal(data, 1, d8v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadAvroDecimal(data, 7, d8v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK); + + Decimal16Value d16v(1234567890); + data[0] = 10; // decodes to 5 +#if __BYTE_ORDER == __LITTLE_ENDIAN + BitUtil::ByteSwap(&data[1], &d16v.value(), 5); +#else + memcpy(&data[1], &d16v.value(), 5); +#endif + TestReadAvroDecimal(data, 6, d16v, 6); + TestReadAvroDecimal(data, 10, d16v, 6); + TestReadAvroDecimal(data, 0, d16v, -1, TErrorCode::SCANNER_INVALID_INT); + TestReadAvroDecimal(data, 1, d16v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadAvroDecimal(data, 4, d16v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK); + + bool overflow; + d16v = Decimal16Value::FromDouble(38, 0, .1e38d, &overflow); + DCHECK(!overflow); + data[0] = 32; // decodes to 16 +#if __BYTE_ORDER == __LITTLE_ENDIAN + BitUtil::ByteSwap(&data[1], &d16v.value(), 16); +#else + memcpy(&data[1], &d16v.value(), 16); +#endif + TestReadAvroDecimal(data, 17, d16v, 17); + TestReadAvroDecimal(data, 20, d16v, 17); + TestReadAvroDecimal(data, 0, d16v, -1, TErrorCode::SCANNER_INVALID_INT); + TestReadAvroDecimal(data, 1, d16v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK); + TestReadAvroDecimal(data, 16, d16v, -1, TErrorCode::AVRO_TRUNCATED_BLOCK); +} + +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST); + return RUN_ALL_TESTS(); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-avro-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc index ec8ec26..70dff83 100644 --- a/be/src/exec/hdfs-avro-scanner.cc +++ b/be/src/exec/hdfs-avro-scanner.cc @@ -27,6 +27,7 @@ #include "util/codec.h" #include "util/decompress.h" #include "util/runtime-profile-counters.h" +#include "util/test-info.h" #include "common/names.h" @@ -57,6 +58,13 @@ HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state) codegend_decode_avro_data_(NULL) { } +HdfsAvroScanner::HdfsAvroScanner() + : BaseSequenceScanner(), + avro_header_(NULL), + codegend_decode_avro_data_(NULL) { + DCHECK(TestInfo::is_test()); +} + Status HdfsAvroScanner::Prepare(ScannerContext* context) { RETURN_IF_ERROR(BaseSequenceScanner::Prepare(context)); if (scan_node_->avro_schema().schema == NULL) { @@ -112,7 +120,10 @@ Status HdfsAvroScanner::ParseMetadata() { int64_t num_entries; RETURN_IF_FALSE(stream_->ReadZLong(&num_entries, &parse_status_)); - if (num_entries < 1) return Status("File header metadata has no data"); + if (num_entries < 1) { + return Status(TErrorCode::AVRO_INVALID_METADATA_COUNT, stream_->filename(), + num_entries, stream_->file_offset()); + } while (num_entries != 0) { DCHECK_GT(num_entries, 0); @@ -122,7 +133,10 @@ Status HdfsAvroScanner::ParseMetadata() { uint8_t* key_buf; int64_t key_len; RETURN_IF_FALSE(stream_->ReadZLong(&key_len, &parse_status_)); - DCHECK_GE(key_len, 0); + if (key_len < 0) { + return Status(TErrorCode::AVRO_INVALID_LENGTH, stream_->filename(), key_len, + stream_->file_offset()); + } RETURN_IF_FALSE(stream_->ReadBytes(key_len, &key_buf, &parse_status_)); key = 
string(reinterpret_cast<char*>(key_buf), key_len); @@ -130,7 +144,10 @@ Status HdfsAvroScanner::ParseMetadata() { uint8_t* value; int64_t value_len; RETURN_IF_FALSE(stream_->ReadZLong(&value_len, &parse_status_)); - DCHECK_GE(value_len, 0); + if (value_len < 0) { + return Status(TErrorCode::AVRO_INVALID_LENGTH, stream_->filename(), value_len, + stream_->file_offset()); + } RETURN_IF_FALSE(stream_->ReadBytes(value_len, &value, &parse_status_)); if (key == AVRO_SCHEMA_KEY) { @@ -173,6 +190,10 @@ Status HdfsAvroScanner::ParseMetadata() { } } RETURN_IF_FALSE(stream_->ReadZLong(&num_entries, &parse_status_)); + if (num_entries < 0) { + return Status(TErrorCode::AVRO_INVALID_METADATA_COUNT, stream_->filename(), + num_entries, stream_->file_offset()); + } } VLOG_FILE << stream_->filename() << ": " @@ -461,13 +482,22 @@ Status HdfsAvroScanner::ProcessRange() { uint8_t* compressed_data; int64_t compressed_size; uint8_t* data; + int64_t data_len; + uint8_t* data_end; // Read new data block RETURN_IF_FALSE( stream_->ReadZLong(&num_records, &parse_status_)); + if (num_records < 0) { + return Status(TErrorCode::AVRO_INVALID_RECORD_COUNT, stream_->filename(), + num_records, stream_->file_offset()); + } DCHECK_GE(num_records, 0); RETURN_IF_FALSE(stream_->ReadZLong(&compressed_size, &parse_status_)); - DCHECK_GE(compressed_size, 0); + if (compressed_size < 0) { + return Status(TErrorCode::AVRO_INVALID_COMPRESSED_SIZE, stream_->filename(), + compressed_size, stream_->file_offset()); + } RETURN_IF_FALSE(stream_->ReadBytes( compressed_size, &compressed_data, &parse_status_)); @@ -477,14 +507,15 @@ Status HdfsAvroScanner::ProcessRange() { // decompressor_ doesn't expect this compressed_size -= SnappyDecompressor::TRAILING_CHECKSUM_LEN; } - int64_t size; SCOPED_TIMER(decompress_timer_); RETURN_IF_ERROR(decompressor_->ProcessBlock(false, compressed_size, compressed_data, - &size, &data)); - VLOG_FILE << "Decompressed " << compressed_size << " to " << size; + &data_len, &data)); + VLOG_FILE << "Decompressed " << compressed_size << " to " << data_len; } else { data = compressed_data; + data_len = compressed_size; } + data_end = data + data_len; // Process block data while (num_records > 0) { @@ -501,10 +532,11 @@ Status HdfsAvroScanner::ProcessRange() { num_to_commit = WriteEmptyTuples(context_, tuple_row, max_tuples); } else { if (codegend_decode_avro_data_ != NULL) { - num_to_commit = codegend_decode_avro_data_( - this, max_tuples, pool, &data, tuple, tuple_row); + num_to_commit = codegend_decode_avro_data_(this, max_tuples, pool, &data, + data_end, tuple, tuple_row); } else { - num_to_commit = DecodeAvroData(max_tuples, pool, &data, tuple, tuple_row); + num_to_commit = DecodeAvroData(max_tuples, pool, &data, data_end, tuple, + tuple_row); } } RETURN_IF_ERROR(parse_status_); @@ -525,9 +557,11 @@ Status HdfsAvroScanner::ProcessRange() { } bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema, - MemPool* pool, uint8_t** data, Tuple* tuple) { + MemPool* pool, uint8_t** data, uint8_t* data_end, Tuple* tuple) { DCHECK_EQ(record_schema.schema->type, AVRO_RECORD); for (const AvroSchemaElement& element: record_schema.children) { + DCHECK_LE(*data, data_end); + const SlotDescriptor* slot_desc = element.slot_desc; bool write_slot = false; void* slot = NULL; @@ -539,41 +573,45 @@ bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema, } avro_type_t type = element.schema->type; - if (element.nullable() && !ReadUnionType(element.null_union_position, data)) { - type = AVRO_NULL; 
+ if (element.nullable()) { + bool is_null; + if (!ReadUnionType(element.null_union_position, data, data_end, &is_null)) { + return false; + } + if (is_null) type = AVRO_NULL; } + bool success; switch (type) { case AVRO_NULL: if (slot_desc != NULL) tuple->SetNull(slot_desc->null_indicator_offset()); + success = true; break; case AVRO_BOOLEAN: - ReadAvroBoolean(slot_type, data, write_slot, slot, pool); + success = ReadAvroBoolean(slot_type, data, data_end, write_slot, slot, pool); break; case AVRO_INT32: - ReadAvroInt32(slot_type, data, write_slot, slot, pool); + success = ReadAvroInt32(slot_type, data, data_end, write_slot, slot, pool); break; case AVRO_INT64: - ReadAvroInt64(slot_type, data, write_slot, slot, pool); + success = ReadAvroInt64(slot_type, data, data_end, write_slot, slot, pool); break; case AVRO_FLOAT: - ReadAvroFloat(slot_type, data, write_slot, slot, pool); + success = ReadAvroFloat(slot_type, data, data_end, write_slot, slot, pool); break; case AVRO_DOUBLE: - ReadAvroDouble(slot_type, data, write_slot, slot, pool); + success = ReadAvroDouble(slot_type, data, data_end, write_slot, slot, pool); break; case AVRO_STRING: case AVRO_BYTES: if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) { - ReadAvroVarchar(slot_type, slot_desc->type().len, data, write_slot, slot, pool); + success = ReadAvroVarchar(slot_type, slot_desc->type().len, data, data_end, + write_slot, slot, pool); } else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) { - if (UNLIKELY(!ReadAvroChar(slot_type, slot_desc->type().len, data, write_slot, - slot, pool))) { - DCHECK(!parse_status_.ok()); - return false; - } + success = ReadAvroChar(slot_type, slot_desc->type().len, data, data_end, + write_slot, slot, pool); } else { - ReadAvroString(slot_type, data, write_slot, slot, pool); + success = ReadAvroString(slot_type, data, data_end, write_slot, slot, pool); } break; case AVRO_DECIMAL: { @@ -582,19 +620,41 @@ bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema, DCHECK_EQ(slot_type, TYPE_DECIMAL); slot_byte_size = slot_desc->type().GetByteSize(); } - ReadAvroDecimal(slot_byte_size, data, write_slot, slot, pool); + success = ReadAvroDecimal(slot_byte_size, data, data_end, write_slot, slot, pool); break; } case AVRO_RECORD: - MaterializeTuple(element, pool, data, tuple); + success = MaterializeTuple(element, pool, data, data_end, tuple); break; default: DCHECK(false) << "Unsupported SchemaElement: " << type; } + if (UNLIKELY(!success)) { + DCHECK(!parse_status_.ok()); + return false; + } } return true; } +void HdfsAvroScanner::SetStatusCorruptData(TErrorCode::type error_code) { + DCHECK(parse_status_.ok()); + if (TestInfo::is_test()) { + parse_status_ = Status(error_code, "test file", 123); + } else { + parse_status_ = Status(error_code, stream_->filename(), stream_->file_offset()); + } +} + +void HdfsAvroScanner::SetStatusInvalidValue(TErrorCode::type error_code, int64_t len) { + DCHECK(parse_status_.ok()); + if (TestInfo::is_test()) { + parse_status_ = Status(error_code, "test file", len, 123); + } else { + parse_status_ = Status(error_code, stream_->filename(), len, stream_->file_offset()); + } +} + // This function produces a codegen'd function equivalent to MaterializeTuple() but // optimized for the table schema. Via helper functions CodegenReadRecord() and // CodegenReadScalar(), it eliminates the conditionals necessary when interpreting the @@ -602,76 +662,95 @@ bool HdfsAvroScanner::MaterializeTuple(const AvroSchemaElement& record_schema, // the schema. 
Example output with tpch.region: // // define i1 @MaterializeTuple(%"class.impala::HdfsAvroScanner"* %this, -// %"struct.impala::AvroSchemaElement"* %record_schema, -// %"class.impala::MemPool"* %pool, i8** %data, %"class.impala::Tuple"* %tuple) { +// %"struct.impala::AvroSchemaElement"* %record_schema, %"class.impala::MemPool"* %pool, +// i8** %data, i8* %data_end, %"class.impala::Tuple"* %tuple) #33 { // entry: -// %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to { i8, i32, -// %"struct.impala::StringValue", %"struct.impala::StringValue" }* -// %is_not_null = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh( -// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data) -// br i1 %is_not_null, label %read_field, label %null_field +// %is_null_ptr = alloca i1 +// %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to { i8, [3 x i8], i32, +// %"struct.impala::StringValue", %"struct.impala::StringValue" }* +// %0 = bitcast i1* %is_null_ptr to i8* +// %read_union_ok = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhPlPb( +// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* %data_end, i8* %0) +// br i1 %read_union_ok, label %read_union_ok1, label %bail_out // -// read_field: ; preds = %entry -// %slot = getelementptr inbounds { i8, i32, %"struct.impala::StringValue", -// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 1 +// read_union_ok1: ; preds = %entry +// %is_null = load i1, i1* %is_null_ptr +// br i1 %is_null, label %null_field, label %read_field +// +// read_field: ; preds = %read_union_ok1 +// %slot = getelementptr inbounds { i8, [3 x i8], i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }, { i8, [3 x i8], i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 2 // %opaque_slot = bitcast i32* %slot to i8* -// call void -// @_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE( -// %"class.impala::HdfsAvroScanner"* %this, i32 5, i8** %data, i1 true, -// i8* %opaque_slot, %"class.impala::MemPool"* %pool) -// br label %end_field +// %success = call i1 +// @_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhPlbPvPNS_7MemPoolE( +// %"class.impala::HdfsAvroScanner"* %this, i32 5, i8** %data, i8* %data_end, +// i1 true, i8* %opaque_slot, %"class.impala::MemPool"* %pool) +// br i1 %success, label %end_field, label %bail_out // -// null_field: ; preds = %entry -// call void @SetNull({ i8, i32, %"struct.impala::StringValue", -// %"struct.impala::StringValue" }* %tuple_ptr) +// null_field: ; preds = %read_union_ok1 +// call void @SetNull({ i8, [3 x i8], i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }* %tuple_ptr) // br label %end_field // // end_field: ; preds = %read_field, %null_field -// %is_not_null4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh( -// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data) -// br i1 %is_not_null4, label %read_field1, label %null_field3 +// %1 = bitcast i1* %is_null_ptr to i8* +// %read_union_ok4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhPlPb( +// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* %data_end, i8* %1) +// br i1 %read_union_ok4, label %read_union_ok5, label %bail_out +// +// read_union_ok5: ; preds = %end_field +// %is_null7 = load i1, i1* %is_null_ptr +// br i1 %is_null7, label %null_field6, label %read_field2 +// +// read_field2: ; preds = %read_union_ok5 +// %slot8 = getelementptr inbounds { i8, [3 x i8], i32, 
%"struct.impala::StringValue", +// %"struct.impala::StringValue" }, { i8, [3 x i8], i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 3 +// %opaque_slot9 = bitcast %"struct.impala::StringValue"* %slot8 to i8* +// %success10 = call i1 +// @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhPlbPvPNS_7MemPoolE( +// %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i8* %data_end, +// i1 true, i8* %opaque_slot9, %"class.impala::MemPool"* %pool) +// br i1 %success10, label %end_field3, label %bail_out // -// read_field1: ; preds = %end_field -// %slot5 = getelementptr inbounds { i8, i32, %"struct.impala::StringValue", -// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 2 -// %opaque_slot6 = bitcast %"struct.impala::StringValue"* %slot5 to i8* -// call void -// @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE( -// %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i1 true, -// i8* %opaque_slot6, %"class.impala::MemPool"* %pool) -// br label %end_field2 +// null_field6: ; preds = %read_union_ok5 +// call void @SetNull.1({ i8, [3 x i8], i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }* %tuple_ptr) +// br label %end_field3 // -// null_field3: ; preds = %end_field -// call void @SetNull1({ i8, i32, %"struct.impala::StringValue", -// %"struct.impala::StringValue" }* %tuple_ptr) -// br label %end_field2 +// end_field3: ; preds = %read_field2, %null_field6 +// %2 = bitcast i1* %is_null_ptr to i8* +// %read_union_ok13 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPhPlPb( +// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data, i8* %data_end, i8* %2) +// br i1 %read_union_ok13, label %read_union_ok14, label %bail_out // -// end_field2: ; preds = %read_field1, %null_field3 -// %is_not_null10 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh( -// %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data) -// br i1 %is_not_null10, label %read_field7, label %null_field9 +// read_union_ok14: ; preds = %end_field3 +// %is_null16 = load i1, i1* %is_null_ptr +// br i1 %is_null16, label %null_field15, label %read_field11 // -// read_field7: ; preds = %end_field2 -// %slot11 = getelementptr inbounds { i8, i32, %"struct.impala::StringValue", -// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 3 -// %opaque_slot12 = bitcast %"struct.impala::StringValue"* %slot11 to i8* -// call void -// @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhbPvPNS_7MemPoolE( -// %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i1 true, -// i8* %opaque_slot12, %"class.impala::MemPool"* %pool) -// br label %end_field8 +// read_field11: ; preds = %read_union_ok14 +// %slot17 = getelementptr inbounds { i8, [3 x i8], i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }, { i8, [3 x i8], i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 4 +// %opaque_slot18 = bitcast %"struct.impala::StringValue"* %slot17 to i8* +// %success19 = call i1 +// @_ZN6impala15HdfsAvroScanner14ReadAvroStringENS_13PrimitiveTypeEPPhPlbPvPNS_7MemPoolE( +// %"class.impala::HdfsAvroScanner"* %this, i32 10, i8** %data, i8* %data_end, +// i1 true, i8* %opaque_slot18, %"class.impala::MemPool"* %pool) +// br i1 %success19, label %end_field12, label %bail_out // -// null_field9: ; preds = %end_field2 -// call void @SetNull2({ i8, i32, %"struct.impala::StringValue", -// %"struct.impala::StringValue" }* 
%tuple_ptr) -// br label %end_field8 +// null_field15: ; preds = %read_union_ok14 +// call void @SetNull.2({ i8, [3 x i8], i32, %"struct.impala::StringValue", +// %"struct.impala::StringValue" }* %tuple_ptr) +// br label %end_field12 // -// end_field8: ; preds = %read_field7, %null_field9 +// end_field12: ; preds = %read_field11, %null_field15 // ret i1 true // -// bail_out: ; No predecessors! -// ret i1 false // used only if there is CHAR. -//} +// bail_out: ; preds = %read_field11, %end_field3, %read_field2, %end_field, +// ret i1 false ; %read_field, %entry +// } Function* HdfsAvroScanner::CodegenMaterializeTuple( HdfsScanNode* node, LlvmCodeGen* codegen) { LLVMContext& context = codegen->context(); @@ -698,15 +777,17 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple( prototype.AddArgument(LlvmCodeGen::NamedVariable("record_schema", schema_element_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mempool_type)); prototype.AddArgument(LlvmCodeGen::NamedVariable("data", data_ptr_type)); + prototype.AddArgument(LlvmCodeGen::NamedVariable("data_end", codegen->ptr_type())); prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_opaque_ptr_type)); - Value* args[5]; + Value* args[6]; Function* fn = prototype.GeneratePrototype(&builder, args); Value* this_val = args[0]; // Value* record_schema_val = args[1]; // don't need this Value* pool_val = args[2]; Value* data_val = args[3]; - Value* opaque_tuple_val = args[4]; + Value* data_end_val = args[4]; + Value* opaque_tuple_val = args[5]; Value* tuple_val = builder.CreateBitCast(opaque_tuple_val, tuple_ptr_type, "tuple_ptr"); @@ -715,7 +796,7 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple( Status status = CodegenReadRecord( SchemaPath(), node->avro_schema(), node, codegen, &builder, fn, bail_out_block, - bail_out_block, this_val, pool_val, tuple_val, data_val); + bail_out_block, this_val, pool_val, tuple_val, data_val, data_end_val); if (!status.ok()) { VLOG_QUERY << status.GetDetail(); fn->eraseFromParent(); @@ -736,7 +817,7 @@ Status HdfsAvroScanner::CodegenReadRecord( const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNode* node, LlvmCodeGen* codegen, void* void_builder, Function* fn, BasicBlock* insert_before, BasicBlock* bail_out, Value* this_val, Value* pool_val, Value* tuple_val, - Value* data_val) { + Value* data_val, Value* data_end_val) { if (record.schema == NULL) { return Status("Missing Avro schema in scan node. This could be due to stale " "metadata. Running 'invalidate metadata <tablename>' may resolve the problem."); @@ -748,6 +829,9 @@ Status HdfsAvroScanner::CodegenReadRecord( // Codegen logic for parsing each field and, if necessary, populating a slot with the // result. + + // Used to store result of ReadUnionType() call + Value* is_null_ptr = NULL; for (int i = 0; i < record.children.size(); ++i) { const AvroSchemaElement* field = &record.children[i]; int col_idx = i; @@ -775,14 +859,25 @@ Status HdfsAvroScanner::CodegenReadRecord( if (field->nullable()) { // Field could be null. Create conditional branch based on ReadUnionType result. 
- null_block = BasicBlock::Create(context, "null_field", fn, end_field_block); - Function* read_union_fn = - codegen->GetFunction(IRFunction::READ_UNION_TYPE, false); + Function* read_union_fn = codegen->GetFunction(IRFunction::READ_UNION_TYPE, false); Value* null_union_pos_val = codegen->GetIntConstant(TYPE_INT, field->null_union_position); - Value* is_not_null_val = builder->CreateCall(read_union_fn, - ArrayRef<Value*>({this_val, null_union_pos_val, data_val}), "is_not_null"); - builder->CreateCondBr(is_not_null_val, read_field_block, null_block); + if (is_null_ptr == NULL) { + is_null_ptr = codegen->CreateEntryBlockAlloca(*builder, codegen->boolean_type(), + "is_null_ptr"); + } + Value* is_null_ptr_cast = builder->CreateBitCast(is_null_ptr, codegen->ptr_type()); + Value* read_union_ok = builder->CreateCall(read_union_fn, + ArrayRef<Value*>({this_val, null_union_pos_val, data_val, data_end_val, + is_null_ptr_cast}), "read_union_ok"); + BasicBlock* read_union_ok_block = BasicBlock::Create(context, "read_union_ok", fn, + read_field_block); + builder->CreateCondBr(read_union_ok, read_union_ok_block, bail_out); + + builder->SetInsertPoint(read_union_ok_block); + null_block = BasicBlock::Create(context, "null_field", fn, end_field_block); + Value* is_null = builder->CreateLoad(is_null_ptr, "is_null"); + builder->CreateCondBr(is_null, null_block, read_field_block); // Write null field IR builder->SetInsertPoint(null_block); @@ -800,16 +895,18 @@ Status HdfsAvroScanner::CodegenReadRecord( // Write read_field_block IR builder->SetInsertPoint(read_field_block); + Value* ret_val; if (field->schema->type == AVRO_RECORD) { BasicBlock* insert_before_block = (null_block != NULL) ? null_block : end_field_block; RETURN_IF_ERROR(CodegenReadRecord(new_path, *field, node, codegen, builder, fn, - insert_before_block, bail_out, this_val, pool_val, tuple_val, data_val)); + insert_before_block, bail_out, this_val, pool_val, tuple_val, data_val, + data_end_val)); } else { RETURN_IF_ERROR(CodegenReadScalar(*field, slot_desc, codegen, builder, - end_field_block, bail_out, this_val, pool_val, tuple_val, data_val)); + this_val, pool_val, tuple_val, data_val, data_end_val, &ret_val)); } - builder->CreateBr(end_field_block); + builder->CreateCondBr(ret_val, end_field_block, bail_out); // Set insertion point for next field. builder->SetInsertPoint(end_field_block); @@ -819,8 +916,8 @@ Status HdfsAvroScanner::CodegenReadRecord( Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element, SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder, - BasicBlock* end_field_block, BasicBlock* bail_out_block, Value* this_val, - Value* pool_val, Value* tuple_val, Value* data_val) { + Value* this_val, Value* pool_val, Value* tuple_val, Value* data_val, + Value* data_end_val, Value** ret_val) { LlvmCodeGen::LlvmBuilder* builder = reinterpret_cast<LlvmCodeGen::LlvmBuilder*>(void_builder); Function* read_field_fn; @@ -882,18 +979,12 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element, // Need to pass an extra argument (the length) to the codegen function. Value* fixed_len = builder->getInt32(slot_desc->type().len); Value* read_field_args[] = {this_val, slot_type_val, fixed_len, data_val, - write_slot_val, opaque_slot_val, pool_val}; - if (slot_desc->type().type == TYPE_VARCHAR) { - builder->CreateCall(read_field_fn, read_field_args); - } else { - // ReadAvroChar() returns false if allocation from MemPool fails. 
- Value* ret_val = builder->CreateCall(read_field_fn, read_field_args); - builder->CreateCondBr(ret_val, end_field_block, bail_out_block); - } + data_end_val, write_slot_val, opaque_slot_val, pool_val}; + *ret_val = builder->CreateCall(read_field_fn, read_field_args, "success"); } else { - Value* read_field_args[] = - {this_val, slot_type_val, data_val, write_slot_val, opaque_slot_val, pool_val}; - builder->CreateCall(read_field_fn, read_field_args); + Value* read_field_args[] = {this_val, slot_type_val, data_val, data_end_val, + write_slot_val, opaque_slot_val, pool_val}; + *ret_val = builder->CreateCall(read_field_fn, read_field_args, "success"); } return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-avro-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h index 316f697..2069a9b 100644 --- a/be/src/exec/hdfs-avro-scanner.h +++ b/be/src/exec/hdfs-avro-scanner.h @@ -68,6 +68,8 @@ #include "exec/base-sequence-scanner.h" #include <avro/basics.h> + +#include "exec/read-write-util.h" #include "runtime/tuple.h" #include "runtime/tuple-row.h" @@ -105,6 +107,8 @@ class HdfsAvroScanner : public BaseSequenceScanner { } private: + friend class HdfsAvroScannerTest; + struct AvroFileHeader : public BaseSequenceScanner::FileHeader { /// The root of the file schema tree (i.e. the top-level record schema of the file) ScopedAvroSchemaElement schema; @@ -131,7 +135,7 @@ class HdfsAvroScanner : public BaseSequenceScanner { static const std::string AVRO_SNAPPY_CODEC; static const std::string AVRO_DEFLATE_CODEC; - typedef int (*DecodeAvroDataFn)(HdfsAvroScanner*, int, MemPool*, uint8_t**, + typedef int (*DecodeAvroDataFn)(HdfsAvroScanner*, int, MemPool*, uint8_t**, uint8_t*, Tuple*, TupleRow*); /// The codegen'd version of DecodeAvroData() if available, NULL otherwise. @@ -172,17 +176,18 @@ class HdfsAvroScanner : public BaseSequenceScanner { /// Returns the number of tuples to be committed. /// - max_tuples: the maximum number of tuples to write /// - data: serialized record data. Is advanced as records are read. + /// - data_end: pointer to the end of the data buffer (i.e. the first invalid byte). /// - pool: memory pool to allocate string data from /// - tuple: tuple pointer to copy objects to /// - tuple_row: tuple row of written tuples - int DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data, + int DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data, uint8_t* data_end, Tuple* tuple, TupleRow* tuple_row); /// Materializes a single tuple from serialized record data. Will return false and set /// error in parse_status_ if memory limit is exceeded when allocating new char buffer. /// See comments below for ReadAvroChar(). bool MaterializeTuple(const AvroSchemaElement& record_schema, MemPool* pool, - uint8_t** data, Tuple* tuple); + uint8_t** data, uint8_t* data_end, Tuple* tuple); /// Produces a version of DecodeAvroData that uses codegen'd instead of interpreted /// functions. @@ -208,63 +213,80 @@ class HdfsAvroScanner : public BaseSequenceScanner { /// the bail_out block or some basic blocks before that. /// - bail_out: the block to jump to if anything fails. This is used in particular by /// ReadAvroChar() which can exceed memory limit during allocation from MemPool. 
- /// - this_val, pool_val, tuple_val, data_val: arguments to MaterializeTuple() + /// - this_val, pool_val, tuple_val, data_val, data_end_val: arguments to + /// MaterializeTuple() static Status CodegenReadRecord( const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNode* node, LlvmCodeGen* codegen, void* builder, llvm::Function* fn, llvm::BasicBlock* insert_before, llvm::BasicBlock* bail_out, llvm::Value* this_val, - llvm::Value* pool_val, llvm::Value* tuple_val, llvm::Value* data_val); + llvm::Value* pool_val, llvm::Value* tuple_val, llvm::Value* data_val, + llvm::Value* data_end_val); /// Creates the IR for reading an Avro scalar at builder's current insert point. static Status CodegenReadScalar(const AvroSchemaElement& element, SlotDescriptor* slot_desc, LlvmCodeGen* codegen, void* void_builder, - llvm::BasicBlock* end_field_block, llvm::BasicBlock* bail_out_block, llvm::Value* this_val, llvm::Value* pool_val, llvm::Value* tuple_val, - llvm::Value* data_val); + llvm::Value* data_val, llvm::Value* data_end_val, llvm::Value** ret_val); /// The following are cross-compiled functions for parsing a serialized Avro primitive /// type and writing it to a slot. They can also be used for skipping a field without /// writing it to a slot by setting 'write_slot' to false. /// - data: Serialized record data. Is advanced past the read field. + /// - data_end: pointer to the end of the data buffer (i.e. the first invalid byte). /// The following arguments are used only if 'write_slot' is true: /// - slot: The tuple slot to write the parsed field into. /// - type: The type of the slot. (This is necessary because there is not a 1:1 mapping /// between Avro types and Impala's primitive types.) /// - pool: MemPool for string data. /// - /// ReadAvroChar() will return false and set error in parse_status_ if memory limit - /// is exceeded when allocating the new char buffer. It returns true otherwise. + /// All return false and set parse_status_ on error (e.g. mem limit exceeded when + /// allocating buffer, malformed data), and return true otherwise. 
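
For a concrete picture of the contract above: each reader now receives the end of the buffer alongside the data pointer, refuses to read past it, and advances 'data' only on success. A minimal, self-contained sketch of that pattern for a fixed-width field follows; the helper and its name are illustrative only, not code from this patch.

    #include <cassert>
    #include <cstdint>
    #include <cstring>

    // Sketch of the bounds-check pattern (hypothetical helper, not the Impala code).
    // 'data_end' points to the first invalid byte, so a truncated value is rejected
    // instead of being read past the end of the buffer.
    static bool ReadFixedWidth(uint8_t** data, uint8_t* data_end, int byte_size,
        bool write_slot, void* slot) {
      if (data_end - *data < byte_size) return false;  // e.g. truncated_float.avro
      if (write_slot) memcpy(slot, *data, byte_size);  // skip-only when write_slot is false
      *data += byte_size;
      return true;
    }

    int main() {
      uint8_t buf[] = {0x00, 0x00, 0x80};  // 3 bytes where a 4-byte float is expected
      uint8_t* p = buf;
      float out;
      assert(!ReadFixedWidth(&p, buf + sizeof(buf), sizeof(out), true, &out));
      return 0;
    }

The real ReadAvro*() implementations additionally record why a read failed in parse_status_, which is where the new AVRO_* error codes introduced by this patch come in.
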
   ///
-  void ReadAvroBoolean(
-      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, MemPool* pool);
-  void ReadAvroInt32(
-      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, MemPool* pool);
-  void ReadAvroInt64(
-      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, MemPool* pool);
-  void ReadAvroFloat(
-      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, MemPool* pool);
-  void ReadAvroDouble(
-      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, MemPool* pool);
-  void ReadAvroVarchar(
-      PrimitiveType type, int max_len, uint8_t** data, bool write_slot, void* slot,
-      MemPool* pool);
-  bool ReadAvroChar(
-      PrimitiveType type, int max_len, uint8_t** data, bool write_slot, void* slot,
-      MemPool* pool);
-  void ReadAvroString(
-      PrimitiveType type, uint8_t** data, bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroBoolean(PrimitiveType type, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroInt32(PrimitiveType type, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroInt64(PrimitiveType type, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroFloat(PrimitiveType type, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroDouble(PrimitiveType type, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroChar(PrimitiveType type, int max_len, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroString(PrimitiveType type, uint8_t** data, uint8_t* data_end,
+      bool write_slot, void* slot, MemPool* pool);
+
+  /// Helper function for some of the above. Returns the length of certain varlen
+  /// types and updates 'data' on success; on error, returns an error result and
+  /// updates parse_status_.
+  ReadWriteUtil::ZLongResult ReadFieldLen(uint8_t** data, uint8_t* data_end);
 
   /// Same as the above functions, except takes the size of the decimal slot (i.e. 4, 8, or
   /// 16) instead of the type (which should be TYPE_DECIMAL). The slot size is passed
   /// explicitly, rather than passing a ColumnType, so we can easily pass in a constant in
   /// the codegen'd MaterializeTuple() function. If 'write_slot' is false, 'slot_byte_size'
   /// is ignored.
-  void ReadAvroDecimal(
-      int slot_byte_size, uint8_t** data, bool write_slot, void* slot, MemPool* pool);
+  bool ReadAvroDecimal(
+      int slot_byte_size, uint8_t** data, uint8_t* data_end, bool write_slot, void* slot,
+      MemPool* pool);
+
+  /// Reads and advances 'data' past the union branch index and sets 'is_null' according
+  /// to whether the corresponding element is null. 'null_union_position' must be 0 or
+  /// 1. Returns false and sets parse_status_ if there's an error, otherwise returns true.
+  bool ReadUnionType(int null_union_position, uint8_t** data, uint8_t* data_end,
+      bool* is_null);
+
+  /// Helper functions to set parse_status_ outside of cross-compiled functions. This is
+  /// to avoid including string construction, etc. in the IR, which both bloats it and
+  /// can contain exception handling code.
+ void SetStatusCorruptData(TErrorCode::type error_code); + void SetStatusInvalidValue(TErrorCode::type error_code, int64_t len); - /// Reads and advances 'data' past the union branch index and returns true if the - /// corresponding element is non-null. 'null_union_position' must be 0 or 1. - bool ReadUnionType(int null_union_position, uint8_t** data); + /// Unit test constructor + HdfsAvroScanner(); static const char* LLVM_CLASS_NAME; }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-avro-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc index 9ee704d..f3bc3cb 100644 --- a/be/src/exec/hdfs-avro-table-writer.cc +++ b/be/src/exec/hdfs-avro-table-writer.cc @@ -71,7 +71,8 @@ inline void HdfsAvroTableWriter::AppendField(const ColumnType& type, const void* // Each avro field is written as union, which is a ZLong indicating the union // field followed by the encoded value. Impala/Hive always stores values as // a union of [ColumnType, NULL]. - // TODO check if we want to support [NULL, ColumnType] union + // TODO: the writer may be asked to write [NULL, ColumnType] unions. It is wrong + // for us to assume [ColumnType, NULL]. if (value == NULL) { // indicate the second field of the union http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index 74a5efb..78d4994 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -41,6 +41,7 @@ #include "util/runtime-profile-counters.h" #include "util/sse-util.h" #include "util/string-parser.h" +#include "util/test-info.h" #include "gen-cpp/PlanNodes_types.h" #include "common/names.h" @@ -71,6 +72,26 @@ HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state) write_tuples_fn_(NULL) { } +HdfsScanner::HdfsScanner() + : scan_node_(NULL), + state_(NULL), + context_(NULL), + stream_(NULL), + scanner_conjunct_ctxs_(NULL), + template_tuple_(NULL), + tuple_byte_size_(-1), + tuple_(NULL), + batch_(NULL), + tuple_mem_(NULL), + num_null_bytes_(-1), + parse_status_(Status::OK()), + decompression_type_(THdfsCompression::NONE), + data_buffer_pool_(NULL), + decompress_timer_(NULL), + write_tuples_fn_(NULL) { + DCHECK(TestInfo::is_test()); +} + HdfsScanner::~HdfsScanner() { DCHECK(batch_ == NULL); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/hdfs-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h index 9206e77..7f804f1 100644 --- a/be/src/exec/hdfs-scanner.h +++ b/be/src/exec/hdfs-scanner.h @@ -420,6 +420,9 @@ class HdfsScanner { /// WriteCompleteTuple() because it's easier than writing IR to access /// scanner_conjunct_ctxs_. 
ExprContext* GetConjunctCtx(int idx) const; + + /// Unit test constructor + HdfsScanner(); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/read-write-util.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/read-write-util.cc b/be/src/exec/read-write-util.cc index cc0cd0a..22e3abd 100644 --- a/be/src/exec/read-write-util.cc +++ b/be/src/exec/read-write-util.cc @@ -18,23 +18,76 @@ using namespace impala; +namespace { + +// Returns MAX_ZLONG_LEN + 1 if the encoded int is more than MAX_ZLONG_LEN bytes long, +// otherwise returns the length of the encoded int. Reads MAX_ZLONG_LEN bytes in 'buf'. +int FindZIntegerLength(uint8_t* buf) { + uint64_t x = *reinterpret_cast<uint64_t*>(buf); + for (int i = 0; i < sizeof(x); ++i) { + if ((x & (0x80LL << (i * 8))) == 0) return i + 1; + } + uint16_t y = *reinterpret_cast<uint16_t*>(buf + 8); + if ((y & 0x80) == 0) return 9; + if ((y & 0x8000) == 0) return 10; + return 11; +} + +// Slow path for ReadZInteger() that checks for out-of-bounds on every byte +template <int MAX_LEN, typename ZResultType> +ZResultType ReadZIntegerSlow(uint8_t** buf, uint8_t* buf_end) { + uint64_t zlong = 0; + int shift = 0; + bool more = true; + for (int i = 0; more && i < MAX_LEN; ++i) { + if (UNLIKELY(*buf >= buf_end)) return ZResultType::error(); + DCHECK_LE(shift, 64); + zlong |= static_cast<uint64_t>(**buf & 0x7f) << shift; + shift += 7; + more = (**buf & 0x80) != 0; + ++(*buf); + } + // Invalid int that's longer than maximum + if (UNLIKELY(more)) return ZResultType::error(); + return ZResultType((zlong >> 1) ^ -(zlong & 1)); +} + +} + // This function is not inlined because it can potentially cause LLVM to crash (see // http://llvm.org/bugs/show_bug.cgi?id=19315), and inlining does not appear to have any // performance impact. -int64_t ReadWriteUtil::ReadZLong(uint8_t** buf) { +template <int MAX_LEN, typename ZResultType> +ZResultType ReadWriteUtil::ReadZInteger(uint8_t** buf, uint8_t* buf_end) { + DCHECK(MAX_LEN == MAX_ZINT_LEN || MAX_LEN == MAX_ZLONG_LEN); + + // Use MAX_ZLONG_LEN rather than MAX_LEN since FindZIntegerLength() always assumes at + // least MAX_ZLONG_LEN bytes in buffer. + if (UNLIKELY(buf_end - *buf < MAX_ZLONG_LEN)) { + return ReadZIntegerSlow<MAX_LEN, ZResultType>(buf, buf_end); + } + // Once we get here, we don't need to worry about going off end of buffer. + int num_bytes = FindZIntegerLength(*buf); + if (UNLIKELY(num_bytes > MAX_LEN)) return ZResultType::error(); + uint64_t zlong = 0; int shift = 0; - bool more; - do { - DCHECK_LE(shift, 64); + for (int i = 0; i < num_bytes; ++i) { zlong |= static_cast<uint64_t>(**buf & 0x7f) << shift; shift += 7; - more = (**buf & 0x80) != 0; ++(*buf); - } while (more); - return (zlong >> 1) ^ -(zlong & 1); + } + return ZResultType((zlong >> 1) ^ -(zlong & 1)); } +// Instantiate the template for long and int. +template ReadWriteUtil::ZLongResult +ReadWriteUtil::ReadZInteger<ReadWriteUtil::MAX_ZLONG_LEN, ReadWriteUtil::ZLongResult>( + uint8_t** buf, uint8_t* buf_end); +template ReadWriteUtil::ZIntResult +ReadWriteUtil::ReadZInteger<ReadWriteUtil::MAX_ZINT_LEN, ReadWriteUtil::ZIntResult>( + uint8_t** buf, uint8_t* buf_end); + int ReadWriteUtil::PutZInt(int32_t integer, uint8_t* buf) { // Move the sign bit to the first bit. 
   uint32_t uinteger = (integer << 1) ^ (integer >> 31);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/read-write-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util.h b/be/src/exec/read-write-util.h
index 2321939..fa9de9b 100644
--- a/be/src/exec/read-write-util.h
+++ b/be/src/exec/read-write-util.h
@@ -84,13 +84,43 @@ class ReadWriteUtil {
   /// Determines the total length in bytes of a Writable VInt/VLong from the first byte.
   static int DecodeVIntSize(int8_t byte);
 
+  /// Return values for ReadZLong() and ReadZInt(). We return these in a single struct,
+  /// rather than using an output parameter, for performance (this way both values are
+  /// returned as registers).
+  template <typename T>
+  struct ZResult {
+    /// False if there was a problem reading the value.
+    bool ok;
+    /// The decoded value. Only valid if 'ok' is true.
+    T val;
+
+    ZResult(T v) : ok(true), val(v) { }
+    static ZResult error() { return ZResult(); }
+
+   private:
+    ZResult() : ok(false) { }
+  };
+
+  typedef ZResult<int64_t> ZLongResult;
+  typedef ZResult<int32_t> ZIntResult;
+
   /// Read a zig-zag encoded long. This is the integer encoding defined by google.com
-  /// protocol-buffers: https://developers.google.com/protocol-buffers/docs/encoding
-  /// *buf is incremented past the encoded long.
-  static int64_t ReadZLong(uint8_t** buf);
+  /// protocol-buffers: https://developers.google.com/protocol-buffers/docs/encoding. *buf
+  /// is incremented past the encoded long. 'buf_end' should point to the end of 'buf'
+  /// (i.e. the first invalid byte).
+  ///
+  /// Returns a non-OK result if the encoded int spans too many bytes. The result is
+  /// unspecified for values that have the correct number of bytes but overflow the
+  /// destination type (for both long and int, there are extra bits in the highest-order
+  /// byte).
+  static inline ZLongResult ReadZLong(uint8_t** buf, uint8_t* buf_end) {
+    return ReadZInteger<MAX_ZLONG_LEN, ZLongResult>(buf, buf_end);
+  }
+
   /// Read a zig-zag encoded int.
-  static int32_t ReadZInt(uint8_t** buf);
+  static inline ZIntResult ReadZInt(uint8_t** buf, uint8_t* buf_end) {
+    return ReadZInteger<MAX_ZINT_LEN, ZIntResult>(buf, buf_end);
+  }
 
   /// The following methods read data from a buffer without assuming the buffer is long
   /// enough. If the buffer isn't long enough or another error occurs, they return false
@@ -104,6 +134,12 @@
   /// Skip the next num_bytes bytes.
   static bool SkipBytes(uint8_t** buf, int* buf_len, int num_bytes, Status* status);
+
+ private:
+  /// Implementation for ReadZLong() and ReadZInt(). MAX_LEN is MAX_ZLONG_LEN or
+  /// MAX_ZINT_LEN.
+ template<int MAX_LEN, typename ZResult> + static ZResult ReadZInteger(uint8_t** buf, uint8_t* buf_end); }; template<> @@ -213,11 +249,6 @@ inline int64_t ReadWriteUtil::PutVInt(int32_t val, uint8_t* buf) { return PutVLong(val, buf); } -inline int32_t ReadWriteUtil::ReadZInt(uint8_t** buf) { - int64_t zlong = ReadZLong(buf); - return static_cast<int32_t>(zlong); -} - template <class T> inline bool ReadWriteUtil::Read(uint8_t** buf, int* buf_len, T* val, Status* status) { int val_len = sizeof(T); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index 833436c..3c78d88 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -121,7 +121,12 @@ void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool don Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED; - // io_buffer_ should only be null the first time this is called + // Nothing to do if we've already processed all data in the file + int64_t offset = file_offset() + boundary_buffer_bytes_left_; + int64_t file_bytes_remaining = file_desc()->file_length - offset; + if (io_buffer_ == NULL && file_bytes_remaining == 0) return Status::OK(); + + // Otherwise, io_buffer_ should only be null the first time this is called DCHECK(io_buffer_ != NULL || (total_bytes_returned_ == 0 && completed_io_buffers_.empty())); @@ -140,11 +145,9 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { RETURN_IF_ERROR(scan_range_->GetNext(&io_buffer_)); } else { SCOPED_TIMER(parent_->state_->total_storage_wait_timer()); - int64_t offset = file_offset() + boundary_buffer_bytes_left_; int64_t read_past_buffer_size = read_past_size_cb_.empty() ? DEFAULT_READ_PAST_SIZE : read_past_size_cb_(offset); - int64_t file_bytes_remaining = file_desc()->file_length - offset; read_past_buffer_size = ::max(read_past_buffer_size, read_past_size); read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining); // We're reading past the scan range. Be careful not to read past the end of file. 
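
Every ReadWriteUtil::ReadZLong()/ReadZInt() caller in this patch follows the same pattern: pass a 'buf_end' bound, check 'ok' before trusting 'val', and measure how far the pointer advanced. The demo below is a simplified, self-contained illustration of that contract; it mirrors the bounded loop in ReadZIntegerSlow() above but is not the patch's implementation.

    #include <cstdint>
    #include <cstdio>

    // Result struct in the style of ReadWriteUtil::ZResult: the flag and the value
    // come back together (and hence in registers) rather than via an out-parameter.
    struct DecodeResult {
      bool ok;
      int64_t val;
    };

    // Decode a zig-zag varint without reading past 'buf_end'. At most 10 bytes
    // (MAX_ZLONG_LEN) may carry continuation bits; anything longer is malformed.
    static DecodeResult DecodeZigZagLong(uint8_t** buf, uint8_t* buf_end) {
      uint64_t zlong = 0;
      int shift = 0;
      bool more = true;
      for (int i = 0; more && i < 10; ++i) {
        if (*buf >= buf_end) return {false, 0};  // truncated encoding
        zlong |= static_cast<uint64_t>(**buf & 0x7f) << shift;
        shift += 7;
        more = (**buf & 0x80) != 0;
        ++(*buf);
      }
      if (more) return {false, 0};  // continuation bit still set after 10 bytes: corrupt
      return {true, static_cast<int64_t>((zlong >> 1) ^ -(zlong & 1))};
    }

    int main() {
      uint8_t buf[] = {0xac, 0x02};  // zig-zag varint encoding of 150
      uint8_t* p = buf;
      DecodeResult r = DecodeZigZagLong(&p, buf + sizeof(buf));
      if (r.ok) printf("decoded %lld from %d bytes\n", (long long)r.val, (int)(p - buf));
      return 0;
    }
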
@@ -258,7 +261,7 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len, // We have enough bytes in io_buffer_ or couldn't read more bytes int64_t requested_bytes_left = requested_len - boundary_buffer_bytes_left_; - DCHECK_GE(requested_len, 0); + DCHECK_GE(requested_bytes_left, 0); int64_t num_bytes = min(io_buffer_bytes_left_, requested_bytes_left); *out_len = boundary_buffer_bytes_left_ + num_bytes; DCHECK_LE(*out_len, requested_len); @@ -306,3 +309,7 @@ Status ScannerContext::Stream::ReportIncompleteRead(int64_t length, int64_t byte Status ScannerContext::Stream::ReportInvalidRead(int64_t length) { return Status(TErrorCode::SCANNER_INVALID_READ, length, filename(), file_offset()); } + +Status ScannerContext::Stream::ReportInvalidInt() { + return Status(TErrorCode::SCANNER_INVALID_INT, filename(), file_offset()); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/scanner-context.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h index e1dee97..5d6ea06 100644 --- a/be/src/exec/scanner-context.h +++ b/be/src/exec/scanner-context.h @@ -241,6 +241,7 @@ class ScannerContext { /// Error-reporting functions. Status ReportIncompleteRead(int64_t length, int64_t bytes_read); Status ReportInvalidRead(int64_t length); + Status ReportInvalidInt(); }; bool HasStream() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/scanner-context.inline.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.inline.h b/be/src/exec/scanner-context.inline.h index 1a36b66..1b12b52 100644 --- a/be/src/exec/scanner-context.inline.h +++ b/be/src/exec/scanner-context.inline.h @@ -146,16 +146,20 @@ inline bool ScannerContext::Stream::ReadVLong(int64_t* value, Status* status) { } inline bool ScannerContext::Stream::ReadZLong(int64_t* value, Status* status) { - uint64_t zlong = 0; - int shift = 0; - uint8_t* byte; - do { - DCHECK_LE(shift, 64); - RETURN_IF_FALSE(ReadBytes(1, &byte, status)); - zlong |= static_cast<uint64_t>(*byte & 0x7f) << shift; - shift += 7; - } while (*byte & 0x80); - *value = (zlong >> 1) ^ -(zlong & 1); + uint8_t* bytes; + int64_t bytes_len; + RETURN_IF_FALSE( + GetBytes(ReadWriteUtil::MAX_ZLONG_LEN, &bytes, &bytes_len, status, true)); + + uint8_t* new_bytes = bytes; + ReadWriteUtil::ZLongResult r = ReadWriteUtil::ReadZLong(&new_bytes, bytes + bytes_len); + if (UNLIKELY(!r.ok)) { + *status = ReportInvalidInt(); + return false; + } + *value = r.val; + int64_t bytes_read = new_bytes - bytes; + RETURN_IF_FALSE(SkipBytes(bytes_read, status)); return true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/be/src/exec/zigzag-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/zigzag-test.cc b/be/src/exec/zigzag-test.cc index 6622d56..247510f 100644 --- a/be/src/exec/zigzag-test.cc +++ b/be/src/exec/zigzag-test.cc @@ -26,30 +26,51 @@ namespace impala { +void TestZInt(uint8_t* buf, int64_t buf_len, int32_t expected_val, + int expected_encoded_len) { + uint8_t* new_buf = buf; + ReadWriteUtil::ZIntResult r = ReadWriteUtil::ReadZInt(&new_buf, buf + buf_len); + EXPECT_TRUE(r.ok); + EXPECT_EQ(r.val, expected_val); + EXPECT_EQ(new_buf - buf, expected_encoded_len); +} + void TestZInt(int32_t value) { uint8_t buf[ReadWriteUtil::MAX_ZINT_LEN]; int plen = ReadWriteUtil::PutZInt(value, 
      static_cast<uint8_t*>(buf));
   EXPECT_TRUE(plen <= ReadWriteUtil::MAX_ZINT_LEN);
+  TestZInt(buf, sizeof(buf), value, plen);
+}
 
-  uint8_t* buf_ptr = static_cast<uint8_t*>(buf);
-  int32_t val = ReadWriteUtil::ReadZInt(&buf_ptr);
-  EXPECT_EQ(value, val);
-  int len = buf_ptr - buf;
-  EXPECT_GT(len, 0);
-  EXPECT_LE(len, sizeof(buf));
+void TestZLong(uint8_t* buf, int64_t buf_len, int64_t expected_val,
+    int expected_encoded_len) {
+  uint8_t* new_buf = buf;
+  ReadWriteUtil::ZLongResult r = ReadWriteUtil::ReadZLong(&new_buf, buf + buf_len);
+  EXPECT_TRUE(r.ok);
+  EXPECT_EQ(r.val, expected_val);
+  EXPECT_EQ(new_buf - buf, expected_encoded_len);
 }
 
 void TestZLong(int64_t value) {
   uint8_t buf[ReadWriteUtil::MAX_ZLONG_LEN];
   int plen = ReadWriteUtil::PutZLong(value, static_cast<uint8_t*>(buf));
   EXPECT_TRUE(plen <= ReadWriteUtil::MAX_ZLONG_LEN);
+  TestZLong(buf, sizeof(buf), value, plen);
+}
 
-  uint8_t* buf_ptr = static_cast<uint8_t*>(buf);
-  int64_t val = ReadWriteUtil::ReadZLong(&buf_ptr);
-  EXPECT_EQ(value, val);
-  int len = buf_ptr - buf;
-  EXPECT_GT(len, 0);
-  EXPECT_LE(len, sizeof(buf));
+// No expected value
+void TestZInt(uint8_t* buf, int64_t buf_len, int expected_encoded_len) {
+  uint8_t* new_buf = buf;
+  ReadWriteUtil::ZIntResult r = ReadWriteUtil::ReadZInt(&new_buf, buf + buf_len);
+  EXPECT_TRUE(r.ok);
+  EXPECT_EQ(new_buf - buf, expected_encoded_len);
+}
+
+void TestZLong(uint8_t* buf, int64_t buf_len, int expected_encoded_len) {
+  uint8_t* new_buf = buf;
+  ReadWriteUtil::ZLongResult r = ReadWriteUtil::ReadZLong(&new_buf, buf + buf_len);
+  EXPECT_TRUE(r.ok);
+  EXPECT_EQ(new_buf - buf, expected_encoded_len);
 }
 
 // Test put and get of zigzag integers and longs.
@@ -60,7 +81,7 @@ TEST(ZigzagTest, Basic) {
   TestZInt(INT_MIN);
   TestZInt(SHRT_MIN);
   TestZInt(SHRT_MAX);
-  TestZInt(0);
+  TestZLong(0);
   TestZLong(LONG_MAX);
   TestZLong(LONG_MIN);
   TestZLong(INT_MAX);
@@ -78,6 +99,66 @@ TEST(ZigzagTest, Basic) {
     TestZLong((static_cast<int64_t>(value) << 32) | value);
   }
 }
+
+TEST(ZigzagTest, Errors) {
+  uint8_t buf[100];
+  memset(buf, 0x80, sizeof(buf));
+
+  // Test 100-byte int
+  uint8_t* buf_ptr = static_cast<uint8_t*>(buf);
+  int64_t buf_len = sizeof(buf);
+  EXPECT_FALSE(ReadWriteUtil::ReadZLong(&buf_ptr, buf + buf_len).ok);
+  EXPECT_FALSE(ReadWriteUtil::ReadZInt(&buf_ptr, buf + buf_len).ok);
+
+  // Test truncated int
+  buf_ptr = static_cast<uint8_t*>(buf);
+  buf_len = ReadWriteUtil::MAX_ZLONG_LEN - 1;
+  EXPECT_FALSE(ReadWriteUtil::ReadZLong(&buf_ptr, buf + buf_len).ok);
+  buf_len = ReadWriteUtil::MAX_ZINT_LEN - 1;
+  EXPECT_FALSE(ReadWriteUtil::ReadZInt(&buf_ptr, buf + buf_len).ok);
+}
+
+// Test weird encodings and values that are arguably invalid but we still accept
+TEST(ZigzagTest, Weird) {
+  uint8_t buf[100];
+
+  // Decodes to 0 but encoded in two bytes
+  buf[0] = 0x80;
+  buf[1] = 0x0;
+  TestZInt(buf, 2, 0, 2);
+  TestZLong(buf, 2, 0, 2);
+  TestZInt(buf, sizeof(buf), 0, 2);
+  TestZLong(buf, sizeof(buf), 0, 2);
+
+  // Decodes to 1 but encoded in MAX_ZINT_LEN bytes
+  memset(buf, 0x80, ReadWriteUtil::MAX_ZINT_LEN);
+  buf[0] = 0x82;
+  buf[ReadWriteUtil::MAX_ZINT_LEN - 1] = 0x0;
+  TestZInt(buf, ReadWriteUtil::MAX_ZINT_LEN, 1, ReadWriteUtil::MAX_ZINT_LEN);
+  TestZLong(buf, ReadWriteUtil::MAX_ZINT_LEN, 1, ReadWriteUtil::MAX_ZINT_LEN);
+  TestZInt(buf, sizeof(buf), 1, ReadWriteUtil::MAX_ZINT_LEN);
+  TestZLong(buf, sizeof(buf), 1, ReadWriteUtil::MAX_ZINT_LEN);
+
+  // Decodes to 1 but encoded in MAX_ZLONG_LEN bytes
+  memset(buf, 0x80, ReadWriteUtil::MAX_ZLONG_LEN);
+  buf[0] = 0x82;
+
buf[ReadWriteUtil::MAX_ZLONG_LEN - 1] = 0x0; + TestZLong(buf, ReadWriteUtil::MAX_ZLONG_LEN, 1, ReadWriteUtil::MAX_ZLONG_LEN); + TestZLong(buf, sizeof(buf), 1, ReadWriteUtil::MAX_ZLONG_LEN); + + // Overflows a long. Check that we don't crash and decode the correct number of bytes, + // but don't check for a particular value. + memset(buf, 0xff, ReadWriteUtil::MAX_ZLONG_LEN); + buf[ReadWriteUtil::MAX_ZLONG_LEN - 1] ^= 0x80; + TestZLong(buf, ReadWriteUtil::MAX_ZLONG_LEN, ReadWriteUtil::MAX_ZLONG_LEN); + + // Overflows an int. Check that we don't crash and decode the correct number of bytes, + // but don't check for a particular value. + memset(buf, 0xff, ReadWriteUtil::MAX_ZINT_LEN); + buf[ReadWriteUtil::MAX_ZINT_LEN - 1] ^= 0x80; + TestZInt(buf, ReadWriteUtil::MAX_ZINT_LEN, ReadWriteUtil::MAX_ZINT_LEN); + +} } int main(int argc, char **argv) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01287a3b/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index 671d27e..6c8a621 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -245,6 +245,26 @@ error_codes = ( ("PARTITIONED_AGG_REPARTITION_FAILS", 78, "Cannot perform aggregation at node with " "id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning " "level $1. Number of rows $2."), + + ("AVRO_TRUNCATED_BLOCK", 79, "File '$0' is corrupt: truncated data block at offset $1"), + + ("AVRO_INVALID_UNION", 80, "File '$0' is corrupt: invalid union value $1 at offset $2"), + + ("AVRO_INVALID_BOOLEAN", 81, "File '$0' is corrupt: invalid boolean value $1 at offset " + "$2"), + + ("AVRO_INVALID_LENGTH", 82, "File '$0' is corrupt: invalid length $1 at offset $2"), + + ("SCANNER_INVALID_INT", 83, "File '$0' is corrupt: invalid encoded integer at offset $1"), + + ("AVRO_INVALID_RECORD_COUNT", 84, "File '$0' is corrupt: invalid record count $1 at " + "offset $2"), + + ("AVRO_INVALID_COMPRESSED_SIZE", 85, "File '$0' is corrupt: invalid compressed block " + "size $1 at offset $2"), + + ("AVRO_INVALID_METADATA_COUNT", 86, "File '$0' is corrupt: invalid metadata count $1 " + "at offset $2"), ) import sys
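
Several of these error codes guard a single, simple check. For example, the union-branch validation behind AVRO_INVALID_UNION (see the ReadUnionType() comment in hdfs-avro-scanner.h above) comes down to something like the sketch below; this is illustrative code, not the scanner's actual implementation.

    #include <cassert>
    #include <cstdint>

    // For a two-branch [type, "null"] union the branch index is a zig-zag long that
    // must decode to 0 or 1; in well-formed data that is a single byte, 0x00 or 0x02.
    static bool ReadUnionBranch(int null_union_position, uint8_t** data, uint8_t* data_end,
        bool* is_null) {
      if (*data >= data_end) return false;           // truncated block
      uint8_t b = **data;
      if (b != 0x00 && b != 0x02) return false;      // invalid union value
      ++(*data);
      *is_null = ((b >> 1) == null_union_position);  // undo zig-zag: 0x00 -> 0, 0x02 -> 1
      return true;
    }

    int main() {
      uint8_t good[] = {0x02};  // branch 1
      uint8_t* p = good;
      bool is_null = false;
      assert(ReadUnionBranch(1, &p, good + sizeof(good), &is_null) && is_null);

      uint8_t bad[] = {0x04};   // branch 2 does not exist in a two-way union
      p = bad;
      assert(!ReadUnionBranch(1, &p, bad + sizeof(bad), &is_null));
      return 0;
    }
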
