IMPALA-5115: Handle status from HdfsTableSink::WriteClusteredRowBatch Change-Id: Id6f5d87ae5e46d6019d9988011d3f2657e81ae15 Reviewed-on: http://gerrit.cloudera.org:8080/6486 Reviewed-by: Marcel Kornacker <[email protected]> Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8e45c4ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8e45c4ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8e45c4ae Branch: refs/heads/master Commit: 8e45c4ae9c65f316072e395602648fe36081377a Parents: 077c07e Author: Lars Volker <[email protected]> Authored: Sat Mar 25 17:50:09 2017 +0100 Committer: Impala Public Jenkins <[email protected]> Committed: Sun Mar 26 15:36:44 2017 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-table-sink.cc | 2 +- be/src/exec/hdfs-table-sink.h | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8e45c4ae/be/src/exec/hdfs-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index 4aa09cd..ef983e2 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -627,7 +627,7 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) { GetOutputPartition(state, nullptr, ROOT_PARTITION_KEY, &partition_pair, false)); RETURN_IF_ERROR(WriteRowsToPartition(state, batch, partition_pair)); } else if (input_is_clustered_) { - WriteClusteredRowBatch(state, batch); + RETURN_IF_ERROR(WriteClusteredRowBatch(state, batch)); } else { for (int i = 0; i < batch->num_rows(); ++i) { const TupleRow* current_row = batch->GetRow(i); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8e45c4ae/be/src/exec/hdfs-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h index 45d64c5..d452f3d 100644 --- a/be/src/exec/hdfs-table-sink.h +++ b/be/src/exec/hdfs-table-sink.h @@ -169,14 +169,15 @@ class HdfsTableSink : public DataSink { /// added to it, empty_partition must be true. Status InitOutputPartition(RuntimeState* state, const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row, - OutputPartition* output_partition, bool empty_partition); + OutputPartition* output_partition, bool empty_partition) WARN_UNUSED_RESULT; /// Add a temporary file to an output partition. Files are created in a /// temporary directory and then moved to the real partition directory by the /// coordinator in a finalization step. The temporary file's current location /// and final destination are recorded in the state parameter. /// If this function fails, the tmp file is cleaned up. - Status CreateNewTmpFile(RuntimeState* state, OutputPartition* output_partition); + Status CreateNewTmpFile(RuntimeState* state, OutputPartition* output_partition) + WARN_UNUSED_RESULT; /// Key is the concatenation of the evaluated dynamic_partition_key_exprs_ generated by /// GetHashTblKey(). Maps to an OutputPartition and a vector of indices of the rows in @@ -199,10 +200,11 @@ class HdfsTableSink : public DataSink { /// the 'partition_keys_to_output_partitions_'. 'no_more_rows' indicates that no more /// rows will be added to the partition. Status GetOutputPartition(RuntimeState* state, const TupleRow* row, - const std::string& key, PartitionPair** partition_pair, bool no_more_rows); + const std::string& key, PartitionPair** partition_pair, bool no_more_rows) + WARN_UNUSED_RESULT; /// Initialise and prepare select and partition key expressions - Status PrepareExprs(RuntimeState* state); + Status PrepareExprs(RuntimeState* state) WARN_UNUSED_RESULT; /// Sets hdfs_file_name and tmp_hdfs_file_name of given output partition. /// The Hdfs directory is created from the target table's base Hdfs dir, @@ -214,18 +216,21 @@ class HdfsTableSink : public DataSink { /// Writes all rows referenced by the row index vector in 'partition_pair' to the /// partition's writer and clears the row index vector afterwards. Status WriteRowsToPartition( - RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair); + RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair) + WARN_UNUSED_RESULT; /// Maps all rows in 'batch' to partitions and appends them to their temporary Hdfs /// files. The input must be ordered by the partition key expressions. - Status WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch); + Status WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) WARN_UNUSED_RESULT; /// Updates runtime stats of HDFS with rows written, then closes the file associated /// with the partition by calling ClosePartitionFile() - Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition); + Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition) + WARN_UNUSED_RESULT; /// Closes the hdfs file for this partition as well as the writer. - Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition); + Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition) + WARN_UNUSED_RESULT; // Returns TRUE if the staging step should be skipped for this partition. This allows // for faster INSERT query completion time for the S3A filesystem as the coordinator
