Repository: incubator-impala Updated Branches: refs/heads/master 286da5921 -> 9162d5d05
IMPALA-3764,3914: fuzz test HDFS scanners and fix parquet bugs found This adds a test that performs some simple fuzz testing of HDFS scanners. It creates a copy of a given HDFS table, with each file in the table corrupted in a random way: either a single byte is set to a random value, or the file is truncated to a random length. It then runs a query that scans the whole table with several different batch_size settings. I made some effort to make the failures reproducible by explicitly seeding the random number generator, and providing a mechanism to override the seed. The fuzzer has found crashes resulting from corrupted or truncated input files for RCFile, SequenceFile, Parquet, and Text LZO so far. Avro only had a small buffer read overrun detected by ASAN. Includes fixes for Parquet crashes found by the fuzzer, a small buffer overrun in Avro, and a DCHECK in MemPool. Initially it is only enabled for Avro, Parquet, and uncompressed text. As follow-up work we should fix the bugs in the other scanners and enable the test for them. We also don't implement abort_on_error=0 correctly in Parquet: for some file formats, corrupt headers result in the query being aborted, so an exception will xfail the test. Testing: Ran the test with exploration_strategy=exhaustive in a loop locally with both DEBUG and ASAN builds for a couple of days over a weekend. Also ran exhaustive private build. Change-Id: I50cf43195a7c582caa02c85ae400ea2256fa3a3b Reviewed-on: http://gerrit.cloudera.org:8080/3833 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5afd9f7d Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5afd9f7d Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5afd9f7d Branch: refs/heads/master Commit: 5afd9f7df765006c067ef5f57d7f7431fe9e1247 Parents: 286da59 Author: Tim Armstrong <[email protected]> Authored: Tue Aug 2 11:02:02 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Thu Aug 11 08:42:41 2016 +0000 ---------------------------------------------------------------------- be/src/exec/base-sequence-scanner.cc | 5 +- be/src/exec/hdfs-parquet-scanner.cc | 28 ++- be/src/exec/parquet-column-readers.cc | 36 +++- be/src/exec/parquet-column-readers.h | 4 +- be/src/exec/parquet-metadata-utils.cc | 47 +++-- be/src/exec/parquet-metadata-utils.h | 9 + be/src/runtime/disk-io-mgr.cc | 11 +- be/src/runtime/scoped-buffer.h | 68 +++++++ be/src/util/bit-stream-utils.h | 9 +- be/src/util/bit-stream-utils.inline.h | 7 +- be/src/util/compress.cc | 5 + be/src/util/dict-encoding.h | 15 +- be/src/util/dict-test.cc | 3 +- be/src/util/rle-encoding.h | 10 +- be/src/util/rle-test.cc | 2 +- .../queries/QueryTest/parquet.test | 2 +- tests/common/impala_test_suite.py | 10 + tests/query_test/test_scanners.py | 9 - tests/query_test/test_scanners_fuzz.py | 203 +++++++++++++++++++ 19 files changed, 427 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/base-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc index 5ac7954..5429e04 100644 --- a/be/src/exec/base-sequence-scanner.cc +++ b/be/src/exec/base-sequence-scanner.cc @@ -200,7 +200,10 @@ Status 
BaseSequenceScanner::ReadSync() { uint8_t* hash; int64_t out_len; RETURN_IF_FALSE(stream_->GetBytes(SYNC_HASH_SIZE, &hash, &out_len, &parse_status_)); - if (out_len != SYNC_HASH_SIZE || memcmp(hash, header_->sync, SYNC_HASH_SIZE)) { + if (out_len != SYNC_HASH_SIZE) { + return Status(Substitute("Hit end of stream after reading $0 bytes of $1-byte " + "synchronization marker", out_len, SYNC_HASH_SIZE)); + } else if (memcmp(hash, header_->sync, SYNC_HASH_SIZE) != 0) { stringstream ss; ss << "Bad synchronization marker" << endl << " Expected: '" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 5582267..b0fd008 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -35,6 +35,7 @@ #include "runtime/mem-pool.h" #include "runtime/row-batch.h" #include "runtime/runtime-filter.inline.h" +#include "runtime/scoped-buffer.h" #include "runtime/tuple-row.h" #include "runtime/tuple.h" #include "runtime/string-value.h" @@ -78,6 +79,7 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNode* scan_node, // Compute the offset of the file footer. int64_t footer_size = min(FOOTER_SIZE, files[i]->file_length); int64_t footer_start = files[i]->file_length - footer_size; + DCHECK_GE(footer_start, 0); // Try to find the split with the footer. DiskIoMgr::ScanRange* footer_split = FindFooterSplit(files[i]); @@ -311,6 +313,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch, bool* eos) { return Status::OK(); } assemble_rows_timer_.Start(); + DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows); int rows_remaining = file_metadata_.num_rows - row_group_rows_read_; int max_tuples = min(row_batch->capacity(), rows_remaining); TupleRow* current_row = row_batch->GetRow(row_batch->AddRow()); @@ -496,7 +499,14 @@ Status HdfsParquetScanner::AssembleRows( return Status::OK(); } // Check that all column readers populated the same number of values. - if (c != 0) DCHECK_EQ(last_num_tuples, scratch_batch_->num_tuples); + if (c != 0 && UNLIKELY(last_num_tuples != scratch_batch_->num_tuples)) { + parse_status_.MergeStatus(Substitute("Corrupt Parquet file '$0': column '$1' " + "had $2 remaining values but expected $3", filename(), + col_reader->schema_element().name, last_num_tuples, + scratch_batch_->num_tuples)); + *skip_row_group = true; + return Status::OK(); + } last_num_tuples = scratch_batch_->num_tuples; } row_group_rows_read_ += scratch_batch_->num_tuples; @@ -788,7 +798,7 @@ Status HdfsParquetScanner::ProcessFooter() { uint8_t* metadata_ptr = metadata_size_ptr - metadata_size; // If the metadata was too big, we need to stitch it before deserializing it. // In that case, we stitch the data in this buffer. - vector<uint8_t> metadata_buffer; + ScopedBuffer metadata_buffer(scan_node_->mem_tracker()); DCHECK(metadata_range_ != NULL); if (UNLIKELY(metadata_size > remaining_bytes_buffered)) { @@ -803,7 +813,7 @@ Status HdfsParquetScanner::ProcessFooter() { sizeof(int32_t) - sizeof(PARQUET_VERSION_NUMBER) - metadata_size; int64_t metadata_bytes_to_read = metadata_size; if (metadata_start < 0) { - return Status(Substitute("File $0 is invalid. Invalid metadata size in file " + return Status(Substitute("File '$0' is invalid. Invalid metadata size in file " "footer: $1 bytes. 
File size: $2 bytes.", filename(), metadata_size, file_desc->file_length)); } @@ -812,8 +822,12 @@ Status HdfsParquetScanner::ProcessFooter() { // TODO: consider moving this stitching into the scanner context. The scanner // context usually handles the stitching but no other scanner need this logic // now. - metadata_buffer.resize(metadata_size); - metadata_ptr = &metadata_buffer[0]; + + if (!metadata_buffer.TryAllocate(metadata_size)) { + return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet " + "metadata for file '$1'.", metadata_size, filename())); + } + metadata_ptr = metadata_buffer.buffer(); int64_t copy_offset = 0; DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr(); @@ -856,6 +870,10 @@ Status HdfsParquetScanner::ProcessFooter() { return Status( Substitute("Invalid file. This file: $0 has no row groups", filename())); } + if (file_metadata_.num_rows < 0) { + return Status(Substitute("Corrupt Parquet file '$0': negative row count $1 in " + "file metadata", filename(), file_metadata_.num_rows)); + } return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/parquet-column-readers.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc index 3e8f33c..c7e3e17 100644 --- a/be/src/exec/parquet-column-readers.cc +++ b/be/src/exec/parquet-column-readers.cc @@ -62,6 +62,7 @@ const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to al Status ParquetLevelDecoder::Init(const string& filename, parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values, uint8_t** data, int* data_size) { + DCHECK_GE(num_buffered_values, 0); encoding_ = encoding; max_level_ = max_level; num_buffered_values_ = num_buffered_values; @@ -95,7 +96,10 @@ Status ParquetLevelDecoder::Init(const string& filename, return Status(ss.str()); } } - DCHECK_GT(num_bytes, 0); + if (UNLIKELY(num_bytes < 0 || num_bytes > *data_size)) { + return Status(Substitute("Corrupt Parquet file '$0': $1 bytes of encoded levels but " + "only $2 bytes left in page", filename, num_bytes, *data_size)); + } *data += num_bytes; *data_size -= num_bytes; return Status::OK(); @@ -404,6 +408,8 @@ class ScalarColumnReader : public BaseScalarColumnReader { } virtual Status InitDataPage(uint8_t* data, int size) { + // Data can be empty if the column contains all NULLs + DCHECK_GE(size, 0); page_encoding_ = current_page_header_.data_page_header.encoding; if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY && page_encoding_ != parquet::Encoding::PLAIN) { @@ -419,7 +425,7 @@ class ScalarColumnReader : public BaseScalarColumnReader { if (!dict_decoder_init_) { return Status("File corrupt. Missing dictionary page."); } - dict_decoder_.SetData(data, size); + RETURN_IF_ERROR(dict_decoder_.SetData(data, size)); } // TODO: Perform filter selectivity checks here. 
@@ -757,6 +763,15 @@ Status BaseScalarColumnReader::ReadDataPage() { int data_size = current_page_header_.compressed_page_size; int uncompressed_size = current_page_header_.uncompressed_page_size; + if (UNLIKELY(data_size < 0)) { + return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for " + "column '$2'", filename(), data_size, schema_element().name)); + } + if (UNLIKELY(uncompressed_size < 0)) { + return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page " + "size $1 for column '$2'", filename(), uncompressed_size, + schema_element().name)); + } if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) { if (slot_desc_ == NULL) { @@ -853,7 +868,12 @@ Status BaseScalarColumnReader::ReadDataPage() { // statistics. See IMPALA-2208 and PARQUET-251. if (!stream_->ReadBytes(data_size, &data_, &status)) return status; data_end_ = data_ + data_size; - num_buffered_values_ = current_page_header_.data_page_header.num_values; + int num_values = current_page_header_.data_page_header.num_values; + if (num_values < 0) { + return Status(Substitute("Error reading data page in Parquet file '$0'. " + "Invalid number of values in metadata: $1", filename(), num_values)); + } + num_buffered_values_ = num_values; num_values_read_ += num_buffered_values_; if (decompressor_.get() != NULL) { @@ -902,7 +922,7 @@ Status BaseScalarColumnReader::ReadDataPage() { max_def_level(), num_buffered_values_, &data_, &data_size)); // Data can be empty if the column contains all NULLs - if (data_size != 0) RETURN_IF_ERROR(InitDataPage(data_, data_size)); + RETURN_IF_ERROR(InitDataPage(data_, data_size)); break; } @@ -920,6 +940,14 @@ bool BaseScalarColumnReader::NextLevels() { // Definition level is not present if column and any containing structs are required. def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel(); + // The compiler can optimize these two conditions into a single branch by treating + // def_level_ as unsigned. + if (UNLIKELY(def_level_ < 0 || def_level_ > max_def_level())) { + parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file '$0': " + "invalid def level $1 > max def level $2 for column '$3'", filename(), + def_level_, max_def_level(), schema_element().name))); + return false; + } if (ADVANCE_REP_LEVEL && max_rep_level() > 0) { // Repetition level is only present if this column is nested in any collection type. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/parquet-column-readers.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h index 930e6bb..8435e71 100644 --- a/be/src/exec/parquet-column-readers.h +++ b/be/src/exec/parquet-column-readers.h @@ -428,7 +428,9 @@ class BaseScalarColumnReader : public ParquetColumnReader { virtual void ClearDictionaryDecoder() = 0; /// Initializes the reader with the data contents. This is the content for the entire - /// decompressed data page. Decoders can initialize state from here. + /// decompressed data page. Decoders can initialize state from here. The caller must + /// validate the input such that 'size' is non-negative and that 'data' has at least + /// 'size' bytes remaining. 
virtual Status InitDataPage(uint8_t* data, int size) = 0; private: http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/parquet-metadata-utils.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-metadata-utils.cc b/be/src/exec/parquet-metadata-utils.cc index 52ae933..1b694ed 100644 --- a/be/src/exec/parquet-metadata-utils.cc +++ b/be/src/exec/parquet-metadata-utils.cc @@ -56,32 +56,41 @@ Status ParquetMetadataUtils::ValidateColumnOffsets(const string& filename, int64_t file_length, const parquet::RowGroup& row_group) { for (int i = 0; i < row_group.columns.size(); ++i) { const parquet::ColumnChunk& col_chunk = row_group.columns[i]; + RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length, + col_chunk.meta_data.data_page_offset, "data page offset")); int64_t col_start = col_chunk.meta_data.data_page_offset; // The file format requires that if a dictionary page exists, it be before data pages. if (col_chunk.meta_data.__isset.dictionary_page_offset) { + RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length, + col_chunk.meta_data.dictionary_page_offset, "dictionary page offset")); if (col_chunk.meta_data.dictionary_page_offset >= col_start) { - stringstream ss; - ss << "File " << filename << ": metadata is corrupt. " - << "Dictionary page (offset=" << col_chunk.meta_data.dictionary_page_offset - << ") must come before any data pages (offset=" << col_start << ")."; - return Status(ss.str()); + return Status(Substitute("Parquet file '$0': metadata is corrupt. Dictionary " + "page (offset=$1) must come before any data pages (offset=$2).", + filename, col_chunk.meta_data.dictionary_page_offset, col_start)); } col_start = col_chunk.meta_data.dictionary_page_offset; } int64_t col_len = col_chunk.meta_data.total_compressed_size; int64_t col_end = col_start + col_len; if (col_end <= 0 || col_end > file_length) { - stringstream ss; - ss << "File " << filename << ": metadata is corrupt. " - << "Column " << i << " has invalid column offsets " - << "(offset=" << col_start << ", size=" << col_len << ", " - << "file_size=" << file_length << ")."; - return Status(ss.str()); + return Status(Substitute("Parquet file '$0': metadata is corrupt. Column $1 has " + "invalid column offsets (offset=$2, size=$3, file_size=$4).", filename, i, + col_start, col_len, file_length)); } } return Status::OK(); } +Status ParquetMetadataUtils::ValidateOffsetInFile(const string& filename, int col_idx, + int64_t file_length, int64_t offset, const string& offset_name) { + if (offset < 0 || offset >= file_length) { + return Status(Substitute("File '$0': metadata is corrupt. Column $1 has invalid " + "$2 (offset=$3 file_size=$4).", filename, col_idx, offset_name, offset, + file_length)); + } + return Status::OK();; +} + static bool IsEncodingSupported(parquet::Encoding::type e) { switch (e) { case parquet::Encoding::PLAIN: @@ -128,8 +137,10 @@ Status ParquetMetadataUtils::ValidateColumn(const parquet::FileMetaData& file_me if (slot_desc == NULL) return Status::OK(); parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[slot_desc->type().type]; - DCHECK_EQ(type, file_data.meta_data.type) - << "Should have been validated in ResolvePath()"; + if (UNLIKELY(type != file_data.meta_data.type)) { + return Status(Substitute("Unexpected Parquet type in file '$0' metadata expected $1 " + "actual $2: file may be corrupt", filename, type, file_data.meta_data.type)); + } // Check the decimal scale in the file matches the metastore scale and precision. 
// We fail the query if the metadata makes it impossible for us to safely read @@ -318,7 +329,7 @@ Status ParquetSchemaResolver::CreateSchemaTree( int ira_def_level, int* idx, int* col_idx, SchemaNode* node) const { if (*idx >= schema.size()) { - return Status(Substitute("File $0 corrupt: could not reconstruct schema tree from " + return Status(Substitute("File '$0' corrupt: could not reconstruct schema tree from " "flattened schema in file metadata", filename_)); } node->element = &schema[*idx]; @@ -329,6 +340,14 @@ Status ParquetSchemaResolver::CreateSchemaTree( // file_metadata_.row_groups.columns node->col_idx = *col_idx; ++(*col_idx); + } else if (node->element->num_children > SCHEMA_NODE_CHILDREN_SANITY_LIMIT) { + // Sanity-check the schema to avoid allocating absurdly large buffers below. + return Status(Substitute("Schema in Parquet file '$0' has $1 children, more than limit of " + "$2. File is likely corrupt", filename_, node->element->num_children, + SCHEMA_NODE_CHILDREN_SANITY_LIMIT)); + } else if (node->element->num_children < 0) { + return Status(Substitute("Corrupt Parquet file '$0': schema element has $1 children.", + filename_, node->element->num_children)); } // def_level_of_immediate_repeated_ancestor does not include this node, so set before http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/parquet-metadata-utils.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-metadata-utils.h b/be/src/exec/parquet-metadata-utils.h index 7e1db31..7a1e897 100644 --- a/be/src/exec/parquet-metadata-utils.h +++ b/be/src/exec/parquet-metadata-utils.h @@ -39,6 +39,11 @@ class ParquetMetadataUtils { static Status ValidateColumnOffsets(const string& filename, int64_t file_length, const parquet::RowGroup& row_group); + /// Check that a file offset is in the file. Return an error status with a detailed + /// error message if it is not. + static Status ValidateOffsetInFile(const std::string& filename, int col_idx, + int64_t file_length, int64_t offset, const std::string& offset_name); + /// Validates the column metadata to make sure this column is supported (e.g. encoding, /// type, etc) and matches the type of given slot_desc. static Status ValidateColumn(const parquet::FileMetaData& file_metadata, @@ -144,6 +149,10 @@ class ParquetSchemaResolver { bool* missing_field) const; private: + /// An arbitrary limit on the number of children per schema node we support. + /// Used to sanity-check Parquet schemas. + static const int SCHEMA_NODE_CHILDREN_SANITY_LIMIT = 64 * 1024; + /// Unflattens the schema metadata from a Parquet file metadata and converts it to our /// SchemaNode representation. Returns the result in 'node' unless an error status is /// returned. Does not set the slot_desc field of any SchemaNode. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 88ea035..5df69ed 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -519,10 +519,13 @@ int64_t DiskIoMgr::GetReadThroughput() { Status DiskIoMgr::ValidateScanRange(ScanRange* range) { int disk_id = range->disk_id_; if (disk_id < 0 || disk_id >= disk_queues_.size()) { - stringstream ss; - ss << "Invalid scan range. 
Bad disk id: " << disk_id; - DCHECK(false) << ss.str(); - return Status(ss.str()); + return Status(Substitute("Invalid scan range. Bad disk id: $0", disk_id)); + } + if (range->offset_ < 0) { + return Status(Substitute("Invalid scan range. Negative offset $0", range->offset_)); + } + if (range->len_ < 0) { + return Status(Substitute("Invalid scan range. Negative length $0", range->len_)); } return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/runtime/scoped-buffer.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/scoped-buffer.h b/be/src/runtime/scoped-buffer.h new file mode 100644 index 0000000..4841f7f --- /dev/null +++ b/be/src/runtime/scoped-buffer.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_SCOPED_BUFFER_H +#define IMPALA_RUNTIME_SCOPED_BUFFER_H + +#include "runtime/mem-tracker.h" + +namespace { + +/// A scoped memory allocation that is tracked against a MemTracker. +/// The allocation is automatically freed when the ScopedBuffer object goes out of scope. +class ScopedBuffer { + public: + ScopedBuffer(MemTracker* mem_tracker) : mem_tracker_(mem_tracker), + buffer_(NULL), bytes_(0) {} + ~ScopedBuffer() { Release(); } + + /// Try to allocate a buffer of size 'bytes'. Returns false if MemTracker::TryConsume() + /// or malloc() fails. + /// Should not be called if a buffer is already allocated. + bool TryAllocate(int64_t bytes) { + DCHECK(buffer_ == NULL); + DCHECK_GT(bytes, 0); + if (!mem_tracker_->TryConsume(bytes)) return false; + buffer_ = reinterpret_cast<uint8_t*>(malloc(bytes)); + if (UNLIKELY(buffer_ == NULL)) { + mem_tracker_->Release(bytes); + return false; + } + bytes_ = bytes; + return true; + } + + void Release() { + if (buffer_ == NULL) return; + free(buffer_); + buffer_ = NULL; + mem_tracker_->Release(bytes_); + bytes_ = 0; + } + + inline uint8_t* buffer() const { return buffer_; } + + private: + MemTracker* mem_tracker_; + uint8_t* buffer_; + /// The current size of the allocated buffer, if not NULL. + int64_t bytes_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/bit-stream-utils.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h index 48ced18..ce159cb 100644 --- a/be/src/util/bit-stream-utils.h +++ b/be/src/util/bit-stream-utils.h @@ -77,6 +77,9 @@ class BitWriter { /// to the next byte boundary. void Flush(bool align=false); + /// Maximum supported bitwidth for writer. 
+ static const int MAX_BITWIDTH = 32; + private: uint8_t* buffer_; int max_bytes_; @@ -123,7 +126,8 @@ class BitReader { bool GetAligned(int num_bytes, T* v); /// Reads a vlq encoded int from the stream. The encoded int must start at the - /// beginning of a byte. Return false if there were not enough bytes in the buffer. + /// beginning of a byte. Return false if there were not enough bytes in the buffer or + /// the int is invalid. bool GetVlqInt(int32_t* v); /// Returns the number of bytes left in the stream, not including the current byte (i.e., @@ -133,6 +137,9 @@ class BitReader { /// Maximum byte length of a vlq encoded int static const int MAX_VLQ_BYTE_LEN = 5; + /// Maximum supported bitwidth for reader. + static const int MAX_BITWIDTH = 32; + private: uint8_t* buffer_; int max_bytes_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/bit-stream-utils.inline.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h index 4249bc5..fd77974 100644 --- a/be/src/util/bit-stream-utils.inline.h +++ b/be/src/util/bit-stream-utils.inline.h @@ -25,7 +25,7 @@ namespace impala { inline bool BitWriter::PutValue(uint64_t v, int num_bits) { // TODO: revisit this limit if necessary (can be raised to 64 by fixing some edge cases) - DCHECK_LE(num_bits, 32); + DCHECK_LE(num_bits, MAX_BITWIDTH); DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits; if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false; @@ -88,7 +88,7 @@ template<typename T> inline bool BitReader::GetValue(int num_bits, T* v) { DCHECK(buffer_ != NULL); // TODO: revisit this limit if necessary - DCHECK_LE(num_bits, 32); + DCHECK_LE(num_bits, MAX_BITWIDTH); DCHECK_LE(num_bits, sizeof(T) * 8); if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false; @@ -140,13 +140,12 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) { inline bool BitReader::GetVlqInt(int32_t* v) { *v = 0; int shift = 0; - int num_bytes = 0; uint8_t byte = 0; do { + if (UNLIKELY(shift >= MAX_VLQ_BYTE_LEN * 7)) return false; if (!GetAligned<uint8_t>(1, &byte)) return false; *v |= (byte & 0x7F) << shift; shift += 7; - DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN); } while ((byte & 0x80) != 0); return true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/compress.cc ---------------------------------------------------------------------- diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc index 1676a50..7c97737 100644 --- a/be/src/util/compress.cc +++ b/be/src/util/compress.cc @@ -114,6 +114,7 @@ Status GzipCompressor::Compress(int64_t input_length, const uint8_t* input, Status GzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t* input, int64_t* output_length, uint8_t** output) { + DCHECK_GE(input_length, 0); DCHECK(!output_preallocated || (output_preallocated && *output_length > 0)); int64_t max_compressed_len = MaxOutputLen(input_length); if (!output_preallocated) { @@ -146,6 +147,7 @@ Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_leng // The bz2 library does not allow input to be NULL, even when input_length is 0. This // should be OK because we do not write any file formats that support bzip compression. 
DCHECK(input != NULL); + DCHECK_GE(input_length, 0); if (output_preallocated) { buffer_length_ = *output_length; @@ -201,6 +203,7 @@ int64_t SnappyBlockCompressor::MaxOutputLen(int64_t input_len, const uint8_t* in Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t* input, int64_t *output_length, uint8_t** output) { + DCHECK_GE(input_length, 0); // Hadoop uses a block compression scheme on top of snappy. First there is // an integer which is the size of the decompressed data followed by a // sequence of compressed blocks each preceded with an integer size. @@ -252,6 +255,7 @@ int64_t SnappyCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) Status SnappyCompressor::ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t* input, int64_t* output_length, uint8_t** output) { + DCHECK_GE(input_length, 0); int64_t max_compressed_len = MaxOutputLen(input_length); if (output_preallocated && *output_length < max_compressed_len) { return Status("SnappyCompressor::ProcessBlock: output length too small"); @@ -292,6 +296,7 @@ int64_t Lz4Compressor::MaxOutputLen(int64_t input_len, const uint8_t* input) { Status Lz4Compressor::ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t* input, int64_t* output_length, uint8_t** output) { + DCHECK_GE(input_length, 0); CHECK(output_preallocated) << "Output was not allocated for Lz4 Codec"; if (input_length == 0) return Status::OK(); *output_length = LZ4_compress(reinterpret_cast<const char*>(input), http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/dict-encoding.h ---------------------------------------------------------------------- diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h index 09a3d2d..d9fbe08 100644 --- a/be/src/util/dict-encoding.h +++ b/be/src/util/dict-encoding.h @@ -23,6 +23,7 @@ #include <boost/unordered_map.hpp> #include "gutil/bits.h" +#include "gutil/strings/substitute.h" #include "exec/parquet-common.h" #include "runtime/mem-pool.h" #include "runtime/string-value.h" @@ -166,14 +167,20 @@ class DictEncoder : public DictEncoderBase { /// by the caller and valid as long as this object is. class DictDecoderBase { public: - /// The rle encoded indices into the dictionary. - void SetData(uint8_t* buffer, int buffer_len) { - DCHECK_GT(buffer_len, 0); + /// The rle encoded indices into the dictionary. Returns an error status if the buffer + /// is too short or the bit_width metadata in the buffer is invalid. 
+ Status SetData(uint8_t* buffer, int buffer_len) { + DCHECK_GE(buffer_len, 0); + if (UNLIKELY(buffer_len == 0)) return Status("Dictionary cannot be 0 bytes"); uint8_t bit_width = *buffer; - DCHECK_GE(bit_width, 0); + if (UNLIKELY(bit_width < 0 || bit_width > BitReader::MAX_BITWIDTH)) { + return Status(strings::Substitute("Dictionary has invalid or unsupported bit " + "width: $0", bit_width)); + } ++buffer; --buffer_len; data_decoder_.Reset(buffer, buffer_len, bit_width); + return Status::OK(); } virtual ~DictDecoderBase() {} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/dict-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc index ea6536c..10d5e3d 100644 --- a/be/src/util/dict-test.cc +++ b/be/src/util/dict-test.cc @@ -25,6 +25,7 @@ #include "runtime/mem-tracker.h" #include "runtime/string-value.inline.h" #include "runtime/timestamp-value.h" +#include "testutil/test-macros.h" #include "util/dict-encoding.h" #include "common/names.h" @@ -53,7 +54,7 @@ void ValidateDict(const vector<T>& values, int fixed_buffer_byte_size) { DictDecoder<T> decoder; ASSERT_TRUE( decoder.Reset(dict_buffer, encoder.dict_encoded_size(), fixed_buffer_byte_size)); - decoder.SetData(data_buffer, data_len); + ASSERT_OK(decoder.SetData(data_buffer, data_len)); for (T i: values) { T j; decoder.GetValue(&j); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/rle-encoding.h ---------------------------------------------------------------------- diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h index 6f993d5..9f39697 100644 --- a/be/src/util/rle-encoding.h +++ b/be/src/util/rle-encoding.h @@ -88,14 +88,14 @@ class RleDecoder { repeat_count_(0), literal_count_(0) { DCHECK_GE(bit_width_, 0); - DCHECK_LE(bit_width_, 64); + DCHECK_LE(bit_width_, BitReader::MAX_BITWIDTH); } RleDecoder() : bit_width_(-1) {} void Reset(uint8_t* buffer, int buffer_len, int bit_width) { DCHECK_GE(bit_width, 0); - DCHECK_LE(bit_width, 64); + DCHECK_LE(bit_width, BitReader::MAX_BITWIDTH); bit_reader_.Reset(buffer, buffer_len); bit_width_ = bit_width; current_value_ = 0; @@ -262,8 +262,7 @@ inline bool RleDecoder::Get(T* val) { --repeat_count_; } else { DCHECK_GT(literal_count_, 0); - bool result = bit_reader_.GetValue(bit_width_, val); - DCHECK(result); + if (UNLIKELY(!bit_reader_.GetValue(bit_width_, val))) return false; --literal_count_; } @@ -275,8 +274,7 @@ bool RleDecoder::NextCounts() { // Read the next run's indicator int, it could be a literal or repeated run. // The int is encoded as a vlq-encoded value. 
int32_t indicator_value = 0; - bool result = bit_reader_.GetVlqInt(&indicator_value); - if (!result) return false; + if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return false; // lsb indicates if it is a literal run or repeated run bool is_literal = indicator_value & 1; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/rle-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc index b3dc5b7..fd429eb 100644 --- a/be/src/util/rle-test.cc +++ b/be/src/util/rle-test.cc @@ -31,7 +31,7 @@ namespace impala { -const int MAX_WIDTH = 32; +const int MAX_WIDTH = BitReader::MAX_BITWIDTH; TEST(BitArray, TestBool) { const int len = 8; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/testdata/workloads/functional-query/queries/QueryTest/parquet.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet.test b/testdata/workloads/functional-query/queries/QueryTest/parquet.test index e6b4061..a449162 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/parquet.test +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet.test @@ -28,7 +28,7 @@ Invalid metadata size in file footer # Parquet file with invalid column dict_page_offset. SELECT * from bad_dict_page_offset ---- CATCH -Column 0 has invalid column offsets (offset=10000, size=47, file_size=249) +Column 0 has invalid data page offset (offset=100001 file_size=249) ==== ---- QUERY # Parquet file with invalid column total_compressed_size. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/tests/common/impala_test_suite.py ---------------------------------------------------------------------- diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 8863995..4fcce64 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -512,6 +512,16 @@ class ImpalaTestSuite(BaseTestSuite): self.hive_client.drop_table(db_name, table_name, True) self.hive_client.create_table(table) + def _get_table_location(self, table_name, vector): + """ Returns the HDFS location of the table """ + result = self.execute_query_using_client(self.client, + "describe formatted %s" % table_name, vector) + for row in result.data: + if 'Location:' in row: + return row.split('\t')[1] + # This should never happen. + assert 0, 'Unable to get location for table: ' + table_name + def run_stmt_in_hive(self, stmt): """ Run a statement in Hive, returning stdout if successful and throwing http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/tests/query_test/test_scanners.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index 8e78670..4cfbcb0 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -117,15 +117,6 @@ class TestUnmatchedSchema(ImpalaTestSuite): cls.TestMatrix.add_constraint(\ lambda v: v.get_value('table_format').file_format != 'avro') - def _get_table_location(self, table_name, vector): - result = self.execute_query_using_client(self.client, - "describe formatted %s" % table_name, vector) - for row in result.data: - if 'Location:' in row: - return row.split('\t')[1] - # This should never happen. 
- assert 0, 'Unable to get location for table: ' + table_name - def _create_test_table(self, vector): """ Creates the test table http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/tests/query_test/test_scanners_fuzz.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py new file mode 100644 index 0000000..ae17572 --- /dev/null +++ b/tests/query_test/test_scanners_fuzz.py @@ -0,0 +1,203 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +import pytest +import random +import shutil +import tempfile +import time +from subprocess import check_call +from tests.common.impala_test_suite import ImpalaTestSuite, LOG +from tests.util.filesystem_utils import WAREHOUSE, get_fs_path + +# Random fuzz testing of HDFS scanners. Existing tables for any HDFS file format +# are corrupted in random ways to flush out bugs with handling of corrupted data. +class TestScannersFuzzing(ImpalaTestSuite): + # Test a range of batch sizes to exercise different corner cases. + BATCH_SIZES = [0, 1, 16, 10000] + + @classmethod + def get_workload(cls): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestScannersFuzzing, cls).add_test_dimensions() + # TODO: enable for more table formats once they consistently pass the fuzz test. + cls.TestMatrix.add_constraint(lambda v:\ + v.get_value('table_format').file_format in ('avro', 'parquet') or + (v.get_value('table_format').file_format == 'text' + and v.get_value('table_format').compression_type == 'none')) + + def test_fuzz_alltypes(self, vector, unique_database): + self.run_fuzz_test(vector, unique_database, "alltypes") + + def test_fuzz_decimal_tbl(self, vector, unique_database): + table_format = vector.get_value('table_format') + table_name = "decimal_tbl" + if table_format.file_format == 'avro': + table_name = "avro_decimal_tbl" + if table_format.compression_codec != 'snap' or \ + table_format.compression_type != 'block': + pytest.skip() + + self.run_fuzz_test(vector, unique_database, table_name, 10) + + def test_fuzz_nested_types(self, vector, unique_database): + table_format = vector.get_value('table_format') + if table_format.file_format != 'parquet': pytest.skip() + self.run_fuzz_test(vector, unique_database, "complextypestbl", 10) + + # TODO: add test coverage for additional data types like char and varchar + + def run_fuzz_test(self, vector, unique_database, table, num_copies=1): + """ Do some basic fuzz testing: create a copy of an existing table with randomly + corrupted files and make sure that we don't crash or behave in an unexpected way. + 'unique_database' is used for the table, so it will be cleaned up automatically. 
+ If 'num_copies' is set, create that many corrupted copies of each input file. + SCANNER_FUZZ_SEED can be set in the environment to reproduce the result (assuming that + input files are the same). + SCANNER_FUZZ_KEEP_FILES can be set in the environment to keep the generated files. + """ + # Create and seed a new random number generator for reproducibility. + rng = random.Random() + random_seed = os.environ.get("SCANNER_FUZZ_SEED") or time.time() + LOG.info("Using random seed %d", random_seed) + rng.seed(long(random_seed)) + + table_format = vector.get_value('table_format') + self.change_database(self.client, table_format) + + tmp_table_dir = tempfile.mkdtemp(prefix="tmp-scanner-fuzz-%s" % table, + dir=os.path.join(os.environ['IMPALA_HOME'], "testdata")) + + self.execute_query("create table %s.%s like %s" % (unique_database, table, table)) + fuzz_table_location = get_fs_path("/test-warehouse/{0}.db/{1}".format( + unique_database, table)) + + LOG.info("Generating corrupted version of %s in %s. Local working directory is %s", + table, unique_database, tmp_table_dir) + + # Find the location of the existing table and get the full table directory structure. + table_loc = self._get_table_location(table, vector) + check_call(['hdfs', 'dfs', '-copyToLocal', table_loc + "/*", tmp_table_dir]) + + partitions = self.walk_and_corrupt_table_data(tmp_table_dir, num_copies, rng) + for partition in partitions: + self.execute_query('alter table {0}.{1} add partition ({2})'.format( + unique_database, table, ','.join(partition))) + + # Copy all of the local files and directories to hdfs. + to_copy = ["%s/%s" % (tmp_table_dir, file_or_dir) + for file_or_dir in os.listdir(tmp_table_dir)] + check_call(['hdfs', 'dfs', '-copyFromLocal'] + to_copy + [fuzz_table_location]) + + if "SCANNER_FUZZ_KEEP_FILES" not in os.environ: + shutil.rmtree(tmp_table_dir) + + # Querying the corrupted files should not DCHECK or crash. + self.execute_query("refresh %s.%s" % (unique_database, table)) + # Execute a query that tries to read all the columns and rows in the file. + # Also execute a count(*) that materializes no columns, since different code + # paths are exercised. + # Use abort_on_error=0 to ensure we scan all the files. + queries = [ + 'select count(*) from (select distinct * from {0}.{1}) q'.format( + unique_database, table), + 'select count(*) from {0}.{1} q'.format(unique_database, table)] + + xfail_msgs = [] + for query in queries: + for batch_size in self.BATCH_SIZES: + query_options = {'abort_on_error': '0', 'batch_size': batch_size} + try: + result = self.execute_query(query, query_options = query_options) + LOG.info('\n'.join(result.log)) + except Exception as e: + if 'memory limit exceeded' in str(e).lower(): + # Memory limit error should fail query. + continue + msg = "Should not throw error when abort_on_error=0: '{0}'".format(e) + LOG.error(msg) + # Parquet fails the query for some parse errors. + if table_format.file_format == 'parquet': + xfail_msgs.append(msg) + else: + raise + if len(xfail_msgs) != 0: + pytest.xfail('\n'.join(xfail_msgs)) + + def walk_and_corrupt_table_data(self, tmp_table_dir, num_copies, rng): + """ Walks a local copy of a HDFS table directory. Returns a list of partitions, each + as a list of "key=val" pairs. Ensures there is 'num_copies' copies of each file, + and corrupts each of the copies. + """ + partitions = [] + # Iterate over the partitions and files we downloaded. 
+ for subdir, dirs, files in os.walk(tmp_table_dir): + if '_impala_insert_staging' in subdir: continue + if len(dirs) != 0: continue # Skip non-leaf directories + + rel_subdir = os.path.relpath(subdir, tmp_table_dir) + if rel_subdir != ".": + # Create metadata for any directory partitions. + partitions.append(self.partitions_from_path(rel_subdir)) + + # Corrupt all of the files that we find. + for filename in files: + filepath = os.path.join(subdir, filename) + copies = [filepath] + for copy_num in range(1, num_copies): + copypath = os.path.join(subdir, "copy{0}_{1}".format(copy_num, filename)) + shutil.copyfile(filepath, copypath) + copies.append(copypath) + for filepath in copies: + self.corrupt_file(filepath, rng) + return partitions + + def partitions_from_path(self, relpath): + """ Return a list of "key=val" parts from partitions inferred from the directory path. + """ + reversed_partitions = [] + while relpath != '': + relpath, suffix = os.path.split(relpath) + reversed_partitions.append(suffix) + return reversed(reversed_partitions) + + def corrupt_file(self, path, rng): + """ Corrupt the file at 'path' in the local file system in a randomised way using the + random number generator 'rng'. Rewrites the file in-place. + Logs a message to describe how the file was corrupted, so the error is reproducible. + """ + with open(path, "rb") as f: + data = bytearray(f.read()) + + if rng.random() < 0.5: + flip_offset = rng.randint(0, len(data) - 1) + flip_val = rng.randint(0, 255) + LOG.info("corrupt_file: Flip byte in %s at %d from %d to %d", path, flip_offset, + data[flip_offset], flip_val) + data[flip_offset] = flip_val + else: + truncation = rng.randint(0, len(data)) + LOG.info("corrupt_file: Truncate %s to %d", path, truncation) + data = data[:truncation] + + with open(path, "wb") as f: + f.write(data) +
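----------------------------------------------------------------------
A note on reproducing fuzz failures: the corruption itself is implemented by corrupt_file() at the end of test_scanners_fuzz.py above. Each copied file either has a single byte flipped to a random value or is truncated at a random offset, driven by an RNG seeded from SCANNER_FUZZ_SEED (or the current time) so that a failing run can be replayed, and SCANNER_FUZZ_KEEP_FILES preserves the corrupted files for inspection. The standalone Python sketch below distils that strategy for experimenting outside the test harness; the helper name corrupt_copy and the example seed are illustrative only and are not part of this commit.

  import random

  def corrupt_copy(path, seed):
      """Corrupt the file at 'path' in place: flip one byte or truncate it.

      A fixed 'seed' makes the corruption reproducible, mirroring how the test
      seeds its RNG from SCANNER_FUZZ_SEED.
      """
      rng = random.Random(seed)
      with open(path, "rb") as f:
          data = bytearray(f.read())
      if data and rng.random() < 0.5:
          # Flip a single byte at a random offset to a random value.
          offset = rng.randint(0, len(data) - 1)
          data[offset] = rng.randint(0, 255)
      else:
          # Truncate the file to a random length (possibly zero bytes).
          data = data[:rng.randint(0, len(data))]
      with open(path, "wb") as f:
          f.write(data)

  # Example: corrupt a scratch copy of a data file with a fixed seed (path and seed
  # are hypothetical).
  # corrupt_copy("/tmp/scanner-fuzz/alltypes/100101.txt", seed=12345)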