Repository: incubator-impala
Updated Branches:
  refs/heads/master 545bfcfe6 -> 157da298d
IMPALA-4899: Fix parquet table writer dictionary leak

Currently, in HdfsTableSink, OutputPartitions are added to the RuntimeState
object pool to be freed at the end of the query. However, for clustered
inserts into a partitioned table, the OutputPartitions are only used one at a
time. They can be immediately freed once done writing to that partition.

In addition, the HdfsParquetTableWriter's ColumnWriters are also added to this
object pool. These constitute a significant amount of memory, as they contain
the dictionaries for Parquet encoding.

This change makes HdfsParquetTableWriter's ColumnWriters use unique_ptrs so
that they are cleaned up when the HdfsParquetTableWriter is deleted. It also
uses a unique_ptr on the PartitionPair for the OutputPartition.

The table writers maintain a pointer to the OutputPartition. This remains a
raw pointer. This is safe, because OutputPartition has a scoped_ptr to the
table writer. The table writer will never outlive the OutputPartition.

Change-Id: I06e354086ad24071d4fbf823f25f5df23933688f
Reviewed-on: http://gerrit.cloudera.org:8080/6181
Reviewed-by: Marcel Kornacker <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/642b8f1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/642b8f1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/642b8f1b

Branch: refs/heads/master
Commit: 642b8f1b5d5493dc9e3aa55a973ef92094d4dbc9
Parents: 545bfcf
Author: Joe McDonnell <[email protected]>
Authored: Mon Feb 27 16:13:38 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Mar 3 01:20:04 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc |  2 +-
 be/src/exec/hdfs-parquet-table-writer.h  |  6 ++++--
 be/src/exec/hdfs-table-sink.cc           | 18 ++++++++++--------
 be/src/exec/hdfs-table-sink.h            | 11 ++++++++---
 be/src/exec/hdfs-table-writer.h          |  3 +++
 5 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/642b8f1b/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 6114f4e..890aa5d 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -799,7 +799,7 @@ Status HdfsParquetTableWriter::Init() {
       default:
         DCHECK(false);
     }
-    columns_[i] = state_->obj_pool()->Add(writer);
+    columns_[i].reset(writer);
     columns_[i]->Reset();
   }
   RETURN_IF_ERROR(CreateSchema());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/642b8f1b/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index 94ad932..d4fbd94 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -146,8 +146,10 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   /// The current row group being written to.
   parquet::RowGroup* current_row_group_;

-  /// array of pointers to column information.
-  std::vector<BaseColumnWriter*> columns_;
+  /// Array of pointers to column information. The column writers are owned by the
+  /// table writer, as there is no reason for the column writers to outlive the table
+  /// writer.
+  std::vector<std::unique_ptr<BaseColumnWriter>> columns_;

   /// Number of rows in current file
   int64_t row_count_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/642b8f1b/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index ddda05b..8d64683 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -276,7 +276,7 @@ Status HdfsTableSink::WriteRowsToPartition(
   // set.
   bool new_file;
   while (true) {
-    OutputPartition* output_partition = partition_pair->first;
+    OutputPartition* output_partition = partition_pair->first.get();
     RETURN_IF_ERROR(
         output_partition->writer->AppendRows(batch, partition_pair->second, &new_file));
     if (!new_file) break;
@@ -327,7 +327,8 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc
     RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_clustered_partition_));
     current_clustered_partition_->second.clear();
   }
-  RETURN_IF_ERROR(FinalizePartitionFile(state, current_clustered_partition_->first));
+  RETURN_IF_ERROR(FinalizePartitionFile(state,
+      current_clustered_partition_->first.get()));
   if (current_clustered_partition_->first->writer.get() != nullptr) {
     current_clustered_partition_->first->writer->Close();
   }
@@ -571,9 +572,10 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple
     partition_descriptor = it->second;
   }

-  OutputPartition* partition = state->obj_pool()->Add(new OutputPartition());
+  std::unique_ptr<OutputPartition> partition(new OutputPartition());
   Status status =
-      InitOutputPartition(state, *partition_descriptor, row, partition, no_more_rows);
+      InitOutputPartition(state, *partition_descriptor, row, partition.get(),
+          no_more_rows);
   if (!status.ok()) {
     // We failed to create the output partition successfully. Clean it up now
     // as it is not added to partition_keys_to_output_partitions_ so won't be
@@ -594,12 +596,12 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple
     state->per_partition_status()->insert(
         make_pair(partition->partition_name, partition_status));

-    if (!no_more_rows && !ShouldSkipStaging(state, partition)) {
+    if (!no_more_rows && !ShouldSkipStaging(state, partition.get())) {
       // Indicate that temporary directory is to be deleted after execution
       (*state->hdfs_files_to_move())[partition->tmp_hdfs_dir_name] = "";
     }
-    partition_keys_to_output_partitions_[key].first = partition;
+    partition_keys_to_output_partitions_[key].first = std::move(partition);
     *partition_pair = &partition_keys_to_output_partitions_[key];
   } else {
     // Use existing output_partition partition.
@@ -701,7 +703,7 @@ Status HdfsTableSink::FlushFinal(RuntimeState* state) {
       partition_keys_to_output_partitions_.begin();
       cur_partition != partition_keys_to_output_partitions_.end(); ++cur_partition) {
-    RETURN_IF_ERROR(FinalizePartitionFile(state, cur_partition->second.first));
+    RETURN_IF_ERROR(FinalizePartitionFile(state, cur_partition->second.first.get()));
   }

   return Status::OK();
@@ -717,7 +719,7 @@ void HdfsTableSink::Close(RuntimeState* state) {
     if (cur_partition->second.first->writer.get() != nullptr) {
       cur_partition->second.first->writer->Close();
     }
-    Status close_status = ClosePartitionFile(state, cur_partition->second.first);
+    Status close_status = ClosePartitionFile(state, cur_partition->second.first.get());
     if (!close_status.ok()) state->LogError(close_status.msg());
   }
   partition_keys_to_output_partitions_.clear();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/642b8f1b/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 22b6a44..65c1798 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -179,9 +179,14 @@ class HdfsTableSink : public DataSink {
   Status CreateNewTmpFile(RuntimeState* state, OutputPartition* output_partition);

   /// Key is the concatenation of the evaluated dynamic_partition_key_exprs_ generated by
-  /// GetHashTblKey(). Maps to an OutputPartition, which are owned by the object pool, and
-  /// a vector of indices of the rows in the current batch to insert into the partition.
-  typedef std::pair<OutputPartition*, std::vector<int32_t>> PartitionPair;
+  /// GetHashTblKey(). Maps to an OutputPartition and a vector of indices of the rows in
+  /// the current batch to insert into the partition. The PartitionPair owns the
+  /// OutputPartition via a unique_ptr so that the memory is freed as soon as the
+  /// PartitionPair is removed from the map. This is important, because the
+  /// PartitionPairs can have different lifetimes. For example, a clustered insert into a
+  /// partitioned table iterates over the partitions, so only one PartitionPair is
+  /// in the map at any given time.
+  typedef std::pair<std::unique_ptr<OutputPartition>, std::vector<int32_t>> PartitionPair;
   typedef boost::unordered_map<std::string, PartitionPair> PartitionMap;

   /// Generates string key for hash_tbl_ as a concatenation of all evaluated exprs,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/642b8f1b/be/src/exec/hdfs-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index 8066315..86fe56c 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -115,6 +115,9 @@ class HdfsTableWriter {
   RuntimeState* state_;

   /// Structure describing partition written to by this writer.
+  /// NOTE: OutputPartition is usually accessed with a unique_ptr. It is safe to use
+  /// a raw pointer here, because the OutputPartition maintains a scoped_ptr to
+  /// the HdfsTableWriter object. The writer will never outlive the OutputPartition.
   OutputPartition* output_;

   /// Table descriptor of table to be written.
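
To make the column-writer part of the change easier to follow outside the diff,
here is a minimal, hypothetical C++ sketch. It is not Impala source: ColumnWriter,
TableWriter, and dictionary_ are illustrative stand-ins for BaseColumnWriter,
HdfsParquetTableWriter, and the Parquet dictionary state. It shows the ownership
pattern the commit adopts: the table writer owns its column writers through
std::unique_ptr, so the dictionaries are freed when the writer is destroyed rather
than lingering in a query-lifetime object pool.

// Sketch only: simplified stand-ins, not the Impala classes.
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for BaseColumnWriter; the map stands in for the encoding dictionary
// that made pool-allocated writers expensive to keep alive until end of query.
class ColumnWriter {
 public:
  std::unordered_map<std::string, int> dictionary_;
};

// Stand-in for HdfsParquetTableWriter: it owns its column writers outright.
class TableWriter {
 public:
  explicit TableWriter(int num_columns) {
    columns_.reserve(num_columns);
    for (int i = 0; i < num_columns; ++i) {
      // Previously the equivalent objects went into a query-lifetime object
      // pool; owning them here ties their lifetime to the table writer.
      columns_.push_back(std::make_unique<ColumnWriter>());
    }
  }
  // No explicit cleanup needed: destroying the TableWriter destroys every
  // ColumnWriter (and its dictionary) immediately.
 private:
  std::vector<std::unique_ptr<ColumnWriter>> columns_;
};

int main() {
  {
    TableWriter writer(16);
    // ... write one partition's file ...
  }  // dictionaries released here, not at the end of the query
  return 0;
}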

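Similarly, a minimal sketch of the HdfsTableSink side of the change, again
hypothetical: it uses std::unordered_map and std::unique_ptr where the real code
uses boost::unordered_map and a scoped_ptr, and the types are simplified
stand-ins. It illustrates why the map entry owning the OutputPartition lets a
clustered insert drop each partition as soon as it is finalized, and why the
writer's raw back-pointer is safe: the partition owns the writer, so the writer
can never outlive it.

// Sketch only: simplified stand-ins for the HdfsTableSink types.
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

struct OutputPartition;  // forward declaration for the back-pointer

// Stand-in for HdfsTableWriter: holds a raw pointer to its partition. This is
// safe because the OutputPartition owns the writer, so the pointer cannot
// dangle while the writer is alive.
struct TableWriter {
  explicit TableWriter(OutputPartition* partition) : output_(partition) {}
  OutputPartition* output_;
};

// Stand-in for OutputPartition: owns its writer (a scoped_ptr in Impala).
struct OutputPartition {
  std::unique_ptr<TableWriter> writer;
};

// Mirrors the new PartitionPair/PartitionMap typedefs: the map entry owns the
// OutputPartition, so removing the entry releases the partition, its writer,
// and any writer-held memory immediately.
using PartitionPair = std::pair<std::unique_ptr<OutputPartition>, std::vector<int32_t>>;
using PartitionMap = std::unordered_map<std::string, PartitionPair>;

int main() {
  PartitionMap partitions;

  auto partition = std::make_unique<OutputPartition>();
  partition->writer = std::make_unique<TableWriter>(partition.get());
  // Moving the unique_ptr into the map transfers ownership; the raw
  // back-pointer inside the writer still refers to the same object.
  partitions["year=2017/month=3"].first = std::move(partition);

  // A clustered insert finishes one partition before starting the next, so an
  // entry can be erased as soon as its file is finalized, instead of keeping
  // every OutputPartition alive until the query ends.
  partitions.erase("year=2017/month=3");
  return 0;
}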