Repository: incubator-impala Updated Branches: refs/heads/master a99114283 -> 24a6e53d1
IMPALA-5412: Fix scan result with partitions on same file

The maps storing file descriptors and file metadata were using the filename as a key. When multiple partitions pointed to the same filesystem location, these map entries were occasionally overwritten by another partition pointing to the same location. As a solution, the map key was extended to a pair of partition ID and file name.

Change-Id: Ie74b305377248045c0d87b911943e1cabb7223e9
Reviewed-on: http://gerrit.cloudera.org:8080/7625
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Tim Armstrong <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/24a6e53d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/24a6e53d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/24a6e53d

Branch: refs/heads/master
Commit: 24a6e53d141cfcea158607942983e62ef5172ca6
Parents: a991142
Author: Gabor Kaszab <[email protected]>
Authored: Mon Aug 7 18:27:27 2017 +0200
Committer: Tim Armstrong <[email protected]>
Committed: Mon Aug 14 03:35:21 2017 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc       | 11 ++++--
 be/src/exec/hdfs-parquet-scanner.cc        | 13 ++++---
 be/src/exec/hdfs-scan-node-base.cc         | 28 ++++++++------
 be/src/exec/hdfs-scan-node-base.h          | 36 ++++++++++--------
 be/src/exec/hdfs-text-scanner.cc           |  3 +-
 be/src/exec/scanner-context.cc             |  5 ++-
 be/src/util/container-util.h               | 10 +++++
 tests/metadata/test_partition_metadata.py  | 51 +++++++++++++++++++++-----
 8 files changed, 110 insertions(+), 47 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc index 25fed0b..a522a1f 100644 --- a/be/src/exec/base-sequence-scanner.cc +++ b/be/src/exec/base-sequence-scanner.cc @@ -86,7 +86,8 @@ Status BaseSequenceScanner::Open(ScannerContext* context) { scan_node_->runtime_profile(), "BytesSkipped", TUnit::BYTES); header_ = reinterpret_cast<FileHeader*>( - scan_node_->GetFileMetadata(stream_->filename())); + scan_node_->GetFileMetadata( + context->partition_descriptor()->id(), stream_->filename())); if (header_ == nullptr) { only_parsing_header_ = true; return Status::OK(); @@ -157,8 +158,9 @@ Status BaseSequenceScanner::GetNextInternal(RowBatch* row_batch) { } // Header is parsed, set the metadata in the scan node and issue more ranges.
static_cast<HdfsScanNodeBase*>(scan_node_)->SetFileMetadata( - stream_->filename(), header_); - HdfsFileDesc* desc = scan_node_->GetFileDesc(stream_->filename()); + context_->partition_descriptor()->id(), stream_->filename(), header_); + HdfsFileDesc* desc = scan_node_->GetFileDesc( + context_->partition_descriptor()->id(), stream_->filename()); RETURN_IF_ERROR(scan_node_->AddDiskIoRanges(desc)); return Status::OK(); } @@ -303,7 +305,8 @@ Status BaseSequenceScanner::SkipToSync(const uint8_t* sync, int sync_size) { void BaseSequenceScanner::CloseFileRanges(const char* filename) { DCHECK(only_parsing_header_); - HdfsFileDesc* desc = scan_node_->GetFileDesc(filename); + HdfsFileDesc* desc = scan_node_->GetFileDesc( + context_->partition_descriptor()->id(), filename); const vector<DiskIoMgr::ScanRange*>& splits = desc->splits; for (int i = 0; i < splits.size(); ++i) { COUNTER_ADD(bytes_skipped_counter_, splits[i]->len()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index e97572c..ecf14f8 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -609,7 +609,8 @@ Status HdfsParquetScanner::NextRowGroup() { int64_t split_offset = split_range->offset(); int64_t split_length = split_range->len(); - HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename()); + HdfsFileDesc* file_desc = scan_node_->GetFileDesc( + context_->partition_descriptor()->id(), filename()); bool start_with_first_row_group = row_group_idx_ == -1; bool misaligned_row_group_skipped = false; @@ -1361,7 +1362,8 @@ Status HdfsParquetScanner::ProcessFooter() { // In this case, the metadata is bigger than our guess meaning there are // not enough bytes in the footer range from IssueInitialRanges(). // We'll just issue more ranges to the IoMgr that is the actual footer. - const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename()); + int64_t partition_id = context_->partition_descriptor()->id(); + const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename()); DCHECK(file_desc != NULL); // The start of the metadata is: // file_length - 4-byte metadata size - footer-size - metadata size @@ -1383,7 +1385,7 @@ Status HdfsParquetScanner::ProcessFooter() { // Read the header into the metadata buffer. 
DiskIoMgr::ScanRange* metadata_range = scan_node_->AllocateScanRange( - metadata_range_->fs(), filename(), metadata_size, metadata_start, -1, + metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id, metadata_range_->disk_id(), metadata_range_->expected_local(), DiskIoMgr::BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size)); @@ -1585,7 +1587,8 @@ Status HdfsParquetScanner::CreateCountingReader(const SchemaPath& parent_path, Status HdfsParquetScanner::InitColumns( int row_group_idx, const vector<ParquetColumnReader*>& column_readers) { - const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename()); + int64_t partition_id = context_->partition_descriptor()->id(); + const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename()); DCHECK(file_desc != NULL); parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx]; @@ -1665,7 +1668,7 @@ Status HdfsParquetScanner::InitColumns( && col_start >= split_range->offset() && col_end <= split_range->offset() + split_range->len(); DiskIoMgr::ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(), - filename(), col_len, col_start, scalar_reader->col_idx(), split_range->disk_id(), + filename(), col_len, col_start, partition_id, split_range->disk_id(), col_range_local, DiskIoMgr::BufferOpts(split_range->try_cache(), file_desc->mtime)); col_ranges.push_back(col_range); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 0e5e974..ca71201 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -263,12 +263,13 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) { file_path.append(split.file_name, filesystem::path::codecvt()); const string& native_file_path = file_path.native(); + auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path); HdfsFileDesc* file_desc = NULL; - FileDescMap::iterator file_desc_it = file_descs_.find(native_file_path); + FileDescMap::iterator file_desc_it = file_descs_.find(file_desc_map_key); if (file_desc_it == file_descs_.end()) { // Add new file_desc to file_descs_ and per_type_files_ file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path)); - file_descs_[native_file_path] = file_desc; + file_descs_[file_desc_map_key] = file_desc; file_desc->file_length = split.file_length; file_desc->mtime = split.mtime; file_desc->file_compression = split.file_compression; @@ -603,7 +604,7 @@ DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* // beyond the end of the file). 
DCHECK_GE(offset, 0); DCHECK_GE(len, 0); - DCHECK_LE(offset + len, GetFileDesc(file)->file_length) + DCHECK_LE(offset + len, GetFileDesc(partition_id, file)->file_length) << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")"; disk_id = runtime_state_->io_mgr()->AssignQueue(file, disk_id, expected_local); @@ -630,20 +631,25 @@ Status HdfsScanNodeBase::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ra return Status::OK(); } -HdfsFileDesc* HdfsScanNodeBase::GetFileDesc(const string& filename) { - DCHECK(file_descs_.find(filename) != file_descs_.end()); - return file_descs_[filename]; +HdfsFileDesc* HdfsScanNodeBase::GetFileDesc(int64_t partition_id, const string& filename) { + auto file_desc_map_key = make_pair(partition_id, filename); + DCHECK(file_descs_.find(file_desc_map_key) != file_descs_.end()); + return file_descs_[file_desc_map_key]; } -void HdfsScanNodeBase::SetFileMetadata(const string& filename, void* metadata) { +void HdfsScanNodeBase::SetFileMetadata( + int64_t partition_id, const string& filename, void* metadata) { unique_lock<mutex> l(metadata_lock_); - DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end()); - per_file_metadata_[filename] = metadata; + auto file_metadata_map_key = make_pair(partition_id, filename); + DCHECK(per_file_metadata_.find(file_metadata_map_key) == per_file_metadata_.end()); + per_file_metadata_[file_metadata_map_key] = metadata; } -void* HdfsScanNodeBase::GetFileMetadata(const string& filename) { +void* HdfsScanNodeBase::GetFileMetadata( + int64_t partition_id, const string& filename) { unique_lock<mutex> l(metadata_lock_); - auto it = per_file_metadata_.find(filename); + auto file_metadata_map_key = make_pair(partition_id, filename); + auto it = per_file_metadata_.find(file_metadata_map_key); if (it == per_file_metadata_.end()) return NULL; return it->second; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/exec/hdfs-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index c79a6e8..e1c431f 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -34,6 +34,7 @@ #include "util/avro-util.h" #include "util/progress-updater.h" #include "util/spinlock.h" +#include "util/container-util.h" namespace impala { @@ -196,10 +197,10 @@ class HdfsScanNodeBase : public ScanNode { /// Allocate a new scan range object, stored in the runtime state's object pool. For /// scan ranges that correspond to the original hdfs splits, the partition id must be - /// set to the range's partition id. For other ranges (e.g. columns in parquet, read - /// past buffers), the partition_id is unused. expected_local should be true if this - /// scan range is not expected to require a remote read. The range must fall within - /// the file bounds. That is, the offset must be >= 0, and offset + len <= file_length. + /// set to the range's partition id. Partition_id is mandatory as it is used to gather + /// file descriptor info. expected_local should be true if this scan range is not + /// expected to require a remote read. The range must fall within the file bounds. + /// That is, the offset must be >= 0, and offset + len <= file_length. /// If not NULL, the 'original_split' pointer is stored for reference in the scan range /// metadata of the scan range that is to be allocated. /// This is thread safe. 
@@ -233,16 +234,17 @@ class HdfsScanNodeBase : public ScanNode { Tuple* InitTemplateTuple(const std::vector<ScalarExprEvaluator*>& value_evals, MemPool* pool, RuntimeState* state) const; - /// Returns the file desc for 'filename'. Returns nullptr if filename is invalid. - HdfsFileDesc* GetFileDesc(const std::string& filename); + /// Given a partition_id and filename returns the related file descriptor + /// DCHECK ensures there is always file descriptor returned + HdfsFileDesc* GetFileDesc(int64_t partition_id, const std::string& filename); - /// Sets the scanner specific metadata for 'filename'. Scanners can use this to store - /// file header information. Thread safe. - void SetFileMetadata(const std::string& filename, void* metadata); + /// Sets the scanner specific metadata for 'partition_id' and 'filename'. + /// Scanners can use this to store file header information. Thread safe. + void SetFileMetadata(int64_t partition_id, const std::string& filename, void* metadata); - /// Returns the scanner specific metadata for 'filename'. Returns nullptr if there is - /// no metadata. Thread safe. - void* GetFileMetadata(const std::string& filename); + /// Returns the scanner specific metadata for 'partition_id' and 'filename'. + /// Returns nullptr if there is no metadata. Thread safe. + void* GetFileMetadata(int64_t partition_id, const std::string& filename); /// Called by scanners when a range is complete. Used to record progress. /// This *must* only be called after a scanner has completely finished its @@ -356,8 +358,11 @@ class HdfsScanNodeBase : public ScanNode { /// Partitions scanned by this scan node. std::unordered_set<int64_t> partition_ids_; - /// File path => file descriptor (which includes the file's splits) - typedef std::unordered_map<std::string, HdfsFileDesc*> FileDescMap; + /// This is a pair for partition ID and filename + typedef pair<int64_t, std::string> PartitionFileKey; + + /// partition_id, File path => file descriptor (which includes the file's splits) + typedef std::unordered_map<PartitionFileKey, HdfsFileDesc*, pair_hash> FileDescMap; FileDescMap file_descs_; /// File format => file descriptors. @@ -366,9 +371,10 @@ class HdfsScanNodeBase : public ScanNode { FileFormatsMap per_type_files_; /// Scanner specific per file metadata (e.g. header information) and associated lock. + /// Key of the map is partition_id, filename pair /// TODO: Remove this lock when removing the legacy scanners and scan nodes. boost::mutex metadata_lock_; - std::unordered_map<std::string, void*> per_file_metadata_; + std::unordered_map<PartitionFileKey, void*, pair_hash> per_file_metadata_; /// Conjuncts for each materialized tuple (top-level row batch tuples and collection /// item tuples). Includes a copy of ExecNode.conjuncts_. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index 378b45e..5962f94 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -585,7 +585,8 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) { // For other compressed text: attempt to read and decompress the entire file, point // to the decompressed buffer, and then continue normal processing. 
DCHECK(decompression_type_ != THdfsCompression::SNAPPY); - HdfsFileDesc* desc = scan_node_->GetFileDesc(stream_->filename()); + HdfsFileDesc* desc = scan_node_->GetFileDesc( + context_->partition_descriptor()->id(), stream_->filename()); int64_t file_size = desc->file_length; DCHECK_GT(file_size, 0); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index 7a998b3..b1577a1 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -77,7 +77,7 @@ ScannerContext::Stream::Stream(ScannerContext* parent) ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) { std::unique_ptr<Stream> stream(new Stream(this)); stream->scan_range_ = range; - stream->file_desc_ = scan_node_->GetFileDesc(stream->filename()); + stream->file_desc_ = scan_node_->GetFileDesc(partition_desc_->id(), stream->filename()); stream->file_len_ = stream->file_desc_->file_length; stream->total_bytes_returned_ = 0; stream->io_buffer_pos_ = NULL; @@ -175,8 +175,9 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { // TODO: We are leaving io_buffer_ = NULL, revisit. return Status::OK(); } + int64_t partition_id = parent_->partition_descriptor()->id(); DiskIoMgr::ScanRange* range = parent_->scan_node_->AllocateScanRange( - scan_range_->fs(), filename(), read_past_buffer_size, offset, -1, + scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id, scan_range_->disk_id(), false, DiskIoMgr::BufferOpts::Uncached()); RETURN_IF_ERROR(parent_->state_->io_mgr()->Read( parent_->scan_node_->reader_context(), range, &io_buffer_)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/util/container-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h index 80ec984..5cd2d0c 100644 --- a/be/src/util/container-util.h +++ b/be/src/util/container-util.h @@ -64,6 +64,16 @@ struct TNetworkAddressPtrEquals : public std::unary_function<TNetworkAddress*, b }; +struct pair_hash { + template <class T1, class T2> + std::size_t operator () (const std::pair<T1, T2> &p) const { + size_t seed = 0; + boost::hash_combine(seed, std::hash<T1>{}(p.first)); + boost::hash_combine(seed, std::hash<T2>{}(p.second)); + return seed; + } +}; + /// FindOrInsert(): if the key is present, return the value; if the key is not present, /// create a new entry (key, default_val) and return default_val. /// TODO: replace with single template which takes a template param http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/tests/metadata/test_partition_metadata.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py index 0758fde..a6f635a 100644 --- a/tests/metadata/test_partition_metadata.py +++ b/tests/metadata/test_partition_metadata.py @@ -20,6 +20,11 @@ from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import create_single_exec_option_dimension from tests.util.filesystem_utils import WAREHOUSE +# Map from the test dimension file_format string to the SQL "STORED AS" +# argument. 
+STORED_AS_ARGS = { 'text': 'textfile', 'parquet': 'parquet', 'avro': 'avro', + 'seq': 'sequencefile' } + # Tests specific to partition metadata. # TODO: Split up the DDL tests and move some of the partition-specific tests # here. @@ -33,10 +38,15 @@ class TestPartitionMetadata(ImpalaTestSuite): super(TestPartitionMetadata, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension()) - # There is no reason to run these tests using all dimensions. - cls.ImpalaTestMatrix.add_constraint(lambda v:\ - v.get_value('table_format').file_format == 'text' and\ - v.get_value('table_format').compression_codec == 'none') + # Run one variation of the test with each file formats that we support writing. + # The compression shouldn't affect the partition handling so restrict to the core + # compression codecs. + cls.ImpalaTestMatrix.add_constraint(lambda v: + (v.get_value('table_format').file_format in ('text', 'parquet') and + v.get_value('table_format').compression_codec == 'none') or + (v.get_value('table_format').file_format in ('seq', 'avro') and + v.get_value('table_format').compression_codec == 'snap' and + v.get_value('table_format').compression_type == 'block')) @SkipIfLocal.hdfs_client def test_multiple_partitions_same_location(self, vector, unique_database): @@ -46,11 +56,13 @@ class TestPartitionMetadata(ImpalaTestSuite): TBL_NAME = "same_loc_test" FQ_TBL_NAME = unique_database + "." + TBL_NAME TBL_LOCATION = '%s/%s.db/%s' % (WAREHOUSE, unique_database, TBL_NAME) + file_format = vector.get_value('table_format').file_format # Cleanup any existing data in the table directory. self.filesystem_client.delete_file_dir(TBL_NAME, recursive=True) # Create the table - self.client.execute("create table %s (i int) partitioned by(j int) location '%s'" - % (FQ_TBL_NAME, TBL_LOCATION)) + self.client.execute( + "create table %s (i int) partitioned by(j int) stored as %s location '%s'" + % (FQ_TBL_NAME, STORED_AS_ARGS[file_format], TBL_LOCATION)) # Point multiple partitions to the same location and use partition locations that # do not contain a key=value path. @@ -62,20 +74,41 @@ class TestPartitionMetadata(ImpalaTestSuite): self.client.execute("alter table %s add partition (j=2) location '%s/p'" % (FQ_TBL_NAME, TBL_LOCATION)) + # Allow unsupported avro and sequence file writer. + self.client.execute("set allow_unsupported_formats=true") + # Insert some data. This will only update partition j=1 (IMPALA-1480). self.client.execute("insert into table %s partition(j=1) select 1" % FQ_TBL_NAME) - # Refresh to update file metadata of both partitions. + # Refresh to update file metadata of both partitions self.client.execute("refresh %s" % FQ_TBL_NAME) # The data will be read twice because each partition points to the same location. data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME) - assert data.split('\t') == ['2', '3'] + if file_format == 'avro': + # Avro writer is broken and produces nulls. Only check partition column. + assert data.split('\t')[1] == '3' + else: + assert data.split('\t') == ['2', '3'] self.client.execute("insert into %s partition(j) select 1, 1" % FQ_TBL_NAME) self.client.execute("insert into %s partition(j) select 1, 2" % FQ_TBL_NAME) self.client.execute("refresh %s" % FQ_TBL_NAME) data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME) - assert data.split('\t') == ['6', '9'] + if file_format == 'avro': + # Avro writer is broken and produces nulls. Only check partition column. 
+ assert data.split('\t')[1] == '9' + else: + assert data.split('\t') == ['6', '9'] + + # Force all scan ranges to be on the same node. It should produce the same + # result as above. See IMPALA-5412. + self.client.execute("set num_nodes=1") + data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME) + if file_format == 'avro': + # Avro writer is broken and produces nulls. Only check partition column. + assert data.split('\t')[1] == '9' + else: + assert data.split('\t') == ['6', '9'] @SkipIfS3.hive @SkipIfADLS.hive
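----------------------------------------------------------------------

For readers who want the gist of the fix without walking the whole diff: the patch keys the scan node's file-descriptor and file-metadata maps by a (partition ID, file name) pair, hashed with the new pair_hash helper in be/src/util/container-util.h, so partitions that share a filesystem location no longer clobber each other's entries. Below is a minimal standalone sketch of that keying scheme, not the actual Impala classes; it assumes Boost headers are available (as they are in the Impala build), and FileDesc and the sample path are made-up stand-ins for HdfsFileDesc and a real data file.

// Minimal sketch of a (partition ID, file name) keyed map with a combined hash,
// mirroring the pair_hash helper added by this patch.
#include <boost/functional/hash.hpp>
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

struct pair_hash {
  template <class T1, class T2>
  std::size_t operator()(const std::pair<T1, T2>& p) const {
    std::size_t seed = 0;
    boost::hash_combine(seed, std::hash<T1>{}(p.first));
    boost::hash_combine(seed, std::hash<T2>{}(p.second));
    return seed;
  }
};

// Stand-in for HdfsFileDesc: only the field needed for the illustration.
struct FileDesc {
  int64_t file_length;
};

using PartitionFileKey = std::pair<int64_t, std::string>;
using FileDescMap = std::unordered_map<PartitionFileKey, FileDesc, pair_hash>;

int main() {
  FileDescMap file_descs;
  const std::string shared_file = "/warehouse/same_loc_test/data_file";  // hypothetical path

  // Two partitions (ids 1 and 2) point at the same file. With a filename-only
  // key the second insert would overwrite the first; with the pair key both
  // entries survive, so each partition keeps its own descriptor.
  file_descs[{1, shared_file}] = FileDesc{1024};
  file_descs[{2, shared_file}] = FileDesc{1024};

  std::cout << "entries: " << file_descs.size() << std::endl;  // prints 2, not 1
  return 0;
}

The explicit hash functor is needed because standard C++ provides no std::hash specialization for std::pair; combining the two member hashes with boost::hash_combine is the same approach the patch takes for FileDescMap and per_file_metadata_.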
