Repository: incubator-impala
Updated Branches:
  refs/heads/master 545bfcfe6 -> 157da298d


IMPALA-4899: Fix parquet table writer dictionary leak

Currently, in HdfsTableSink, OutputPartitions are added to the RuntimeState
object pool to be freed at the end of the query. However, for clustered inserts
into a partitioned table, the OutputPartitions are only used one at a time.
They can be immediately freed once done writing to that partition.

In addition, the HdfsParquetTableWriter's ColumnWriters are also added to
this object pool. These constitute a significant amount of memory, as they
contain the dictionaries for Parquet encoding.

This change makes HdfsParquetTableWriter's ColumnWriters use unique_ptrs so
that they are cleaned up when the HdfsParquetTableWriter is deleted. It also
uses a unique_ptr on the PartitionPair for the OutputPartition.

The table writers maintain a pointer to the OutputPartition. This remains a
raw pointer. This is safe, because OutputPartition has a scoped_ptr to the
table writer. The table writer will never outlive the OutputPartition.

Change-Id: I06e354086ad24071d4fbf823f25f5df23933688f
Reviewed-on: http://gerrit.cloudera.org:8080/6181
Reviewed-by: Marcel Kornacker <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/642b8f1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/642b8f1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/642b8f1b

Branch: refs/heads/master
Commit: 642b8f1b5d5493dc9e3aa55a973ef92094d4dbc9
Parents: 545bfcf
Author: Joe McDonnell <[email protected]>
Authored: Mon Feb 27 16:13:38 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Mar 3 01:20:04 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc |  2 +-
 be/src/exec/hdfs-parquet-table-writer.h  |  6 ++++--
 be/src/exec/hdfs-table-sink.cc           | 18 ++++++++++--------
 be/src/exec/hdfs-table-sink.h            | 11 ++++++++---
 be/src/exec/hdfs-table-writer.h          |  3 +++
 5 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/642b8f1b/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc 
b/be/src/exec/hdfs-parquet-table-writer.cc
index 6114f4e..890aa5d 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -799,7 +799,7 @@ Status HdfsParquetTableWriter::Init() {
       default:
         DCHECK(false);
     }
-    columns_[i] = state_->obj_pool()->Add(writer);
+    columns_[i].reset(writer);
     columns_[i]->Reset();
   }
   RETURN_IF_ERROR(CreateSchema());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/642b8f1b/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h 
b/be/src/exec/hdfs-parquet-table-writer.h
index 94ad932..d4fbd94 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -146,8 +146,10 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   /// The current row group being written to.
   parquet::RowGroup* current_row_group_;
 
-  /// array of pointers to column information.
-  std::vector<BaseColumnWriter*> columns_;
+  /// Array of pointers to column information. The column writers are owned by 
the
+  /// table writer, as there is no reason for the column writers to outlive 
the table
+  /// writer.
+  std::vector<std::unique_ptr<BaseColumnWriter>> columns_;
 
   /// Number of rows in current file
   int64_t row_count_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/642b8f1b/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index ddda05b..8d64683 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -276,7 +276,7 @@ Status HdfsTableSink::WriteRowsToPartition(
   // set.
   bool new_file;
   while (true) {
-    OutputPartition* output_partition = partition_pair->first;
+    OutputPartition* output_partition = partition_pair->first.get();
     RETURN_IF_ERROR(
         output_partition->writer->AppendRows(batch, partition_pair->second, 
&new_file));
     if (!new_file) break;
@@ -327,7 +327,8 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* 
state, RowBatch* batc
         RETURN_IF_ERROR(WriteRowsToPartition(state, batch, 
current_clustered_partition_));
         current_clustered_partition_->second.clear();
       }
-      RETURN_IF_ERROR(FinalizePartitionFile(state, 
current_clustered_partition_->first));
+      RETURN_IF_ERROR(FinalizePartitionFile(state,
+          current_clustered_partition_->first.get()));
       if (current_clustered_partition_->first->writer.get() != nullptr) {
         current_clustered_partition_->first->writer->Close();
       }
@@ -571,9 +572,10 @@ inline Status 
HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple
       partition_descriptor = it->second;
     }
 
-    OutputPartition* partition = state->obj_pool()->Add(new OutputPartition());
+    std::unique_ptr<OutputPartition> partition(new OutputPartition());
     Status status =
-        InitOutputPartition(state, *partition_descriptor, row, partition, 
no_more_rows);
+        InitOutputPartition(state, *partition_descriptor, row, partition.get(),
+            no_more_rows);
     if (!status.ok()) {
       // We failed to create the output partition successfully. Clean it up now
       // as it is not added to partition_keys_to_output_partitions_ so won't be
@@ -594,12 +596,12 @@ inline Status 
HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple
     state->per_partition_status()->insert(
         make_pair(partition->partition_name, partition_status));
 
-    if (!no_more_rows && !ShouldSkipStaging(state, partition)) {
+    if (!no_more_rows && !ShouldSkipStaging(state, partition.get())) {
       // Indicate that temporary directory is to be deleted after execution
       (*state->hdfs_files_to_move())[partition->tmp_hdfs_dir_name] = "";
     }
 
-    partition_keys_to_output_partitions_[key].first = partition;
+    partition_keys_to_output_partitions_[key].first = std::move(partition);
     *partition_pair = &partition_keys_to_output_partitions_[key];
   } else {
     // Use existing output_partition partition.
@@ -701,7 +703,7 @@ Status HdfsTableSink::FlushFinal(RuntimeState* state) {
           partition_keys_to_output_partitions_.begin();
       cur_partition != partition_keys_to_output_partitions_.end();
       ++cur_partition) {
-    RETURN_IF_ERROR(FinalizePartitionFile(state, cur_partition->second.first));
+    RETURN_IF_ERROR(FinalizePartitionFile(state, 
cur_partition->second.first.get()));
   }
 
   return Status::OK();
@@ -717,7 +719,7 @@ void HdfsTableSink::Close(RuntimeState* state) {
     if (cur_partition->second.first->writer.get() != nullptr) {
       cur_partition->second.first->writer->Close();
     }
-    Status close_status = ClosePartitionFile(state, 
cur_partition->second.first);
+    Status close_status = ClosePartitionFile(state, 
cur_partition->second.first.get());
     if (!close_status.ok()) state->LogError(close_status.msg());
   }
   partition_keys_to_output_partitions_.clear();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/642b8f1b/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 22b6a44..65c1798 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -179,9 +179,14 @@ class HdfsTableSink : public DataSink {
   Status CreateNewTmpFile(RuntimeState* state, OutputPartition* 
output_partition);
 
   /// Key is the concatenation of the evaluated dynamic_partition_key_exprs_ 
generated by
-  /// GetHashTblKey(). Maps to an OutputPartition, which are owned by the 
object pool, and
-  /// a vector of indices of the rows in the current batch to insert into the 
partition.
-  typedef std::pair<OutputPartition*, std::vector<int32_t>> PartitionPair;
+  /// GetHashTblKey(). Maps to an OutputPartition and a vector of indices of 
the rows in
+  /// the current batch to insert into the partition. The PartitionPair owns 
the
+  /// OutputPartition via a unique_ptr so that the memory is freed as soon as 
the
+  /// PartitionPair is removed from the map. This is important, because the
+  /// PartitionPairs can have different lifetimes. For example, a clustered 
insert into a
+  /// partitioned table iterates over the partitions, so only one 
PartitionPair is
+  /// in the map at any given time.
+  typedef std::pair<std::unique_ptr<OutputPartition>, std::vector<int32_t>> 
PartitionPair;
   typedef boost::unordered_map<std::string, PartitionPair> PartitionMap;
 
   /// Generates string key for hash_tbl_ as a concatenation of all evaluated 
exprs,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/642b8f1b/be/src/exec/hdfs-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index 8066315..86fe56c 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -115,6 +115,9 @@ class HdfsTableWriter {
   RuntimeState* state_;
 
   /// Structure describing partition written to by this writer.
+  /// NOTE: OutputPartition is usually accessed with a unique_ptr. It is safe 
to use
+  /// a raw pointer here, because the OutputPartition maintains a scoped_ptr to
+  /// the HdfsTableWriter. The HdfsTableWriter will never outlive the OutputPartition.
   OutputPartition* output_;
 
   /// Table descriptor of table to be written.

Reply via email to