http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index dc35600..bf2b7ec 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -15,16 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "runtime/tmp-file-mgr.h"
+
 #include <boost/algorithm/string.hpp>
+#include <boost/filesystem.hpp>
 #include <boost/lexical_cast.hpp>
 #include <boost/thread/locks.hpp>
-#include <boost/uuid/uuid_io.hpp>
 #include <boost/uuid/random_generator.hpp>
-#include <boost/filesystem.hpp>
-#include <gutil/strings/substitute.h>
+#include <boost/uuid/uuid_io.hpp>
 #include <gutil/strings/join.h>
+#include <gutil/strings/substitute.h>
 
-#include "runtime/tmp-file-mgr.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tmp-file-mgr-internal.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/filesystem-util.h"
@@ -32,9 +35,13 @@
 
 #include "common/names.h"
 
+DEFINE_bool(disk_spill_encryption, false,
+    "Set this to encrypt and perform an integrity "
+    "check on all data spilled to disk during a query");
 DEFINE_string(scratch_dirs, "/tmp", "Writable scratch directories");
-
-#include "common/names.h"
+DEFINE_bool(allow_multiple_scratch_dirs_per_device, false,
+    "If false and --scratch_dirs contains multiple directories on the same device, "
+    "then only the first writable directory is used");
 
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
@@ -55,8 +62,10 @@ const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS = "tmp-file-mgr.active-scratch-dir
 const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST =
     "tmp-file-mgr.active-scratch-dirs.list";
 
-TmpFileMgr::TmpFileMgr() : initialized_(false), dir_status_lock_(), tmp_dirs_(),
-  num_active_scratch_dirs_metric_(NULL), active_scratch_dirs_metric_(NULL) {}
+TmpFileMgr::TmpFileMgr()
+  : initialized_(false),
+    num_active_scratch_dirs_metric_(nullptr),
+    active_scratch_dirs_metric_(nullptr) {}
 
 Status TmpFileMgr::Init(MetricGroup* metrics) {
   string tmp_dirs_spec = FLAGS_scratch_dirs;
@@ -65,7 +74,7 @@ Status TmpFileMgr::Init(MetricGroup* metrics) {
   if (!tmp_dirs_spec.empty()) {
     split(all_tmp_dirs, tmp_dirs_spec, is_any_of(","), token_compress_on);
   }
-  return InitCustom(all_tmp_dirs, true, metrics);
+  return InitCustom(all_tmp_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, metrics);
 }
 
 Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_device,
@@ -108,7 +117,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
         if (disk_id >= 0) is_tmp_dir_on_disk[disk_id] = true;
         LOG(INFO) << "Using scratch directory " << scratch_subdir_path.string() << " on "
                   << "disk " << disk_id;
-        tmp_dirs_.push_back(Dir(scratch_subdir_path.string(), false));
+        tmp_dirs_.push_back(scratch_subdir_path.string());
       } else {
         LOG(WARNING) << "Could not remove and recreate directory "
                      << scratch_subdir_path.string() << ": cannot use it for scratch. "
@@ -117,14 +126,14 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
     }
   }
 
-  DCHECK(metrics != NULL);
+  DCHECK(metrics != nullptr);
   num_active_scratch_dirs_metric_ =
       metrics->AddGauge<int64_t>(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
-  active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(metrics,
-      TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
+  active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(
+      metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
   num_active_scratch_dirs_metric_->set_value(tmp_dirs_.size());
   for (int i = 0; i < tmp_dirs_.size(); ++i) {
-    active_scratch_dirs_metric_->Add(tmp_dirs_[i].path());
+    active_scratch_dirs_metric_->Add(tmp_dirs_[i]);
   }
 
   initialized_ = true;
@@ -137,24 +146,20 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
   return Status::OK();
 }
 
-Status TmpFileMgr::NewFile(FileGroup* file_group, const DeviceId& device_id,
-    const TUniqueId& query_id, unique_ptr<File>* new_file) {
+Status TmpFileMgr::NewFile(
+    FileGroup* file_group, DeviceId device_id, unique_ptr<File>* new_file) {
   DCHECK(initialized_);
   DCHECK_GE(device_id, 0);
   DCHECK_LT(device_id, tmp_dirs_.size());
-  DCHECK(file_group != NULL);
-  if (IsBlacklisted(device_id)) {
-    return Status(TErrorCode::TMP_DEVICE_BLACKLISTED, tmp_dirs_[device_id].path());
-  }
-
+  DCHECK(file_group != nullptr);
   // Generate the full file path.
   string unique_name = lexical_cast<string>(random_generator()());
   stringstream file_name;
-  file_name << PrintId(query_id) << "_" << unique_name;
-  path new_file_path(tmp_dirs_[device_id].path());
+  file_name << PrintId(file_group->unique_id()) << "_" << unique_name;
+  path new_file_path(tmp_dirs_[device_id]);
   new_file_path /= file_name.str();
 
-  new_file->reset(new File(this, file_group, device_id, new_file_path.string()));
+  new_file->reset(new File(file_group, device_id, new_file_path.string()));
   return Status::OK();
 }
 
@@ -162,162 +167,133 @@ string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const {
   DCHECK(initialized_);
   DCHECK_GE(device_id, 0);
   DCHECK_LT(device_id, tmp_dirs_.size());
-  return tmp_dirs_[device_id].path();
+  return tmp_dirs_[device_id];
 }
 
-void TmpFileMgr::BlacklistDevice(DeviceId device_id) {
+int TmpFileMgr::NumActiveTmpDevices() {
   DCHECK(initialized_);
-  DCHECK(device_id >= 0 && device_id < tmp_dirs_.size());
-  bool added;
-  {
-    lock_guard<SpinLock> l(dir_status_lock_);
-    added = tmp_dirs_[device_id].blacklist();
-  }
-  if (added) {
-    num_active_scratch_dirs_metric_->Increment(-1);
-    active_scratch_dirs_metric_->Remove(tmp_dirs_[device_id].path());
-  }
+  return tmp_dirs_.size();
 }
 
-bool TmpFileMgr::IsBlacklisted(DeviceId device_id) {
-  DCHECK(initialized_);
-  DCHECK(device_id >= 0 && device_id < tmp_dirs_.size());
-  lock_guard<SpinLock> l(dir_status_lock_);
-  return tmp_dirs_[device_id].is_blacklisted();
-}
-
-int TmpFileMgr::num_active_tmp_devices() {
-  DCHECK(initialized_);
-  lock_guard<SpinLock> l(dir_status_lock_);
-  int num_active = 0;
-  for (int device_id = 0; device_id < tmp_dirs_.size(); ++device_id) {
-    if (!tmp_dirs_[device_id].is_blacklisted()) ++num_active;
-  }
-  return num_active;
-}
-
-vector<TmpFileMgr::DeviceId> TmpFileMgr::active_tmp_devices() {
+vector<TmpFileMgr::DeviceId> TmpFileMgr::ActiveTmpDevices() {
   vector<TmpFileMgr::DeviceId> devices;
-  // Allocate vector before we grab lock
-  devices.reserve(tmp_dirs_.size());
-  {
-    lock_guard<SpinLock> l(dir_status_lock_);
-    for (DeviceId device_id = 0; device_id < tmp_dirs_.size(); ++device_id) {
-      if (!tmp_dirs_[device_id].is_blacklisted()) {
-        devices.push_back(device_id);
-      }
-    }
+  for (DeviceId device_id = 0; device_id < tmp_dirs_.size(); ++device_id) {
+    devices.push_back(device_id);
   }
   return devices;
 }
 
-TmpFileMgr::File::File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId device_id,
-    const string& path)
-  : mgr_(mgr),
-    file_group_(file_group),
+TmpFileMgr::File::File(FileGroup* file_group, DeviceId device_id, const string& path)
+  : file_group_(file_group),
     path_(path),
     device_id_(device_id),
-    current_size_(0),
+    disk_id_(DiskInfo::disk_id(path.c_str())),
+    bytes_allocated_(0),
     blacklisted_(false) {
-  DCHECK(file_group != NULL);
+  DCHECK(file_group != nullptr);
 }
 
 Status TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) {
   DCHECK_GT(num_bytes, 0);
-  Status status;
-  if (mgr_->IsBlacklisted(device_id_)) {
-    blacklisted_ = true;
-    return Status(TErrorCode::TMP_FILE_BLACKLISTED, path_);
-  }
-  if (current_size_ == 0) {
-    // First call to AllocateSpace. Create the file.
-    status = FileSystemUtil::CreateFile(path_);
-    if (!status.ok()) {
-      ReportIOError(status.msg());
-      return status;
-    }
-    disk_id_ = DiskInfo::disk_id(path_.c_str());
-  }
-  int64_t new_size = current_size_ + num_bytes;
-  status = FileSystemUtil::ResizeFile(path_, new_size);
-  if (!status.ok()) {
-    ReportIOError(status.msg());
-    return status;
-  }
-  *offset = current_size_;
-  current_size_ = new_size;
+  *offset = bytes_allocated_;
+  bytes_allocated_ += num_bytes;
   return Status::OK();
 }
 
-void TmpFileMgr::File::ReportIOError(const ErrorMsg& msg) {
+int TmpFileMgr::File::AssignDiskQueue() const {
+  return file_group_->io_mgr_->AssignQueue(path_.c_str(), disk_id_, false);
+}
+
+void TmpFileMgr::File::Blacklist(const ErrorMsg& msg) {
   LOG(ERROR) << "Error for temporary file '" << path_ << "': " << msg.msg();
-  // IMPALA-2305: avoid blacklisting to prevent test failures.
-  // blacklisted_ = true;
-  // mgr_->BlacklistDevice(device_id_);
+  blacklisted_ = true;
 }
 
 Status TmpFileMgr::File::Remove() {
-  if (current_size_ > 0) FileSystemUtil::RemovePaths(vector<string>(1, path_));
+  // Remove the file if present (it may not be present if no writes completed).
+  FileSystemUtil::RemovePaths({path_});
   return Status::OK();
 }
 
-TmpFileMgr::FileGroup::FileGroup(
-    TmpFileMgr* tmp_file_mgr, RuntimeProfile* profile, int64_t bytes_limit)
+string TmpFileMgr::File::DebugString() {
+  return Substitute("File $0 path '$1' device id $2 disk id $3 bytes allocated $4 "
+      "blacklisted $5", this, path_, device_id_, disk_id_, bytes_allocated_,
+      blacklisted_);
+}
+
+TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
+    RuntimeProfile* profile, const TUniqueId& unique_id, int64_t block_size,
+    int64_t bytes_limit)
   : tmp_file_mgr_(tmp_file_mgr),
-    current_bytes_allocated_(0),
+    io_mgr_(io_mgr),
+    io_ctx_(nullptr),
+    unique_id_(unique_id),
+    block_size_(block_size),
     bytes_limit_(bytes_limit),
+    write_counter_(ADD_COUNTER(profile, "ScratchWrites", TUnit::UNIT)),
+    bytes_written_counter_(ADD_COUNTER(profile, "ScratchBytesWritten", TUnit::BYTES)),
+    read_counter_(ADD_COUNTER(profile, "ScratchReads", TUnit::UNIT)),
+    bytes_read_counter_(ADD_COUNTER(profile, "ScratchBytesRead", TUnit::BYTES)),
+    scratch_space_bytes_used_counter_(
+        ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES)),
+    disk_read_timer_(ADD_TIMER(profile, "TotalReadBlockTime")),
+    encryption_timer_(ADD_TIMER(profile, "TotalEncryptionTime")),
+    current_bytes_allocated_(0),
     next_allocation_index_(0) {
-  DCHECK(tmp_file_mgr != NULL);
-  scratch_space_bytes_used_counter_ =
-      ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES);
+  DCHECK_GT(block_size_, 0);
+  DCHECK(tmp_file_mgr != nullptr);
+  io_mgr_->RegisterContext(&io_ctx_, nullptr);
 }
 
-Status TmpFileMgr::FileGroup::CreateFiles(const TUniqueId& query_id) {
+TmpFileMgr::FileGroup::~FileGroup() {
+  DCHECK_EQ(tmp_files_.size(), 0);
+}
+
+Status TmpFileMgr::FileGroup::CreateFiles() {
+  lock_.DCheckLocked();
   DCHECK(tmp_files_.empty());
-  vector<Status> errs;
-  vector<DeviceId> tmp_devices = tmp_file_mgr_->active_tmp_devices();
+  vector<DeviceId> tmp_devices = tmp_file_mgr_->ActiveTmpDevices();
   int files_allocated = 0;
   // Initialize the tmp files and the initial file to use.
   for (int i = 0; i < tmp_devices.size(); ++i) {
-    TmpFileMgr::DeviceId tmp_device_id = tmp_devices[i];
-    // It is possible for a device to be blacklisted after it was returned by
-    // active_tmp_devices(), handle this gracefully by skipping devices if NewFile()
-    // fails.
-    Status status = NewFile(tmp_device_id, query_id);
+    TmpFileMgr::DeviceId device_id = tmp_devices[i];
+    // Handle failures gracefully by skipping any device where NewFile() fails. The
+    // errors are saved in 'scratch_errors_' and reported if no file could be created
+    // on any device.
+    unique_ptr<TmpFileMgr::File> tmp_file;
+    Status status = tmp_file_mgr_->NewFile(this, device_id, &tmp_file);
     if (status.ok()) {
+      tmp_files_.emplace_back(std::move(tmp_file));
       ++files_allocated;
     } else {
-      errs.push_back(std::move(status));
+      scratch_errors_.push_back(std::move(status));
     }
   }
   DCHECK_EQ(tmp_files_.size(), files_allocated);
   if (tmp_files_.size() == 0) {
-    Status err_status("Could not create files in any configured scratch directories "
-        "(--scratch_dirs).");
-    for (Status& err : errs) err_status.MergeStatus(err);
+    // TODO: IMPALA-4697: the merged errors do not show up in the query error log,
+    // so we must point users to the impalad error log.
+    Status err_status(
+        "Could not create files in any configured scratch directories (--scratch_dirs). "
+        "See logs for previous errors that may have caused this.");
+    for (Status& err : scratch_errors_) err_status.MergeStatus(err);
     return err_status;
   }
-
   // Start allocating on a random device to avoid overloading the first device.
   next_allocation_index_ = rand() % tmp_files_.size();
   return Status::OK();
 }
 
-Status TmpFileMgr::FileGroup::NewFile(const DeviceId& device_id,
-    const TUniqueId& query_id, File** new_file) {
-  unique_ptr<TmpFileMgr::File> tmp_file;
-  RETURN_IF_ERROR(tmp_file_mgr_->NewFile(this, device_id, query_id, &tmp_file));
-  if (new_file != NULL) *new_file = tmp_file.get();
-  tmp_files_.emplace_back(std::move(tmp_file));
-  return Status::OK();
-}
-
 void TmpFileMgr::FileGroup::Close() {
-  for (std::unique_ptr<TmpFileMgr::File>& file: tmp_files_) {
+  // Cancel writes before deleting the files, since in-flight writes could re-create
+  // deleted files.
+  if (io_ctx_ != nullptr) io_mgr_->UnregisterContext(io_ctx_);
+  io_ctx_ = nullptr;
+  for (std::unique_ptr<TmpFileMgr::File>& file : tmp_files_) {
     Status status = file->Remove();
     if (!status.ok()) {
-      LOG(WARNING) << "Error removing scratch file '" << file->path() << "': "
-                   << status.msg().msg();
+      LOG(WARNING) << "Error removing scratch file '" << file->path()
+                   << "': " << status.msg().msg();
     }
   }
   tmp_files_.clear();
@@ -325,18 +301,37 @@ void TmpFileMgr::FileGroup::Close() {
 
 Status TmpFileMgr::FileGroup::AllocateSpace(
     int64_t num_bytes, File** tmp_file, int64_t* file_offset) {
-  if (bytes_limit_ != -1 && current_bytes_allocated_ + num_bytes > bytes_limit_) {
+  DCHECK_LE(num_bytes, block_size_);
+  lock_guard<SpinLock> lock(lock_);
+
+  // Recycle a previously freed range if possible, discarding any range whose file was
+  // blacklisted after the range was freed: ranges from a bad file must not be reused.
+  while (!free_ranges_.empty()) {
+    File* free_file = free_ranges_.back().first;
+    int64_t free_offset = free_ranges_.back().second;
+    free_ranges_.pop_back();
+    if (free_file->is_blacklisted()) continue;
+    *tmp_file = free_file;
+    *file_offset = free_offset;
+    return Status::OK();
+  }
+
+  if (bytes_limit_ != -1 && current_bytes_allocated_ + block_size_ > bytes_limit_) {
     return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, bytes_limit_);
   }
-  vector<Status> errs;
+
+  // Lazily create the files on the first write.
+  if (tmp_files_.empty()) RETURN_IF_ERROR(CreateFiles());
+
   // Find the next physical file in round-robin order and allocate a range from it.
   for (int attempt = 0; attempt < tmp_files_.size(); ++attempt) {
     *tmp_file = tmp_files_[next_allocation_index_].get();
     next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size();
     if ((*tmp_file)->is_blacklisted()) continue;
-    Status status = (*tmp_file)->AllocateSpace(num_bytes, file_offset);
+    Status status = (*tmp_file)->AllocateSpace(block_size_, file_offset);
     if (status.ok()) {
-      scratch_space_bytes_used_counter_->Add(num_bytes);
-      current_bytes_allocated_ += num_bytes;
+      scratch_space_bytes_used_counter_->Add(block_size_);
+      // Each range reserves 'block_size_' bytes of file space, matching the limit check.
+      current_bytes_allocated_ += block_size_;
       return Status::OK();
     }
@@ -345,12 +340,265 @@ Status TmpFileMgr::FileGroup::AllocateSpace(
     LOG(WARNING) << "Error while allocating range in scratch file '"
                  << (*tmp_file)->path() << "': " << status.msg().msg()
                  << ". Will try another scratch file.";
-    errs.push_back(status);
+    scratch_errors_.push_back(status);
   }
-  Status err_status("No usable scratch files: space could not be allocated in any "
-      "of the configured scratch directories (--scratch_dirs).");
-  for (Status& err : errs) err_status.MergeStatus(err);
+  // TODO: IMPALA-4697: the merged errors do not show up in the query error log,
+  // so we must point users to the impalad error log.
+  Status err_status(
+      "No usable scratch files: space could not be allocated in any of "
+      "the configured scratch directories (--scratch_dirs). See logs for previous "
+      "errors that may have caused this.");
+  // Include all previous errors that may have caused the failure.
+  for (Status& err : scratch_errors_) err_status.MergeStatus(err);
   return err_status;
 }
 
+void TmpFileMgr::FileGroup::AddFreeRange(File* file, int64_t offset) {
+  lock_guard<SpinLock> lock(lock_);
+  free_ranges_.emplace_back(file, offset);
+}
+
+Status TmpFileMgr::FileGroup::Write(
+    MemRange buffer, WriteDoneCallback cb, unique_ptr<TmpFileMgr::WriteHandle>* handle) {
+  DCHECK_GE(buffer.len(), 0);
+
+  File* tmp_file;
+  int64_t file_offset;
+  RETURN_IF_ERROR(AllocateSpace(buffer.len(), &tmp_file, &file_offset));
+
+  unique_ptr<WriteHandle> tmp_handle(new WriteHandle(encryption_timer_, cb));
+  WriteHandle* tmp_handle_ptr = tmp_handle.get(); // Pass ptr by value into lambda.
+  DiskIoMgr::WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr](
+      const Status& write_status) { WriteComplete(tmp_handle_ptr, write_status); };
+  RETURN_IF_ERROR(
+      tmp_handle->Write(io_mgr_, io_ctx_, tmp_file, file_offset, buffer, callback));
+  write_counter_->Add(1);
+  bytes_written_counter_->Add(buffer.len());
+  *handle = move(tmp_handle);
+  return Status::OK();
+}
+
+Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) {
+  DCHECK(handle->write_range_ != nullptr);
+  DCHECK(!handle->is_cancelled_);
+  DCHECK_EQ(buffer.len(), handle->len());
+
+  // Don't grab 'lock_' in this method - it is not necessary because we don't touch
+  // any members that it protects and could block other threads for the duration of
+  // the synchronous read.
+  DCHECK(!handle->write_in_flight_);
+  // Don't grab handle->lock_, it is safe to touch all of handle's state since the
+  // write is not in flight.
+  DiskIoMgr::ScanRange* scan_range = scan_range_pool_.Add(new DiskIoMgr::ScanRange);
+  scan_range->Reset(nullptr, handle->write_range_->file(), handle->write_range_->len(),
+      handle->write_range_->offset(), handle->write_range_->disk_id(), false,
+      DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len()));
+  DiskIoMgr::BufferDescriptor* io_mgr_buffer;
+  {
+    SCOPED_TIMER(disk_read_timer_);
+    read_counter_->Add(1);
+    bytes_read_counter_->Add(buffer.len());
+    RETURN_IF_ERROR(io_mgr_->Read(io_ctx_, scan_range, &io_mgr_buffer));
+  }
+
+  DCHECK_EQ(io_mgr_buffer->buffer(), buffer.data());
+  DCHECK_EQ(io_mgr_buffer->len(), buffer.len());
+  DCHECK(io_mgr_buffer->eosr());
+  // Return the I/O buffer descriptor before verifying/decrypting so it is not leaked
+  // if CheckHashAndDecrypt() fails. The data was read into the caller's 'buffer'.
+  io_mgr_buffer->Return();
+
+  if (FLAGS_disk_spill_encryption) {
+    RETURN_IF_ERROR(handle->CheckHashAndDecrypt(buffer));
+  }
+  return Status::OK();
+}
+
+Status TmpFileMgr::FileGroup::CancelWriteAndRestoreData(
+    unique_ptr<WriteHandle> handle, MemRange buffer) {
+  DCHECK_EQ(handle->write_range_->data(), buffer.data());
+  DCHECK_EQ(handle->len(), buffer.len());
+  handle->Cancel();
+
+  // Decrypt regardless of whether the write is still in flight or not. An in-flight
+  // write may write bogus data to disk but this lets us get some work done while the
+  // write is being cancelled.
+  Status status;
+  if (FLAGS_disk_spill_encryption) {
+    status = handle->CheckHashAndDecrypt(buffer);
+  }
+  handle->WaitForWrite();
+  AddFreeRange(handle->file_, handle->write_range_->offset());
+  handle.reset();
+  return status;
+}
+
+void TmpFileMgr::FileGroup::DestroyWriteHandle(unique_ptr<WriteHandle> handle) {
+  handle->Cancel();
+  handle->WaitForWrite();
+  AddFreeRange(handle->file_, handle->write_range_->offset());
+  handle.reset();
+}
+
+void TmpFileMgr::FileGroup::WriteComplete(
+    WriteHandle* handle, const Status& write_status) {
+  Status status;
+  if (!write_status.ok()) {
+    status = RecoverWriteError(handle, write_status);
+    if (status.ok()) return;
+  } else {
+    status = write_status;
+  }
+  handle->WriteComplete(status);
+}
+
+Status TmpFileMgr::FileGroup::RecoverWriteError(
+    WriteHandle* handle, const Status& write_status) {
+  DCHECK(!write_status.ok());
+  DCHECK(handle->file_ != nullptr);
+
+  // We can't recover from cancellation or memory limit exceeded.
+  if (write_status.IsCancelled() || write_status.IsMemLimitExceeded()) {
+    return write_status;
+  }
+
+  // Save and report the error before retrying so that the failure isn't silent.
+  {
+    lock_guard<SpinLock> lock(lock_);
+    scratch_errors_.push_back(write_status);
+  }
+  handle->file_->Blacklist(write_status.msg());
+
+  // Do not retry cancelled writes or propagate the error, simply return CANCELLED.
+  if (handle->is_cancelled_) return Status::CANCELLED;
+
+  TmpFileMgr::File* tmp_file;
+  int64_t file_offset;
+  // Discard the scratch file range - we will not reuse ranges from a bad file.
+  // Choose another file to try. Blacklisting ensures we don't retry the same file.
+  // If this fails, the status will include all the errors in 'scratch_errors_'.
+  RETURN_IF_ERROR(AllocateSpace(handle->len(), &tmp_file, &file_offset));
+  return handle->RetryWrite(io_mgr_, io_ctx_, tmp_file, file_offset);
+}
+
+string TmpFileMgr::FileGroup::DebugString() {
+  lock_guard<SpinLock> lock(lock_);
+  stringstream ss;
+  ss << "FileGroup " << this << " block size " << block_size_
+     << " bytes limit " << bytes_limit_
+     << " current bytes allocated " << current_bytes_allocated_
+     << " next allocation index " << next_allocation_index_
+     << " writes " << write_counter_->value()
+     << " bytes written " << bytes_written_counter_->value()
+     << " reads " << read_counter_->value()
+     << " bytes read " << bytes_read_counter_->value()
+     << " scratch bytes used " << scratch_space_bytes_used_counter_->value()
+     << " disk read timer " << disk_read_timer_->value()
+     << " encryption timer " << encryption_timer_->value() << endl
+     << " " << tmp_files_.size() << " files:" << endl;
+  for (unique_ptr<File>& file : tmp_files_) {
+    ss << " " << file->DebugString() << endl;
+  }
+  return ss.str();
+}
+
+TmpFileMgr::WriteHandle::WriteHandle(
+    RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb)
+  : cb_(cb),
+    encryption_timer_(encryption_timer),
+    file_(nullptr),
+    is_cancelled_(false),
+    write_in_flight_(false) {}
+
+string TmpFileMgr::WriteHandle::TmpFilePath() const {
+  if (file_ == nullptr) return "";
+  return file_->path();
+}
+
+Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx,
+    File* file, int64_t offset, MemRange buffer,
+    DiskIoMgr::WriteRange::WriteDoneCallback callback) {
+  DCHECK(!write_in_flight_);
+
+  if (FLAGS_disk_spill_encryption) RETURN_IF_ERROR(EncryptAndHash(buffer));
+
+  file_ = file;
+  write_in_flight_ = true;
+  write_range_.reset(
+      new DiskIoMgr::WriteRange(file->path(), offset, file->AssignDiskQueue(), callback));
+  write_range_->SetData(buffer.data(), buffer.len());
+  return io_mgr->AddWriteRange(io_ctx, write_range_.get());
+}
+
+Status TmpFileMgr::WriteHandle::RetryWrite(
+    DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset) {
+  DCHECK(write_in_flight_);
+  file_ = file;
+  write_in_flight_ = true;
+  write_range_->SetRange(file->path(), offset, file->AssignDiskQueue());
+  return io_mgr->AddWriteRange(io_ctx, write_range_.get());
+}
+
+void TmpFileMgr::WriteHandle::WriteComplete(const Status& write_status) {
+  WriteDoneCallback cb;
+  {
+    lock_guard<mutex> lock(write_state_lock_);
+    DCHECK(write_in_flight_);
+    write_in_flight_ = false;
+    // Need to extract 'cb_' because once 'write_in_flight_' is false and we release
+    // 'write_state_lock_', the WriteHandle may be destroyed.
+    cb = move(cb_);
+    // Notify before releasing the lock: once it is released, a thread in
+    // WaitForWrite() may destroy the WriteHandle, including 'write_complete_cv_'.
+    write_complete_cv_.NotifyAll();
+  }
+  // Call 'cb' once we've updated the state. We must do this last because once 'cb' is
+  // called, it is valid to call Read() on the handle.
+  cb(write_status);
+}
+
+void TmpFileMgr::WriteHandle::Cancel() {
+  unique_lock<mutex> lock(write_state_lock_);
+  is_cancelled_ = true;
+  // TODO: in future, if DiskIoMgr supported cancellation, we could cancel it here.
+}
+
+void TmpFileMgr::WriteHandle::WaitForWrite() {
+  unique_lock<mutex> lock(write_state_lock_);
+  while (write_in_flight_) write_complete_cv_.Wait(lock);
+}
+
+Status TmpFileMgr::WriteHandle::EncryptAndHash(MemRange buffer) {
+  DCHECK(FLAGS_disk_spill_encryption);
+  SCOPED_TIMER(encryption_timer_);
+  // Since we're using AES-CFB mode, we must take care not to reuse a key/IV pair.
+  // Regenerate a new key and IV for every data buffer we write.
+  key_.InitializeRandom();
+  RETURN_IF_ERROR(key_.Encrypt(buffer.data(), buffer.len(), buffer.data()));
+  hash_.Compute(buffer.data(), buffer.len());
+  return Status::OK();
+}
+
+Status TmpFileMgr::WriteHandle::CheckHashAndDecrypt(MemRange buffer) {
+  DCHECK(FLAGS_disk_spill_encryption);
+  SCOPED_TIMER(encryption_timer_);
+  if (!hash_.Verify(buffer.data(), buffer.len())) {
+    return Status("Block verification failure");
+  }
+  return key_.Decrypt(buffer.data(), buffer.len(), buffer.data());
+}
+
+string TmpFileMgr::WriteHandle::DebugString() {
+  unique_lock<mutex> lock(write_state_lock_);
+  stringstream ss;
+  // Use TmpFilePath(): 'file_' is null until the first Write() call.
+  ss << "Write handle " << this << " file '" << TmpFilePath() << "'"
+     << " is cancelled " << is_cancelled_ << " write in flight " << write_in_flight_;
+  if (write_range_ != nullptr) {
+    ss << " data " << write_range_->data() << " len " << write_range_->len()
+       << " file offset " << write_range_->offset()
+       << " disk id " << write_range_->disk_id();
+  }
+  return ss.str();
+}
 } // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index 3c489b2..0c3e974 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -18,146 +18,211 @@ #ifndef IMPALA_RUNTIME_TMP_FILE_MGR_H #define IMPALA_RUNTIME_TMP_FILE_MGR_H +#include <functional> +#include <utility> + +#include <boost/scoped_ptr.hpp> +#include <boost/thread/mutex.hpp> + +#include "common/object-pool.h" #include "common/status.h" #include "gen-cpp/Types_types.h" // for TUniqueId +#include "runtime/disk-io-mgr.h" +#include "util/mem-range.h" #include "util/collection-metrics.h" +#include "util/condition-variable.h" +#include "util/openssl-util.h" #include "util/runtime-profile.h" #include "util/spinlock.h" namespace impala { -/// TmpFileMgr creates and manages temporary files and directories on the local -/// filesystem. It can manage multiple temporary directories across multiple devices. -/// TmpFileMgr ensures that at most one directory per device is used unless overridden -/// for testing. +/// TmpFileMgr provides an abstraction for management of temporary (a.k.a. scratch) files +/// on the filesystem and I/O to and from them. TmpFileMgr manages multiple scratch +/// directories across multiple devices, configured via the --scratch_dirs option. +/// TmpFileMgr manages I/O to scratch files in order to abstract away details of which +/// files are allocated and recovery from certain I/O errors. I/O is done via DiskIoMgr. +/// TmpFileMgr encrypts data written to disk if enabled by the --disk_spill_encryption +/// command-line flag. +/// +/// FileGroups manage scratch space across multiple devices. To write to scratch space, +/// first a FileGroup is created, then FileGroup::Write() is called to asynchronously +/// write a memory buffer to one of the scratch files. 
FileGroup::Write() returns a +/// WriteHandle, which is used by the caller to identify that write operation. The +/// caller is notified when the asynchronous write completes via a callback, after which +/// the caller can use the WriteHandle to read back the data. /// -/// Every temporary File belongs to a FileGroup: to allocate temporary files, first a -/// FileGroup is created, then FileGroup::NewFile() is called to create a new File with -/// a unique filename on the specified temporary device. The client can use the File -/// handle to allocate space in the file. FileGroups can be created with a limit on -/// the total number of bytes allocated across all files in the group. +/// Each WriteHandle is backed by a range of data in a scratch file. The first call to +/// Write() will create files for the FileGroup with unique filenames on the configured +/// temporary devices. At most one directory per device is used (unless overridden for +/// testing). Free space is managed within a FileGroup: once a WriteHandle is destroyed, +/// the file range backing it can be recycled for a different WriteHandle. The file range +/// of a WriteHandle can be replaced with a different one if a write error is encountered +/// and the data instead needs to be written to a different disk. /// -/// TODO: we could notify block managers about the failure so they can more take -/// proactive action to avoid using the device. +/// Resource Management: +/// TmpFileMgr provides some basic support for managing local disk space consumption. +/// A FileGroup can be created with a limit on the total number of bytes allocated across +/// all files. Writes that would exceed the limit fail with an error status. +/// +/// TODO: each FileGroup can manage only fixed length scratch file ranges of 'block_size', +/// to simplify the recycling logic. BufferPool will require variable length ranges. +/// TODO: IMPALA-4683: we could implement smarter handling of failures, e.g. 
to +/// temporarily blacklist devices that show I/O errors. class TmpFileMgr { public: - class FileGroup; + class File; // Needs to be public for TmpFileMgrTest. + class WriteHandle; - /// DeviceId is a unique identifier for a temporary device managed by TmpFileMgr. - /// It is used as a handle for external classes to identify devices. + /// DeviceId is an internal unique identifier for a temporary device managed by + /// TmpFileMgr. DeviceIds in the range [0, num tmp devices) are allocated arbitrarily. + /// Needs to be public for TmpFileMgrTest. typedef int DeviceId; - /// File is a handle to a physical file in a temporary directory. Clients - /// can allocate file space and remove files using AllocateSpace() and Remove(). - /// Creation of the file is deferred until the first call to AllocateSpace(). - class File { + typedef std::function<void(const Status&)> WriteDoneCallback; + + /// Represents a group of temporary files - one per disk with a scratch directory. The + /// total allocated bytes of the group can be bound by setting the space allocation + /// limit. The owner of the FileGroup object is responsible for calling the Close() + /// method to delete all the files in the group. + /// + /// Public methods of FileGroup and WriteHandle are safe to call concurrently from + /// multiple threads as long as different WriteHandle arguments are provided. + class FileGroup { public: - /// Called to notify TmpFileMgr that an IO error was encountered for this file - void ReportIOError(const ErrorMsg& msg); + /// Initialize a new file group, which will create files using 'tmp_file_mgr' + /// and perform I/O using 'io_mgr'. Adds counters to 'profile' to track scratch + /// space used. 'unique_id' is a unique ID that is used to prefix any scratch file + /// names. It is an error to create multiple FileGroups with the same 'unique_id'. + /// 'block_size' is the size of blocks in bytes that space will be allocated in. 
+ /// 'bytes_limit' is the limit on the total file space to allocate. + FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, RuntimeProfile* profile, + const TUniqueId& unique_id, int64_t block_size, int64_t bytes_limit = -1); + + ~FileGroup(); + + /// Asynchronously writes 'buffer' to a temporary file of this file group. If there + /// are multiple scratch files, this can write to any of them, and will attempt to + /// recover from I/O errors on one file by writing to a different file. The memory + /// referenced by 'buffer' must remain valid until the write completes. The callee + /// may rewrite the data in 'buffer' in-place (e.g. to do in-place encryption or + /// compression). The caller should not modify the data in 'buffer' until the write + /// completes or is cancelled, otherwise invalid data may be written to disk. + /// + /// TODO: buffer->len must be <= 'block_size' until FileGroup supports allocating + /// variable-length scratch files ranges. + /// + /// Returns an error if the scratch space cannot be allocated or the write cannot + /// be started. Otherwise 'handle' is set and 'cb' will be called asynchronously from + /// a different thread when the write completes successfully or unsuccessfully or is + /// cancelled. + /// + /// 'handle' must be destroyed by passing the DestroyWriteHandle() or + /// CancelWriteAndRestoreData(). + Status Write( + MemRange buffer, WriteDoneCallback cb, std::unique_ptr<WriteHandle>* handle); + + /// Synchronously read the data referenced by 'handle' from the temporary file into + /// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called + /// after a write successfully completes. + Status Read(WriteHandle* handle, MemRange buffer); + + /// Cancels the write referenced by 'handle' and destroy associate resources. Also + /// restore the original data in the 'buffer' passed to Write(), decrypting or + /// decompressing as necessary. 
The cancellation always succeeds, but an error + /// is returned if restoring the data fails. + Status CancelWriteAndRestoreData( + std::unique_ptr<WriteHandle> handle, MemRange buffer); + + /// Wait for the in-flight I/Os to complete and destroy resources associated with + /// 'handle'. + void DestroyWriteHandle(std::unique_ptr<WriteHandle> handle); + + /// Calls Remove() on all the files in the group and deletes them. + void Close(); + + std::string DebugString(); - const std::string& path() const { return path_; } - int disk_id() const { return disk_id_; } - bool is_blacklisted() const { return blacklisted_; } + const TUniqueId& unique_id() const { return unique_id_; } private: - friend class FileGroup; - friend class TmpFileMgr; + friend class File; friend class TmpFileMgrTest; - /// Allocates 'num_bytes' bytes in this file for a new block of data. - /// The file size is increased by a call to truncate() if necessary. - /// The physical file is created on the first call to AllocateSpace(). - /// Returns Status::OK() and sets offset on success. - /// Returns an error status if an unexpected error occurs, e.g. the file could not - /// be created. - Status AllocateSpace(int64_t num_bytes, int64_t* offset); - - /// Delete the physical file on disk, if one was created. - /// It is not valid to read or write to a file after calling Remove(). - Status Remove(); + /// Initializes the file group with one temporary file per disk with a scratch + /// directory. Returns OK if at least one temporary file could be created. + /// Returns an error if no temporary files were successfully created. Must only be + /// called once. Must be called with 'lock_' held. + Status CreateFiles(); - /// The name of the sub-directory that Impala created within each configured scratch - /// directory. - const static std::string TMP_SUB_DIR_NAME; + /// Allocate 'num_bytes' bytes in a temporary file. Try multiple disks if error + /// occurs. 
Returns an error only if no temporary files are usable or the scratch + /// limit is exceeded. Must be called without 'lock_' held. + Status AllocateSpace(int64_t num_bytes, File** tmp_file, int64_t* file_offset); - /// Space (in MB) that must ideally be available for writing on a scratch - /// directory. A warning is issued if available space is less than this threshold. - const static uint64_t AVAILABLE_SPACE_THRESHOLD_MB; + /// Add a free scratch range to 'free_ranges_'. Must be called without 'lock_' held. + void AddFreeRange(File* file, int64_t offset); - File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId device_id, - const std::string& path); + /// Called when the DiskIoMgr write completes for 'handle'. On error, will attempt + /// to retry the write. On success or if the write can't be retried, calls + /// handle->WriteComplete(). + void WriteComplete(WriteHandle* handle, const Status& write_status); - /// TmpFileMgr this belongs to. - TmpFileMgr* mgr_; + /// Handles a write error. Logs the write error and blacklists the device for this + /// file group if the cause was an I/O error. Blacklisting limits the number of times + /// a write is retried because each device will only be tried once. Returns OK if it + /// successfully reissued the write. Returns an error status if the original error + /// was unrecoverable or an unrecoverable error is encountered when reissuing the + /// write. The error status will include all previous I/O errors in its details. + Status RecoverWriteError(WriteHandle* handle, const Status& write_status); - /// The FileGroup this belongs to. Cannot be null. - FileGroup* file_group_; + /// The TmpFileMgr it is associated with. + TmpFileMgr* const tmp_file_mgr_; - /// Path of the physical file in the filesystem. - std::string path_; + /// DiskIoMgr used for all I/O to temporary files. + DiskIoMgr* const io_mgr_; - /// The temporary device this file is stored on. 
- DeviceId device_id_; + /// I/O context used for all reads and writes. Registered in constructor. + DiskIoRequestContext* io_ctx_; - /// The id of the disk on which the physical file lies. - int disk_id_; + /// Stores scan ranges allocated in Read(). Needed because ScanRange objects may be + /// touched by DiskIoMgr even after the scan is finished. + /// TODO: IMPALA-4249: remove once lifetime of ScanRange objects is better defined. + ObjectPool scan_range_pool_; - /// Current file size. Modified by AllocateSpace(). Size is 0 before file creation. - int64_t current_size_; + /// Unique across all FileGroups. Used to prefix file names. + const TUniqueId unique_id_; - /// Set to true to indicate that file can't be expanded. This is useful to keep here - /// even though it is redundant with the global per-device blacklisting in TmpFileMgr - /// because it can be checked without acquiring a global lock. If a file is - /// blacklisted, the corresponding device will always be blacklisted. - bool blacklisted_; - }; + /// Size of the blocks in bytes that scratch space is managed in. + /// TODO: support variable-length scratch file ranges. + const int64_t block_size_; - /// Represents a group of temporary files - one per disk with a scratch directory. The - /// total allocated bytes of the group can be bound by setting the space allocation - /// limit. The owner of the FileGroup object is responsible for calling the Close() - /// method to delete all the files in the group. - class FileGroup { - public: - /// Initialize a new file group, which will create files using 'tmp_file_mgr'. - /// Adds counters to 'profile' to track scratch space used. 'bytes_limit' is - /// the limit on the total file space to allocate. - FileGroup( - TmpFileMgr* tmp_file_mgr, RuntimeProfile* profile, int64_t bytes_limit = -1); + /// Max write space allowed (-1 means no limit). 
+ const int64_t bytes_limit_; - ~FileGroup() { DCHECK_EQ(NumFiles(), 0); } + /// Number of write operations (includes writes started but not yet complete). + RuntimeProfile::Counter* const write_counter_; - /// Initializes the file group with one temporary file per disk with a scratch - /// directory. 'unique_id' is a unique ID that should be used to prefix any - /// scratch file names. It is an error to create multiple FileGroups with the - /// same 'unique_id'. Returns OK if at least one temporary file could be created. - /// Returns an error if no temporary files were successfully created. Must only be - /// called once. - Status CreateFiles(const TUniqueId& unique_id); + /// Number of bytes written to disk (includes writes started but not yet complete). + RuntimeProfile::Counter* const bytes_written_counter_; - /// Allocate num_bytes bytes in a temporary file. Try multiple disks if error occurs. - /// Returns an error only if no temporary files are usable or the scratch limit is - /// exceeded. - Status AllocateSpace(int64_t num_bytes, File** tmp_file, int64_t* file_offset); + /// Number of read operations (includes reads started but not yet complete). + RuntimeProfile::Counter* const read_counter_; - /// Calls Remove() on all the files in the group and deletes them. - void Close(); + /// Number of bytes read from disk (includes reads started but not yet complete). + RuntimeProfile::Counter* const bytes_read_counter_; - /// Returns the number of files that are a part of the group. - int NumFiles() { return tmp_files_.size(); } + /// Amount of scratch space allocated in bytes. + RuntimeProfile::Counter* const scratch_space_bytes_used_counter_; - private: - friend class TmpFileMgrTest; + /// Time taken for disk reads. + RuntimeProfile::Counter* const disk_read_timer_; - /// Creates a new File with a unique path for a query instance, adds it to the - /// group and returns a handle for that file. 
The file path is within the (single) - /// tmp directory on the specified device id. - /// If an error is encountered, e.g. the device is blacklisted, the file is not - /// added to this group and a non-ok status is returned. - Status NewFile( - const DeviceId& device_id, const TUniqueId& unique_id, File** new_file = NULL); + /// Time spent in disk spill encryption, decryption, and integrity checking. + RuntimeProfile::Counter* encryption_timer_; - /// The TmpFileMgr it is associated with. - TmpFileMgr* tmp_file_mgr_; + /// Protects below members. + SpinLock lock_; /// List of files representing the FileGroup. std::vector<std::unique_ptr<File>> tmp_files_; @@ -165,16 +230,117 @@ class TmpFileMgr { /// Total space allocated in this group's files. int64_t current_bytes_allocated_; - /// Max write space allowed (-1 means no limit). - const int64_t bytes_limit_; - /// Index into 'tmp_files' denoting the file to which the next temporary file range /// should be allocated from. Used to implement round-robin allocation from temporary /// files. int next_allocation_index_; - /// Amount of scratch space allocated in bytes. - RuntimeProfile::Counter* scratch_space_bytes_used_counter_; + /// List of File/offset pairs for free scratch ranges of size 'block_size_' bytes. + std::vector<std::pair<File*, int64_t>> free_ranges_; + + /// Errors encountered when creating/writing scratch files. We store the history so + /// that we can report the original cause of the scratch errors if we run out of + /// devices to write to. + std::vector<Status> scratch_errors_; + }; + + /// A handle to a write operation, backed by a range of a temporary file. The operation + /// is either in-flight or has completed. If it completed with no error and wasn't + /// cancelled then the data is in the file and can be read back. + /// + /// WriteHandle is returned from FileGroup::Write(). 
After the write completes, the + /// handle can be passed to FileGroup::Read() to read back the data zero or more times. + /// FileGroup::DestroyWriteHandle() can be called at any time to destroy the handle and + /// allow reuse of the scratch file range written to. Alternatively, + /// FileGroup::CancelWriteAndRestoreData() can be called to reverse the effects of + /// FileGroup::Write() by destroying the handle and restoring the original data to the + /// buffer, so long as the data in the buffer was not modified by the caller. + /// + /// Public methods of WriteHandle are safe to call concurrently from multiple threads. + class WriteHandle { + public: + // The write must be destroyed by FileGroup::DestroyWriteHandle(). + ~WriteHandle() { + DCHECK(!write_in_flight_); + DCHECK(is_cancelled_); + } + + /// Path of temporary file backing the block. Intended for use in testing. + /// Returns empty string if no backing file allocated. + std::string TmpFilePath() const; + + /// The length of the write range in bytes. + int64_t len() const { return write_range_->len(); } + + std::string DebugString(); + + private: + friend class FileGroup; + + WriteHandle(RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb); + + /// Starts a write of 'buffer' to 'offset' of 'file'. + Status Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, + int64_t offset, MemRange buffer, + DiskIoMgr::WriteRange::WriteDoneCallback callback); + + /// Retry the write after the initial write failed with an error, instead writing to + /// 'offset' of 'file'. + Status RetryWrite( + DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset); + + /// Cancels the write asynchronously. After Cancel() is called, writes are not + /// retried. + void Cancel(); + + /// Blocks until the write completes either successfully or unsuccessfully. + void WaitForWrite(); + + /// Called when the write has completed successfully or not. 
Sets 'write_in_flight_' + /// then calls 'cb_'. + void WriteComplete(const Status& write_status); + + /// Encrypts the data in 'buffer' in-place and computes 'hash_'. + Status EncryptAndHash(MemRange buffer); + + /// Verifies the integrity hash and decrypts the contents of 'buffer' in place. + Status CheckHashAndDecrypt(MemRange buffer); + + /// Callback to be called when the write completes. + WriteDoneCallback cb_; + + /// Reference to the FileGroup's 'encryption_timer_'. + RuntimeProfile::Counter* encryption_timer_; + + /// The DiskIoMgr write range for this write. + boost::scoped_ptr<DiskIoMgr::WriteRange> write_range_; + + /// The temporary file being written to. + File* file_; + + /// If --disk_spill_encryption is on, a AES 256-bit key and initialization vector. + /// Regenerated for each write. + EncryptionKey key_; + + /// If --disk_spill_encryption is on, our hash of the data being written. Filled in + /// on writes; verified on reads. This is calculated _after_ encryption. + IntegrityHash hash_; + + /// Protects all fields below while 'write_in_flight_' is true. At other times, it is + /// invalid to call WriteRange/FileGroup methods concurrently from multiple threads, + /// so no locking is required. This is a terminal lock and should not be held while + /// acquiring other locks or invoking 'cb_'. + boost::mutex write_state_lock_; + + // True if the the write has been cancelled (but is not necessarily complete). + bool is_cancelled_; + + // True if a write is in flight. + bool write_in_flight_; + + /// Signalled when the write completes and 'write_in_flight_' becomes false, before + /// 'cb_' is invoked. + ConditionVariable write_complete_cv_; }; TmpFileMgr(); @@ -194,59 +360,27 @@ class TmpFileMgr { /// Total number of devices with tmp directories that are active. There is one tmp /// directory per device. - int num_active_tmp_devices(); + int NumActiveTmpDevices(); /// Return vector with device ids of all tmp devices being actively used. /// I.e. 
those that haven't been blacklisted. - std::vector<DeviceId> active_tmp_devices(); + std::vector<DeviceId> ActiveTmpDevices(); private: - /// Return a new File handle with a unique path for a query instance. The file is - /// associated with the file_group and the file path is within the (single) tmp + friend class TmpFileMgrTest; + + /// Return a new File handle with a path based on file_group->unique_id. The file is + /// associated with the 'file_group' and the file path is within the (single) scratch /// directory on the specified device id. The caller owns the returned handle and is /// responsible for deleting it. The file is not created - creation is deferred until - /// the first call to File::AllocateSpace(). - Status NewFile(FileGroup* file_group, const DeviceId& device_id, - const TUniqueId& unique_id, std::unique_ptr<File>* new_file); - - /// Dir stores information about a temporary directory. - class Dir { - public: - const std::string& path() const { return path_; } - - // Return true if it was newly added to blacklist. - bool blacklist() { - bool was_blacklisted = blacklisted_; - blacklisted_ = true; - return !was_blacklisted; - } - bool is_blacklisted() const { return blacklisted_; } - - private: - friend class TmpFileMgr; - - /// path should be a absolute path to a writable scratch directory. - Dir(const std::string& path, bool blacklisted) - : path_(path), blacklisted_(blacklisted) {} - - std::string path_; - - bool blacklisted_; - }; - - /// Remove a device from the rotation. Subsequent attempts to allocate a file on that - /// device will fail and the device will not be included in active tmp devices. - void BlacklistDevice(DeviceId device_id); - - bool IsBlacklisted(DeviceId device_id); + /// the file is written. + Status NewFile( + FileGroup* file_group, DeviceId device_id, std::unique_ptr<File>* new_file); bool initialized_; - /// Protects the status of tmp dirs (i.e. whether they're blacklisted). 
- SpinLock dir_status_lock_; - - /// The created tmp directories. - std::vector<Dir> tmp_dirs_; + /// The paths of the created tmp directories. + std::vector<std::string> tmp_dirs_; /// Metrics to track active scratch directories. IntGauge* num_active_scratch_dirs_metric_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/disk-info.cc ---------------------------------------------------------------------- diff --git a/be/src/util/disk-info.cc b/be/src/util/disk-info.cc index d3eeb56..eba4f26 100644 --- a/be/src/util/disk-info.cc +++ b/be/src/util/disk-info.cc @@ -48,7 +48,6 @@ bool DiskInfo::initialized_; vector<DiskInfo::Disk> DiskInfo::disks_; map<dev_t, int> DiskInfo::device_id_to_disk_id_; map<string, int> DiskInfo::disk_name_to_disk_id_; -int DiskInfo::num_datanode_dirs_; // Parses /proc/partitions to get the number of disks. A bit of looking around // seems to indicate this as the best way to do this. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/disk-info.h ---------------------------------------------------------------------- diff --git a/be/src/util/disk-info.h b/be/src/util/disk-info.h index 4853511..323a265 100644 --- a/be/src/util/disk-info.h +++ b/be/src/util/disk-info.h @@ -43,24 +43,8 @@ class DiskInfo { return disks_.size(); } -#if 0 - /// Returns the number of (logical) disks the data node is using. - /// It is possible for this to be more than num_disks since the datanode - /// can be configured to have multiple data directories on the same physical - /// disk. - static int num_datanode_dirs() { - DCHECK(initialized_); - return num_datanode_dirs_; - } - - /// Returns a 0-based disk index for the data node dirs index. - static int disk_id(int datanode_dir_idx) { - return 0; - } -#endif - /// Returns the 0-based disk index for 'path' (path must be a FS path, not - /// hdfs path). + /// hdfs path). Returns -1 if the disk index is unknown. 
static int disk_id(const char* path); /// Returns the device name (e.g. sda) for disk_id @@ -100,15 +84,11 @@ class DiskInfo { /// mapping of dev_ts to disk ids static std::map<dev_t, int> device_id_to_disk_id_; - + /// mapping of devices names to disk ids static std::map<std::string, int> disk_name_to_disk_id_; - static int num_datanode_dirs_; - static void GetDeviceNames(); }; - - } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/filesystem-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/filesystem-util.cc b/be/src/util/filesystem-util.cc index 01cd927..a0cacdf 100644 --- a/be/src/util/filesystem-util.cc +++ b/be/src/util/filesystem-util.cc @@ -114,17 +114,6 @@ Status FileSystemUtil::CreateFile(const string& file_path) { return Status::OK(); } -Status FileSystemUtil::ResizeFile(const string& file_path, int64_t trunc_len) { - int success = truncate(file_path.c_str(), trunc_len); - if (success != 0) { - return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR, Substitute( - "Truncate file $0 to length $1 failed with errno $2 ($3)", - file_path, trunc_len, errno, GetStrErrMsg()))); - } - - return Status::OK(); -} - Status FileSystemUtil::VerifyIsDirectory(const string& directory_path) { error_code errcode; bool exists = filesystem::exists(directory_path, errcode); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/filesystem-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/filesystem-util.h b/be/src/util/filesystem-util.h index 887dc4b..3e824b8 100644 --- a/be/src/util/filesystem-util.h +++ b/be/src/util/filesystem-util.h @@ -36,9 +36,6 @@ class FileSystemUtil { /// Create a file at the specified path. static Status CreateFile(const std::string& file_path); - /// Resize a file to a specified length - uses unistd truncate(). 
- static Status ResizeFile(const std::string& file_path, int64_t trunc_len); - /// Remove the specified paths and their enclosing files/directories. static Status RemovePaths(const std::vector<std::string>& directories); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/mem-range.h ---------------------------------------------------------------------- diff --git a/be/src/util/mem-range.h b/be/src/util/mem-range.h new file mode 100644 index 0000000..c55caaf --- /dev/null +++ b/be/src/util/mem-range.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_UTIL_MEM_RANGE_H +#define IMPALA_UTIL_MEM_RANGE_H + +#include <cstdint> + +#include "common/logging.h" + +namespace impala { + +/// Represents a range of memory. This is a convenient alternative to passing around +/// a separate pointer and length. 
+class MemRange { + public: + MemRange(uint8_t* data, int64_t len) : data_(data), len_(len) { + DCHECK_GE(len, 0); + DCHECK(len == 0 || data != nullptr); + } + + uint8_t* data() const { return data_; } + int64_t len() const { return len_; } + + static MemRange null() { return MemRange(nullptr, 0); } + + private: + uint8_t* data_; + int64_t len_; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index bb4251b..5088a1b 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -64,7 +64,7 @@ enum TParquetFallbackSchemaResolution { // metadata which overrides everything else. struct TQueryOptions { 1: optional bool abort_on_error = 0 - 2: optional i32 max_errors = 0 + 2: optional i32 max_errors = 100 3: optional bool disable_codegen = 0 4: optional i32 batch_size = 0 5: optional i32 num_nodes = NUM_NODES_ALL http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/testdata/workloads/functional-query/queries/QueryTest/set.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test index e405caf..1dd1396 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/set.test +++ b/testdata/workloads/functional-query/queries/QueryTest/set.test @@ -14,7 +14,7 @@ set 'EXPLAIN_LEVEL','1' 'HBASE_CACHE_BLOCKS','0' 'HBASE_CACHING','0' -'MAX_ERRORS','0' +'MAX_ERRORS','100' 'MAX_IO_BUFFERS','0' 'MAX_SCAN_RANGE_LENGTH','0' 'MEM_LIMIT','0' @@ -46,7 +46,7 @@ set; 'EXPLAIN_LEVEL','3' 'HBASE_CACHE_BLOCKS','0' 'HBASE_CACHING','0' -'MAX_ERRORS','0' +'MAX_ERRORS','100' 'MAX_IO_BUFFERS','0' 'MAX_SCAN_RANGE_LENGTH','0' 
'MEM_LIMIT','0' @@ -78,7 +78,7 @@ set; 'EXPLAIN_LEVEL','0' 'HBASE_CACHE_BLOCKS','0' 'HBASE_CACHING','0' -'MAX_ERRORS','0' +'MAX_ERRORS','100' 'MAX_IO_BUFFERS','0' 'MAX_SCAN_RANGE_LENGTH','0' 'MEM_LIMIT','0' @@ -111,7 +111,7 @@ set; 'EXPLAIN_LEVEL','1' 'HBASE_CACHE_BLOCKS','0' 'HBASE_CACHING','0' -'MAX_ERRORS','0' +'MAX_ERRORS','100' 'MAX_IO_BUFFERS','0' 'MAX_SCAN_RANGE_LENGTH','0' 'MEM_LIMIT','0' http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/tests/custom_cluster/test_scratch_disk.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py index 1c02b56..f523dbe 100644 --- a/tests/custom_cluster/test_scratch_disk.py +++ b/tests/custom_cluster/test_scratch_disk.py @@ -52,22 +52,7 @@ class TestScratchDir(CustomClusterTestSuite): def get_dirs(dirs): return ','.join(dirs) - @classmethod - def setup_class(cls): - super(TestScratchDir, cls).setup_class() - cls.normal_dirs = cls.generate_dirs(5) - cls.non_writable_dirs = cls.generate_dirs(5, writable=False) - cls.non_existing_dirs = cls.generate_dirs(5, non_existing=True) - - @classmethod - def teardown_class(cls): - for dir_path in cls.normal_dirs: - shutil.rmtree(dir_path) - for dir_path in cls.non_writable_dirs: - shutil.rmtree(dir_path) - - @classmethod - def generate_dirs(cls, num, writable=True, non_existing=False): + def generate_dirs(self, num, writable=True, non_existing=False): result = [] for i in xrange(num): dir_path = tempfile.mkdtemp() @@ -75,27 +60,30 @@ class TestScratchDir(CustomClusterTestSuite): shutil.rmtree(dir_path) elif not writable: os.chmod(dir_path, stat.S_IREAD) + if not non_existing: + self.created_dirs.append(dir_path) result.append(dir_path) + print "Generated dir" + dir_path return result def setup_method(self, method): - # We are overriding this method to prevent starting Impala before each test. 
In this - # file, each test is responsible for doing that because we want to use class - # variables like cls.normal_dirs to generate the parameter string to - # start-impala-cluster, which are generated in setup_class (so using the with_args - # decorator does not work). - pass + # Don't call the superclass method to prevent starting Impala before each test. In + # this file, each test is responsible for doing that because we want to generate + # the parameter string to start-impala-cluster in each test method. + self.created_dirs = [] def teardown_method(self, method): - pass + for dir_path in self.created_dirs: + shutil.rmtree(dir_path, ignore_errors=True) @pytest.mark.execute_serially def test_multiple_dirs(self, vector): """ 5 empty directories are created in the /tmp directory and we verify that only one of those directories is used as scratch disk. Only one should be used as scratch because all directories are on same disk.""" + normal_dirs = self.generate_dirs(5) self._start_impala_cluster([ - '--impalad_args="-scratch_dirs={0}"'.format(','.join(self.normal_dirs))]) + '--impalad_args="-scratch_dirs={0}"'.format(','.join(normal_dirs))]) self.assert_impalad_log_contains("INFO", "Using scratch directory ", expected_count=1) exec_option = vector.get_value('exec_option') @@ -103,10 +91,9 @@ class TestScratchDir(CustomClusterTestSuite): impalad = self.cluster.get_any_impalad() client = impalad.service.create_beeswax_client() self.execute_query_expect_success(client, self.spill_query, exec_option) - assert self.count_nonempty_dirs(self.normal_dirs) == 1 + assert self.count_nonempty_dirs(normal_dirs) == 1 @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args("-scratch_dirs=") def test_no_dirs(self, vector): """ Test we can execute a query with no scratch dirs """ self._start_impala_cluster(['--impalad_args="-scratch_dirs="']) @@ -124,8 +111,9 @@ class TestScratchDir(CustomClusterTestSuite): @pytest.mark.execute_serially def test_non_writable_dirs(self, 
vector): """ Test we can execute a query with only bad non-writable scratch """ + non_writable_dirs = self.generate_dirs(5, writable=False) self._start_impala_cluster([ - '--impalad_args="-scratch_dirs={0}"'.format(','.join(self.non_writable_dirs))]) + '--impalad_args="-scratch_dirs={0}"'.format(','.join(non_writable_dirs))]) self.assert_impalad_log_contains("ERROR", "Running without spill to disk: could " + "not use any scratch directories in list:.*. See previous " + "warnings for information on causes.") @@ -139,13 +127,14 @@ class TestScratchDir(CustomClusterTestSuite): self.execute_query_expect_failure(client, self.spill_query, exec_option) # Should be able to execute in-memory query self.execute_query_expect_success(client, self.in_mem_query, exec_option) - assert self.count_nonempty_dirs(self.non_writable_dirs) == 0 + assert self.count_nonempty_dirs(non_writable_dirs) == 0 @pytest.mark.execute_serially def test_non_existing_dirs(self, vector): """ Test that non-existing directories are not created or used """ + non_existing_dirs = self.generate_dirs(5, non_existing=True) self._start_impala_cluster([ - '--impalad_args="-scratch_dirs={0}"'.format(','.join(self.non_existing_dirs))]) + '--impalad_args="-scratch_dirs={0}"'.format(','.join(non_existing_dirs))]) self.assert_impalad_log_contains("ERROR", "Running without spill to disk: could " + "not use any scratch directories in list:.*. 
See previous " + "warnings for information on causes.") @@ -160,4 +149,27 @@ class TestScratchDir(CustomClusterTestSuite): self.execute_query_expect_failure(client, self.spill_query, exec_option) # Should be able to execute in-memory query self.execute_query_expect_success(client, self.in_mem_query, exec_option) - assert self.count_nonempty_dirs(self.non_existing_dirs) == 0 + assert self.count_nonempty_dirs(non_existing_dirs) == 0 + + @pytest.mark.execute_serially + def test_write_error_failover(self, vector): + """ Test that we can fail-over to writable directories if other directories + have permissions changed or are removed after impalad startup.""" + dirs = self.generate_dirs(3); + self._start_impala_cluster([ + '--impalad_args="-scratch_dirs={0}"'.format(','.join(dirs)), + '--impalad_args=--allow_multiple_scratch_dirs_per_device=true']) + self.assert_impalad_log_contains("INFO", "Using scratch directory ", + expected_count=len(dirs)) + exec_option = vector.get_value('exec_option') + exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory + # Trigger errors when writing the first two directories. + shutil.rmtree(dirs[0]) # Remove the first directory. + # Make all subdirectories in the second directory non-writable. + for dirpath, dirnames, filenames in os.walk(dirs[1]): + os.chmod(dirpath, stat.S_IREAD) + + # Should still be able to spill to the third directory. + impalad = self.cluster.get_any_impalad() + client = impalad.service.create_beeswax_client() + self.execute_query_expect_success(client, self.spill_query, exec_option)
