http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/parquet-column-readers.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h index 0ff5ce7..b19f23e 100644 --- a/be/src/exec/parquet-column-readers.h +++ b/be/src/exec/parquet-column-readers.h @@ -320,42 +320,29 @@ class BaseScalarColumnReader : public ParquetColumnReader { BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc) : ParquetColumnReader(parent, node, slot_desc), - data_(NULL), - data_end_(NULL), - def_levels_(true), - rep_levels_(false), - page_encoding_(parquet::Encoding::PLAIN_DICTIONARY), - num_buffered_values_(0), - num_values_read_(0), - metadata_(NULL), - stream_(NULL), data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) { DCHECK_GE(node_.col_idx, 0) << node_.DebugString(); } virtual ~BaseScalarColumnReader() { } - /// This is called once for each row group in the file. - Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) { - DCHECK(stream != NULL); - DCHECK(metadata != NULL); - - num_buffered_values_ = 0; - data_ = NULL; - data_end_ = NULL; - stream_ = stream; - metadata_ = metadata; - num_values_read_ = 0; - def_level_ = HdfsParquetScanner::INVALID_LEVEL; - // See ColumnReader constructor. - rep_level_ = max_rep_level() == 0 ? 0 : HdfsParquetScanner::INVALID_LEVEL; - pos_current_value_ = HdfsParquetScanner::INVALID_POS; - - if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) { - RETURN_IF_ERROR(Codec::CreateDecompressor( - NULL, false, ConvertParquetToImpalaCodec(metadata_->codec), &decompressor_)); - } - ClearDictionaryDecoder(); + /// Resets the reader for each row group in the file and creates the scan + /// range for the column, but does not start it.
To start scanning, + /// set_io_reservation() must be called to assign reservation to this + /// column, followed by StartScan(). + Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk, + int row_group_idx); + + /// Starts the column scan range. The reader must be Reset() and have a + /// reservation assigned via set_io_reservation(). This must be called + /// before any of the column data can be read (including dictionary and + /// data pages). Returns an error status if there was an error starting the + /// scan or allocating buffers for it. + Status StartScan(); + + /// Helper to start scans for multiple columns at once. Stops at the first error. + static Status StartScans(const std::vector<BaseScalarColumnReader*>& readers) { + for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan()); return Status::OK(); } @@ -370,22 +357,27 @@ class BaseScalarColumnReader : public ParquetColumnReader { if (dict_decoder != nullptr) dict_decoder->Close(); } + io::ScanRange* scan_range() const { return scan_range_; } int64_t total_len() const { return metadata_->total_compressed_size; } int col_idx() const { return node_.col_idx; } THdfsCompression::type codec() const { if (metadata_ == NULL) return THdfsCompression::NONE; return ConvertParquetToImpalaCodec(metadata_->codec); } + void set_io_reservation(int64_t bytes) { io_reservation_ = bytes; } /// Reads the next definition and repetition levels for this column. Initializes the /// next data page if necessary. virtual bool NextLevels() { return NextLevels<true>(); } - // Check the data stream to see if there is a dictionary page. If there is, - // use that page to initialize dict_decoder_ and advance the data stream - // past the dictionary page. + /// Check the data stream to see if there is a dictionary page. If there is, + /// use that page to initialize dict_decoder_ and advance the data stream + /// past the dictionary page.
Status InitDictionary(); + /// Convenience function to initialize multiple dictionaries. + static Status InitDictionaries(const std::vector<BaseScalarColumnReader*> readers); + // Returns the dictionary or NULL if the dictionary doesn't exist virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; } @@ -411,33 +403,45 @@ class BaseScalarColumnReader : public ParquetColumnReader { // fit in as few cache lines as possible. /// Pointer to start of next value in data page - uint8_t* data_; + uint8_t* data_ = nullptr; /// End of the data page. - const uint8_t* data_end_; + const uint8_t* data_end_ = nullptr; /// Decoder for definition levels. - ParquetLevelDecoder def_levels_; + ParquetLevelDecoder def_levels_{true}; /// Decoder for repetition levels. - ParquetLevelDecoder rep_levels_; + ParquetLevelDecoder rep_levels_{false}; /// Page encoding for values of the current data page. Cached here for perf. Set in /// InitDataPage(). - parquet::Encoding::type page_encoding_; + parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY; /// Num values remaining in the current data page - int num_buffered_values_; + int num_buffered_values_ = 0; // Less frequently used members that are not accessed in inner loop should go below // here so they do not occupy precious cache line space. /// The number of values seen so far. Updated per data page. - int64_t num_values_read_; + int64_t num_values_read_ = 0; + + /// Metadata for the column for the current row group. + const parquet::ColumnMetaData* metadata_ = nullptr; - const parquet::ColumnMetaData* metadata_; boost::scoped_ptr<Codec> decompressor_; - ScannerContext::Stream* stream_; + + /// The scan range for the column's data. Initialized for each row group by Reset(). + io::ScanRange* scan_range_ = nullptr; + + /// Stream used to read data from 'scan_range_'. Initialized by StartScan().
+ ScannerContext::Stream* stream_ = nullptr; + + /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set + /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group + /// by Reset(). + int64_t io_reservation_ = 0; /// Pool to allocate storage for data pages from - either decompression buffers for /// compressed data pages or copies of the data page with var-len data to attach to
http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index abdde07..c669e65 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -41,14 +41,15 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024; const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT; ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node, - HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range, - const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool) + BufferPool::ClientHandle* bp_client, HdfsPartitionDescriptor* partition_desc, + const vector<FilterContext>& filter_ctxs, + MemPool* expr_results_pool) : state_(state), scan_node_(scan_node), + bp_client_(bp_client), partition_desc_(partition_desc), filter_ctxs_(filter_ctxs), expr_results_pool_(expr_results_pool) { - AddStream(scan_range); } ScannerContext::~ScannerContext() { @@ -66,19 +67,20 @@ void ScannerContext::ClearStreams() { } ScannerContext::Stream::Stream(ScannerContext* parent, ScanRange* scan_range, - const HdfsFileDesc* file_desc) + int64_t reservation, const HdfsFileDesc* file_desc) : parent_(parent), scan_range_(scan_range), file_desc_(file_desc), + reservation_(reservation), file_len_(file_desc->file_length), next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES), boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())), boundary_buffer_(new StringBuffer(boundary_pool_.get())) { } -ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) { - streams_.emplace_back(new Stream( - this, range, scan_node_->GetFileDesc(partition_desc_->id(), range->file()))); +ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range, int64_t reservation) { + streams_.emplace_back(new Stream(this, range, reservation, +
scan_node_->GetFileDesc(partition_desc_->id(), range->file()))); return streams_.back().get(); } @@ -101,6 +103,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool done) { Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { DCHECK_EQ(0, io_buffer_bytes_left_); + DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED; if (io_buffer_ != nullptr) ReturnIoBuffer(); @@ -121,7 +124,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { SCOPED_TIMER(parent_->state_->total_storage_wait_timer()); int64_t read_past_buffer_size = 0; - int64_t max_buffer_size = parent_->state_->io_mgr()->max_read_buffer_size(); + int64_t max_buffer_size = io_mgr->max_buffer_size(); if (!read_past_size_cb_.empty()) read_past_buffer_size = read_past_size_cb_(offset); if (read_past_buffer_size <= 0) { // Either no callback was set or the callback did not return an estimate. Use @@ -133,6 +136,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { read_past_buffer_size = ::max(read_past_buffer_size, read_past_size); read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining); read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size); + read_past_buffer_size = ::min(read_past_buffer_size, reservation_); // We're reading past the scan range. Be careful not to read past the end of file.
DCHECK_GE(read_past_buffer_size, 0); if (read_past_buffer_size == 0) { @@ -143,8 +147,23 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { ScanRange* range = parent_->scan_node_->AllocateScanRange( scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id, scan_range_->disk_id(), false, BufferOpts::Uncached()); - RETURN_IF_ERROR(parent_->state_->io_mgr()->Read( - parent_->scan_node_->reader_context(), range, &io_buffer_)); + bool needs_buffers; + RETURN_IF_ERROR(io_mgr->StartScanRange( + parent_->scan_node_->reader_context(), range, &needs_buffers)); + if (needs_buffers) { + // Allocate fresh buffers. The buffers for 'scan_range_' should be released now + // since we hit EOS. + if (reservation_ < io_mgr->min_buffer_size()) { + return Status(Substitute("Could not read past end of scan range in file '$0'. " + "Reservation provided $1 was < the minimum I/O buffer size $2", + filename(), reservation_, io_mgr->min_buffer_size())); + } + RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange( + parent_->scan_node_->reader_context(), parent_->bp_client_, range, + reservation_)); + } + RETURN_IF_ERROR(range->GetNext(&io_buffer_)); + DCHECK(io_buffer_->eosr()); } DCHECK(io_buffer_ != nullptr); @@ -324,7 +343,8 @@ Status ScannerContext::Stream::CopyIoToBoundary(int64_t num_bytes) { void ScannerContext::Stream::ReturnIoBuffer() { DCHECK(io_buffer_ != nullptr); - ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffer_)); + ScanRange* range = io_buffer_->scan_range(); + range->ReturnBuffer(move(io_buffer_)); io_buffer_pos_ = nullptr; io_buffer_bytes_left_ = 0; } http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/exec/scanner-context.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h index a131d3f..6292486 100644 --- a/be/src/exec/scanner-context.h +++ b/be/src/exec/scanner-context.h @@ -27,6 +27,7 @@ #include "common/compiler-util.h"
#include "common/status.h" #include "exec/filter-context.h" +#include "runtime/bufferpool/buffer-pool.h" #include "runtime/io/request-ranges.h" namespace impala { @@ -84,10 +85,12 @@ class TupleRow; class ScannerContext { public: /// Create a scanner context with the parent scan_node (where materialized row batches - /// get pushed to) and the scan range to process. - /// This context starts with 1 stream. - ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*, - io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs, + /// get pushed to). Scan ranges are added as streams via AddStream(). Buffers are + /// allocated using 'bp_client'. + ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node, + BufferPool::ClientHandle* bp_client, + HdfsPartitionDescriptor* partition_desc, + const std::vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool); /// Destructor verifies that all stream objects have been released. ~ScannerContext(); @@ -150,6 +153,7 @@ class ScannerContext { const char* filename() { return scan_range_->file(); } const io::ScanRange* scan_range() { return scan_range_; } const HdfsFileDesc* file_desc() { return file_desc_; } + int64_t reservation() const { return reservation_; } /// Returns the buffer's current offset in the file. int64_t file_offset() const { return scan_range_->offset() + total_bytes_returned_; } @@ -211,9 +215,15 @@ class ScannerContext { private: friend class ScannerContext; - ScannerContext* parent_; - io::ScanRange* scan_range_; - const HdfsFileDesc* file_desc_; + ScannerContext* const parent_; + io::ScanRange* const scan_range_; + const HdfsFileDesc* const file_desc_; + + /// Reservation given to this stream for allocating I/O buffers. The reservation is + /// shared with 'scan_range_', so the context must be careful not to use this until + /// all buffers of 'scan_range_' have been freed. Must be >= the minimum IoMgr + /// buffer size to allow reading past the end of 'scan_range_'.
+ const int64_t reservation_; /// Total number of bytes returned from GetBytes() int64_t total_bytes_returned_ = 0; @@ -272,7 +282,8 @@ class ScannerContext { /// output_buffer_bytes_left_ will be set to something else. static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0; - Stream(ScannerContext* parent, io::ScanRange* scan_range, + /// Private constructor. See AddStream() for public API. + Stream(ScannerContext* parent, io::ScanRange* scan_range, int64_t reservation, const HdfsFileDesc* file_desc); /// GetBytes helper to handle the slow path. @@ -355,24 +366,37 @@ class ScannerContext { /// size to 0. void ClearStreams(); - /// Add a stream to this ScannerContext for 'range'. The stream is owned by this - /// context. - Stream* AddStream(io::ScanRange* range); + /// Add a stream to this ScannerContext for 'range'. 'range' must already have any + /// buffers that it needs allocated. 'reservation' is the amount of reservation that + /// is given to this stream for allocating I/O buffers. The reservation is shared with + /// 'range', so the context must be careful not to use this until all of 'range's + /// buffers have been freed. Must be >= the minimum IoMgr buffer size to allow reading + /// past the end of 'range'. + /// + /// Returns the added stream. The returned stream is owned by this context. + Stream* AddStream(io::ScanRange* range, int64_t reservation); /// Returns true if RuntimeState::is_cancelled() is true, or if scan node is not /// multi-threaded and is done (finished, cancelled or reached it's limit). /// In all other cases returns false.
bool cancelled() const; - HdfsPartitionDescriptor* partition_descriptor() { return partition_desc_; } + BufferPool::ClientHandle* bp_client() const { return bp_client_; } + HdfsPartitionDescriptor* partition_descriptor() const { return partition_desc_; } const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; } MemPool* expr_results_pool() const { return expr_results_pool_; } private: friend class Stream; - RuntimeState* state_; - HdfsScanNodeBase* scan_node_; - HdfsPartitionDescriptor* partition_desc_; + RuntimeState* const state_; + HdfsScanNodeBase* const scan_node_; + + /// Buffer pool client used to allocate I/O buffers. This is accessed by multiple + /// threads in the multi-threaded scan node, so those threads must take care to only + /// call thread-safe BufferPool methods with this client. + BufferPool::ClientHandle* const bp_client_; + + HdfsPartitionDescriptor* const partition_desc_; /// Vector of streams. Non-columnar formats will always have one stream per context.
std::vector<std::unique_ptr<Stream>> streams_; http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/bufferpool/buffer-pool.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h index 285aacb..d14da63 100644 --- a/be/src/runtime/bufferpool/buffer-pool.h +++ b/be/src/runtime/bufferpool/buffer-pool.h @@ -37,6 +37,7 @@ namespace impala { +class MemTracker; class ReservationTracker; class RuntimeProfile; class SystemAllocator; http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/bufferpool/reservation-tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc index c46c5ea..e441402 100644 --- a/be/src/runtime/bufferpool/reservation-tracker-test.cc +++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc @@ -522,15 +522,15 @@ TEST_F(ReservationTrackerTest, TransferReservation) { TEST_F(ReservationTrackerTest, ReservationUtil) { const int64_t MEG = 1024 * 1024; const int64_t GIG = 1024 * 1024 * 1024; - EXPECT_EQ(75 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING); + EXPECT_EQ(32 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING); EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(0)); EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(-1)); - EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(75 * MEG)); + EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(32 * MEG)); EXPECT_EQ(8 * GIG, ReservationUtil::GetReservationLimitFromMemLimit(10 * GIG)); - EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0)); - EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1)); + EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0)); + EXPECT_EQ(32 * MEG,
ReservationUtil::GetMinMemLimitFromReservation(-1)); EXPECT_EQ(500 * MEG, ReservationUtil::GetMinMemLimitFromReservation(400 * MEG)); EXPECT_EQ(5 * GIG, ReservationUtil::GetMinMemLimitFromReservation(4 * GIG)); http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/bufferpool/reservation-util.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-util.cc b/be/src/runtime/bufferpool/reservation-util.cc index 85718ab..a27ab9d 100644 --- a/be/src/runtime/bufferpool/reservation-util.cc +++ b/be/src/runtime/bufferpool/reservation-util.cc @@ -24,7 +24,7 @@ namespace impala { // Most operators that accumulate memory use reservations, so the majority of memory // should be allocated to buffer reservations, as a heuristic. const double ReservationUtil::RESERVATION_MEM_FRACTION = 0.8; -const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 75 * 1024 * 1024; +const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 32 * 1024 * 1024; int64_t ReservationUtil::GetReservationLimitFromMemLimit(int64_t mem_limit) { int64_t max_reservation = std::min<int64_t>( http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 072baf3..d4fed5d 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -36,9 +36,9 @@ #include "runtime/client-cache.h" #include "runtime/coordinator.h" #include "runtime/data-stream-mgr.h" -#include "runtime/io/disk-io-mgr.h" #include "runtime/hbase-table-factory.h" #include "runtime/hdfs-fs-cache.h" +#include "runtime/io/disk-io-mgr.h" #include "runtime/krpc-data-stream-mgr.h" #include "runtime/lib-cache.h" #include "runtime/mem-tracker.h" @@ -344,10 +344,7 @@ Status ExecEnv::Init() { LOG(INFO) << "Buffer pool limit: " << PrettyPrinter::Print(buffer_pool_limit,
TUnit::BYTES); - RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get())); - - mem_tracker_->AddGcFunction( - [this](int64_t bytes_to_free) { disk_io_mgr_->GcIoBuffers(bytes_to_free); }); + RETURN_IF_ERROR(disk_io_mgr_->Init()); // Start services in order to ensure that dependencies between them are met if (enable_webserver_) { http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/disk-io-mgr-internal.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h index e6962ea..292530f 100644 --- a/be/src/runtime/io/disk-io-mgr-internal.h +++ b/be/src/runtime/io/disk-io-mgr-internal.h @@ -34,9 +34,25 @@ #include "util/filesystem-util.h" #include "util/hdfs-util.h" #include "util/impalad-metrics.h" +#include "util/runtime-profile-counters.h" /// This file contains internal structures shared between submodules of the IoMgr. Users /// of the IoMgr do not need to include this file. + +// Macros to work around counters sometimes not being provided. +// TODO: fix things so that counters are always non-NULL.
+#define COUNTER_ADD_IF_NOT_NULL(c, v) \ + do { \ + ::impala::RuntimeProfile::Counter* __ctr__ = (c); \ + if (__ctr__ != nullptr) __ctr__->Add(v); \ + } while (false) + +#define COUNTER_BITOR_IF_NOT_NULL(c, v) \ + do { \ + ::impala::RuntimeProfile::Counter* __ctr__ = (c); \ + if (__ctr__ != nullptr) __ctr__->BitOr(v); \ + } while (false) + namespace impala { namespace io { http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/disk-io-mgr-stress-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-stress-test.cc b/be/src/runtime/io/disk-io-mgr-stress-test.cc index 45b36ed..2ec1d09 100644 --- a/be/src/runtime/io/disk-io-mgr-stress-test.cc +++ b/be/src/runtime/io/disk-io-mgr-stress-test.cc @@ -15,8 +15,12 @@ // specific language governing permissions and limitations // under the License. +#include <gflags/gflags.h> + +#include "common/init.h" #include "runtime/io/disk-io-mgr-stress.h" -#include "util/cpu-info.h" +#include "runtime/test-env.h" +#include "service/fe-support.h" #include "util/string-parser.h" #include "common/names.h" @@ -28,34 +32,32 @@ using namespace impala::io; // can be passed to control how long to run this test (0 for forever). // TODO: make these configurable once we decide how to run BE tests with args -const int DEFAULT_DURATION_SEC = 1; +constexpr int DEFAULT_DURATION_SEC = 1; const int NUM_DISKS = 5; const int NUM_THREADS_PER_DISK = 5; const int NUM_CLIENTS = 10; const bool TEST_CANCELLATION = true; +const int64_t BUFFER_POOL_CAPACITY = 1024L * 1024L * 1024L * 4L; + +DEFINE_int64(duration_sec, DEFAULT_DURATION_SEC, + "Disk I/O Manager stress test duration in seconds.
0 means run indefinitely."); int main(int argc, char** argv) { - google::InitGoogleLogging(argv[0]); - CpuInfo::Init(); - OsInfo::Init(); - impala::InitThreading(); - int duration_sec = DEFAULT_DURATION_SEC; - - if (argc == 2) { - StringParser::ParseResult status; - duration_sec = StringParser::StringToInt<int>(argv[1], strlen(argv[1]), &status); - if (status != StringParser::PARSE_SUCCESS) { - printf("Invalid arg: %s\n", argv[1]); - return 1; - } - } - if (duration_sec != 0) { - printf("Running stress test for %d seconds.\n", duration_sec); + impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); + impala::InitFeSupport(); + + if (FLAGS_duration_sec != 0) { + printf("Running stress test for %ld seconds.\n", FLAGS_duration_sec); } else { printf("Running stress test indefinitely.\n"); } - DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION); - test.Run(duration_sec); + TestEnv test_env; + // Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it.
+ test_env.SetBufferPoolArgs(DiskIoMgrStress::MIN_READ_BUFFER_SIZE, BUFFER_POOL_CAPACITY); + Status status = test_env.Init(); + CHECK(status.ok()) << status.GetDetail(); + DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION); + test.Run(FLAGS_duration_sec); return 0; } http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/disk-io-mgr-stress.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc index 8815357..3fd33de 100644 --- a/be/src/runtime/io/disk-io-mgr-stress.cc +++ b/be/src/runtime/io/disk-io-mgr-stress.cc @@ -19,6 +19,8 @@ #include "runtime/io/disk-io-mgr-stress.h" +#include "runtime/bufferpool/reservation-tracker.h" +#include "runtime/exec-env.h" #include "runtime/io/request-context.h" #include "util/time.h" @@ -27,18 +29,20 @@ using namespace impala; using namespace impala::io; -static const float ABORT_CHANCE = .10f; -static const int MIN_READ_LEN = 1; -static const int MAX_READ_LEN = 20; +constexpr float DiskIoMgrStress::ABORT_CHANCE; +const int DiskIoMgrStress::MIN_READ_LEN; +const int DiskIoMgrStress::MAX_READ_LEN; -static const int MIN_FILE_LEN = 10; -static const int MAX_FILE_LEN = 1024; +const int DiskIoMgrStress::MIN_FILE_LEN; +const int DiskIoMgrStress::MAX_FILE_LEN; // Make sure this is between MIN/MAX FILE_LEN to test more cases -static const int MIN_READ_BUFFER_SIZE = 64; -static const int MAX_READ_BUFFER_SIZE = 128; +const int DiskIoMgrStress::MIN_READ_BUFFER_SIZE; +const int DiskIoMgrStress::MAX_READ_BUFFER_SIZE; -static const int CANCEL_READER_PERIOD_MS = 20; // in ms +const int DiskIoMgrStress::MAX_BUFFER_BYTES_PER_SCAN_RANGE; + +const int DiskIoMgrStress::CANCEL_READER_PERIOD_MS; static void CreateTempFile(const char* filename, const char* data) { FILE* file = fopen(filename, "w"); @@ -47,7 +51,7 @@ static void CreateTempFile(const char* filename, const char* data) {
fclose(file); } -string GenerateRandomData() { +string DiskIoMgrStress::GenerateRandomData() { int rand_len = rand() % (MAX_FILE_LEN - MIN_FILE_LEN) + MIN_FILE_LEN; stringstream ss; for (int i = 0; i < rand_len; ++i) { @@ -59,6 +63,8 @@ string GenerateRandomData() { struct DiskIoMgrStress::Client { boost::mutex lock; + /// Owns the client's ScanRanges and RuntimeProfile. Cleared by NewClient() on (re-)init. + ObjectPool obj_pool; unique_ptr<RequestContext> reader; int file_idx; vector<ScanRange*> scan_ranges; @@ -77,7 +83,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk, io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, num_threads_per_disk, MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE)); - Status status = io_mgr_->Init(&mem_tracker_); + Status status = io_mgr_->Init(); CHECK(status.ok()); // Initialize some data files. It doesn't really matter how many there are. @@ -92,6 +98,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk, clients_ = new Client[num_clients_]; client_mem_trackers_.resize(num_clients_); + buffer_pool_clients_.reset(new BufferPool::ClientHandle[num_clients_]); for (int i = 0; i < num_clients_; ++i) { NewClient(i); } @@ -110,9 +117,16 @@ void DiskIoMgrStress::ClientThread(int client_id) { while (!eos) { ScanRange* range; - Status status = io_mgr_->GetNextRange(client->reader.get(), &range); + bool needs_buffers; + Status status = + io_mgr_->GetNextUnstartedRange(client->reader.get(), &range, &needs_buffers); CHECK(status.ok() || status.IsCancelled()); if (range == NULL) break; + if (needs_buffers) { + status = io_mgr_->AllocateBuffersForRange(client->reader.get(), + &buffer_pool_clients_[client_id], range, MAX_BUFFER_BYTES_PER_SCAN_RANGE); + CHECK(status.ok()) << status.GetDetail(); + } while (true) { unique_ptr<BufferDescriptor> buffer; @@ -137,7 +151,7 @@ void DiskIoMgrStress::ClientThread(int client_id) { // Copy the bytes from this read into the result buffer.
memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len()); - io_mgr_->ReturnBuffer(move(buffer)); + range->ReturnBuffer(move(buffer)); bytes_read += len; CHECK_GE(bytes_read, 0); @@ -159,6 +173,7 @@ void DiskIoMgrStress::ClientThread(int client_id) { // Unregister the old client and get a new one unique_lock<mutex> lock(client->lock); io_mgr_->UnregisterContext(client->reader.get()); + client->reader.reset(); NewClient(client_id); } @@ -170,11 +185,9 @@ void DiskIoMgrStress::ClientThread(int client_id) { // Cancel a random reader void DiskIoMgrStress::CancelRandomReader() { if (!includes_cancellation_) return; - - int rand_client = rand() % num_clients_; - - unique_lock<mutex> lock(clients_[rand_client].lock); - io_mgr_->CancelContext(clients_[rand_client].reader.get()); + Client* rand_client = &clients_[rand() % num_clients_]; + unique_lock<mutex> lock(rand_client->lock); + rand_client->reader->Cancel(); } void DiskIoMgrStress::Run(int sec) { @@ -199,10 +212,18 @@ void DiskIoMgrStress::Run(int sec) { for (int i = 0; i < num_clients_; ++i) { unique_lock<mutex> lock(clients_[i].lock); - if (clients_[i].reader != NULL) io_mgr_->CancelContext(clients_[i].reader.get()); + if (clients_[i].reader != NULL) clients_[i].reader->Cancel(); } - readers_.join_all(); + + for (int i = 0; i < num_clients_; ++i) { + if (clients_[i].reader != nullptr) { + io_mgr_->UnregisterContext(clients_[i].reader.get()); + } + ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]); + client_mem_trackers_[i]->Close(); + } + mem_tracker_.Close(); } // Initialize a client to read one of the files at random. The scan ranges are @@ -223,25 +244,41 @@ void DiskIoMgrStress::NewClient(int i) { } } - for (int i = 0; i < client.scan_ranges.size(); ++i) { - delete client.scan_ranges[i]; - } + // Clean up leftover state from the previous client (if any).
+ client.scan_ranges.clear(); + ExecEnv* exec_env = ExecEnv::GetInstance(); + exec_env->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]); + if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close(); + client.obj_pool.Clear(); int assigned_len = 0; while (assigned_len < file_len) { int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN; range_len = min(range_len, file_len - assigned_len); - ScanRange* range = new ScanRange(); + ScanRange* range = client.obj_pool.Add(new ScanRange); range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len, 0, false, BufferOpts::Uncached()); client.scan_ranges.push_back(range); assigned_len += range_len; } - client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_)); - client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get()); - Status status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges); + string client_name = Substitute("Client $0", i); + client_mem_trackers_[i].reset(new MemTracker(-1, client_name, &mem_tracker_)); + Status status = exec_env->buffer_pool()->RegisterClient(client_name, nullptr, + exec_env->buffer_reservation(), client_mem_trackers_[i].get(), + numeric_limits<int64_t>::max(), RuntimeProfile::Create(&client.obj_pool, client_name), + &buffer_pool_clients_[i]); + CHECK(status.ok()); + // Reserve MAX_BUFFER_BYTES_PER_SCAN_RANGE (3 max-size buffers) per range, which + // should be enough to guarantee progress.
+ CHECK(buffer_pool_clients_[i].IncreaseReservationToFit( + MAX_BUFFER_BYTES_PER_SCAN_RANGE * client.scan_ranges.size())) + << buffer_pool_clients_[i].DebugString() << "\n" + << exec_env->buffer_pool()->DebugString() << "\n" + << exec_env->buffer_reservation()->DebugString(); + + client.reader = io_mgr_->RegisterContext(); + status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges); CHECK(status.ok()); } http://git-wip-us.apache.org/repos/asf/impala/blob/fb5dc9eb/be/src/runtime/io/disk-io-mgr-stress.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h index b872694..574b58c 100644 --- a/be/src/runtime/io/disk-io-mgr-stress.h +++ b/be/src/runtime/io/disk-io-mgr-stress.h @@ -22,8 +22,11 @@ #include <memory> #include <vector> #include <boost/scoped_ptr.hpp> +#include <boost/thread/condition_variable.hpp> #include <boost/thread/thread.hpp> +#include "common/object-pool.h" +#include "runtime/bufferpool/buffer-pool.h" #include "runtime/io/disk-io-mgr.h" #include "runtime/mem-tracker.h" #include "runtime/thread-resource-mgr.h" @@ -43,15 +46,29 @@ class DiskIoMgrStress { /// Run the test for 'sec'. If 0, run forever void Run(int sec); + static constexpr float ABORT_CHANCE = .10f; + static const int MIN_READ_LEN = 1; + static const int MAX_READ_LEN = 20; + + static const int MIN_FILE_LEN = 10; + static const int MAX_FILE_LEN = 1024; + + // Make sure this is between MIN/MAX FILE_LEN to test more cases + static const int MIN_READ_BUFFER_SIZE = 64; + static const int MAX_READ_BUFFER_SIZE = 128; + + // Maximum I/O buffer bytes to allocate per scan range: 3 max-size read buffers.
+ static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3; + + static const int CANCEL_READER_PERIOD_MS = 20; // in ms private: struct Client; struct File { std::string filename; - std::string data; // the data in the file, used to validate + std::string data; // the data in the file, used to validate }; - /// Files used for testing. These are created at startup and recycled /// during the test std::vector<File> files_; @@ -72,6 +89,9 @@ class DiskIoMgrStress { /// Client MemTrackers, one per client. std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_; + /// Buffer pool clients, one per client. + std::unique_ptr<BufferPool::ClientHandle[]> buffer_pool_clients_; + /// If true, tests cancelling readers bool includes_cancellation_; @@ -88,6 +108,8 @@ class DiskIoMgrStress { /// Possibly cancels a random reader. void CancelRandomReader(); + + static std::string GenerateRandomData(); }; } }
