Repository: incubator-impala Updated Branches: refs/heads/master 2aa86309d -> 1522da351
IMPALA-2988: Refactor HdfsTableSink::Close() so that it cannot fail. HdfsTableSink::Close() makes calls to functions that can fail with a Status. However, since the function has a void return type, these error statuses are just logged and we cannot take any action according to the type of error. This patch moves the closing of the partition file from Close() into FlushFinal(), so that if closing the file fails, the error is propagated up and some action can be taken. We still try to close all the partition files in the map in Close() as well, because if a query is cancelled, FlushFinal() will not be called and we would otherwise end up leaking some file descriptors. Also fixed some long lines in this patch. Change-Id: I2546bc68ba136b2713d744c1b920878606a2217b Reviewed-on: http://gerrit.cloudera.org:8080/4018 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/02608f89 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/02608f89 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/02608f89 Branch: refs/heads/master Commit: 02608f89f5ee3385ef63533eb012cb75690d0b5d Parents: 2aa8630 Author: Sailesh Mukil <[email protected]> Authored: Wed Mar 30 21:58:48 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Sat Aug 20 01:53:24 2016 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-table-sink.cc | 22 ++++++++++++---------- be/src/exec/hdfs-table-sink.h | 6 +++--- 2 files changed, 15 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/02608f89/be/src/exec/hdfs-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.cc 
b/be/src/exec/hdfs-table-sink.cc index 0626bd4..fb547b4 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -340,7 +340,7 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state, output_partition->num_rows = 0; Status status = output_partition->writer->InitNewFile(); if (!status.ok()) { - ClosePartitionFile(state, output_partition); + status.MergeStatus(ClosePartitionFile(state, output_partition)); hdfsDelete(output_partition->hdfs_connection, output_partition->current_file_name.c_str(), 0); } @@ -608,24 +608,27 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state, DataSink::MergeInsertStats(partition->writer->stats(), &it->second.stats); } - ClosePartitionFile(state, partition); + RETURN_IF_ERROR(ClosePartitionFile(state, partition)); return Status::OK(); } -void HdfsTableSink::ClosePartitionFile(RuntimeState* state, OutputPartition* partition) { - if (partition->tmp_hdfs_file == NULL) return; +Status HdfsTableSink::ClosePartitionFile( + RuntimeState* state, OutputPartition* partition) { + if (partition->tmp_hdfs_file == NULL) return Status::OK(); int hdfs_ret = hdfsCloseFile(partition->hdfs_connection, partition->tmp_hdfs_file); VLOG_FILE << "hdfsCloseFile() file=" << partition->current_file_name; + partition->tmp_hdfs_file = NULL; + ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(-1); if (hdfs_ret != 0) { - state->LogError(ErrorMsg(TErrorCode::GENERAL, + return Status(ErrorMsg(TErrorCode::GENERAL, GetHdfsErrorMsg("Failed to close HDFS file: ", partition->current_file_name))); } - partition->tmp_hdfs_file = NULL; - ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(-1); + return Status::OK(); } Status HdfsTableSink::FlushFinal(RuntimeState* state) { + DCHECK(!closed_); SCOPED_TIMER(profile()->total_time_counter()); if (dynamic_partition_key_expr_ctxs_.empty()) { @@ -643,8 +646,6 @@ Status HdfsTableSink::FlushFinal(RuntimeState* state) { RETURN_IF_ERROR(FinalizePartitionFile(state, 
cur_partition->second.first)); } - // TODO: Move call to ClosePartitionFile() here so that the error status can be - // propagated. If closing the file fails, the query should fail. return Status::OK(); } @@ -658,7 +659,8 @@ void HdfsTableSink::Close(RuntimeState* state) { if (cur_partition->second.first->writer.get() != NULL) { cur_partition->second.first->writer->Close(); } - ClosePartitionFile(state, cur_partition->second.first); + Status close_status = ClosePartitionFile(state, cur_partition->second.first); + if (!close_status.ok()) state->LogError(close_status.msg()); } partition_keys_to_output_partitions_.clear(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/02608f89/be/src/exec/hdfs-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h index fc32f4d..bb2f9d7 100644 --- a/be/src/exec/hdfs-table-sink.h +++ b/be/src/exec/hdfs-table-sink.h @@ -206,12 +206,12 @@ class HdfsTableSink : public DataSink { void BuildHdfsFileNames(const HdfsPartitionDescriptor& partition_descriptor, OutputPartition* output); - /// Updates runtime stats of HDFS with rows written, then closes the file associated with - /// the partition by calling ClosePartitionFile() + /// Updates runtime stats of HDFS with rows written, then closes the file associated + /// with the partition by calling ClosePartitionFile() Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition); /// Closes the hdfs file for this partition as well as the writer. - void ClosePartitionFile(RuntimeState* state, OutputPartition* partition); + Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition); // Returns TRUE if the staging step should be skipped for this partition. This allows // for faster INSERT query completion time for the S3A filesystem as the coordinator
