Repository: incubator-impala
Updated Branches:
  refs/heads/master 567814b4c -> 3480c892c


IMPALA-5061: Populate null_count in parquet::statistics

The null_count field in the column statistics is incremented each time
the parquet table writer encounters a null value. The count is written
to the parquet page and row group headers along with the rest of the
statistics.
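
The following is a minimal standalone sketch (not the actual Impala
classes) of the counting rule the patch implements: a null value is
never encoded into the data page, it only bumps a per-column-chunk
counter that later lands in parquet::Statistics::null_count. Plain
C++17, with std::nullopt standing in for a NULL cell:

    #include <cassert>
    #include <cstdint>
    #include <optional>
    #include <vector>

    int main() {
      // A nullable int column with two NULL cells.
      std::vector<std::optional<int>> column = {1, std::nullopt, 3, std::nullopt};
      int64_t null_count = 0;
      for (const auto& v : column) {
        if (!v.has_value()) {
          ++null_count;  // AppendRow() analogue: count the null, skip encoding
          continue;
        }
        // ... a real writer would plain- or dict-encode *v here ...
      }
      assert(null_count == 2);  // the value written to the statistics field
      return 0;
    }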

Testing: Modified the existing end-to-end tests in
test_insert_parquet.py to make sure each parquet header carries the
appropriate null_count. Verified the correctness of the nulltable test
and added a test that writes the functional_parquet.zipcode_incomes
table to a parquet file and checks that the expected null_count is
populated.

Change-Id: I4c49a63af84c2234f0633be63206cb52eb7e8ebb
Reviewed-on: http://gerrit.cloudera.org:8080/7058
Reviewed-by: Lars Volker <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6d5cd617
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6d5cd617
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6d5cd617

Branch: refs/heads/master
Commit: 6d5cd6174ee4e90a9d5dae5bedf02275d3d814a8
Parents: 567814b
Author: poojanilangekar <[email protected]>
Authored: Fri Jun 2 10:29:13 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jun 15 22:00:54 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc  |  14 +--
 be/src/exec/parquet-column-stats.h        |  24 +++--
 be/src/exec/parquet-column-stats.inline.h |  92 +++++++----------
 tests/query_test/test_insert_parquet.py   | 130 ++++++++++++++-----------
 4 files changed, 134 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6d5cd617/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 72c86b2..a3aa4d3 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -141,8 +141,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // 'meta_data'.
   void EncodeRowGroupStats(ColumnMetaData* meta_data) {
     DCHECK(row_group_stats_base_ != nullptr);
-    if (row_group_stats_base_->has_values()
-        && row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
+    if (row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
       row_group_stats_base_->EncodeToThrift(&meta_data->statistics);
       meta_data->__isset.statistics = true;
     }
@@ -466,8 +465,12 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
   // TODO: Have a clearer set of state transitions here, to make it easier to see that
   // this won't loop forever.
   while (true) {
-    // Nulls don't get encoded.
-    if (value == nullptr) break;
+    // Nulls don't get encoded. Increment the null count of the parquet statistics.
+    if (value == nullptr) {
+      DCHECK(page_stats_base_ != nullptr);
+      page_stats_base_->IncrementNullCount(1);
+      break;
+    }
 
     int64_t bytes_needed = 0;
     if (ProcessValue(value, &bytes_needed)) {
@@ -679,8 +682,7 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
 
   // Build page statistics and add them to the header.
   DCHECK(page_stats_base_ != nullptr);
-  if (page_stats_base_->has_values()
-      && page_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
+  if (page_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
     page_stats_base_->EncodeToThrift(&header.data_page_header.statistics);
     header.data_page_header.__isset.statistics = true;
   }
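
Each column writer keeps separate page-level and row-group-level
statistics, and a finished page folds its counters into the row group
via Merge(). A self-contained sketch of that roll-up, with simplified
stand-in types rather than the real writer classes:

    #include <cassert>
    #include <cstdint>

    struct PageStats {
      int64_t null_count = 0;
      void IncrementNullCount(int64_t n) { null_count += n; }
      void Reset() { null_count = 0; }
    };

    struct RowGroupStats {
      int64_t null_count = 0;
      // Merge() analogue: page counters accumulate into the row group.
      void Merge(const PageStats& page) { null_count += page.null_count; }
    };

    int main() {
      PageStats page;
      RowGroupStats row_group;
      page.IncrementNullCount(1);   // two nulls appended while writing page 1
      page.IncrementNullCount(1);
      row_group.Merge(page);        // FinalizeCurrentPage() analogue
      page.Reset();                 // fresh counters for page 2
      page.IncrementNullCount(1);   // one null on page 2
      row_group.Merge(page);
      assert(row_group.null_count == 3);  // row-group total spans all pages
      return 0;
    }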

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6d5cd617/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
index 53fe548..c068312 100644
--- a/be/src/exec/parquet-column-stats.h
+++ b/be/src/exec/parquet-column-stats.h
@@ -62,7 +62,7 @@ class ColumnStatsBase {
   /// the minimum or maximum value.
   enum class StatsField { MIN, MAX };
 
-  ColumnStatsBase() : has_values_(false) {}
+  ColumnStatsBase() : has_min_max_values_(false), null_count_(0) {}
   virtual ~ColumnStatsBase() {}
 
   /// Decodes the parquet::Statistics from 'col_chunk' and writes the value selected by
@@ -91,17 +91,22 @@ class ColumnStatsBase {
   virtual void EncodeToThrift(parquet::Statistics* out) const = 0;
 
   /// Resets the state of this object.
-  void Reset() { has_values_ = false; }
+  void Reset();
 
-  bool has_values() const { return has_values_; }
+  /// Update the statistics by incrementing the null_count. It is called each time a null
+  /// value is appended to the column or the statistics are merged.
+  void IncrementNullCount(int64_t count) { null_count_ += count; }
 
  protected:
   // Copies the memory of 'value' into 'buffer' and make 'value' point to 'buffer'.
   // 'buffer' is reset before making the copy.
   static void CopyToBuffer(StringBuffer* buffer, StringValue* value);
 
-  /// Stores whether the current object has been initialized with a set of values.
-  bool has_values_;
+  /// Stores whether the min and max values of the current object have been initialized.
+  bool has_min_max_values_;
+
+  // Number of null values since the last call to Reset().
+  int64_t null_count_;
 
  private:
   /// Returns true if we support reading statistics stored in the fields 'min_value' and
@@ -149,10 +154,13 @@ class ColumnStats : public ColumnStatsBase {
       min_buffer_(mem_pool),
       max_buffer_(mem_pool) {}
 
-  /// Updates the statistics based on the value 'v'. If necessary, initializes the
-  /// statistics. It may keep a reference to 'v' until
+  /// Updates the statistics based on the values min_value and max_value. If necessary,
+  /// initializes the statistics. It may keep a reference to either value until
   /// MaterializeStringValuesToInternalBuffers() gets called.
-  void Update(const T& v);
+  void Update(const T& min_value, const T& max_value);
+
+  /// Wrapper to call the Update function which takes in the min_value and max_value.
+  void Update(const T& v) { Update(v, v); }
 
   virtual void Merge(const ColumnStatsBase& other) override;
   virtual void MaterializeStringValuesToInternalBuffers() override {}
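
The new two-argument Update() is what lets Merge() in the .inline.h
file below collapse to a single call: merging another stats object is
just Update(other.min, other.max) plus an IncrementNullCount(). A
compilable sketch of that shape (an illustrative stand-in, not the
real template):

    #include <algorithm>
    #include <cassert>
    #include <cstdint>

    template <typename T>
    struct StatsSketch {
      bool has_min_max_values = false;
      T min_value{};
      T max_value{};
      int64_t null_count = 0;

      // Widens the tracked range by a [lo, hi] pair.
      void Update(const T& lo, const T& hi) {
        if (!has_min_max_values) {
          has_min_max_values = true;
          min_value = lo;
          max_value = hi;
        } else {
          min_value = std::min(min_value, lo);
          max_value = std::max(max_value, hi);
        }
      }
      // Single-value wrapper used on the append path.
      void Update(const T& v) { Update(v, v); }
      // Merge is Update() over the other object's range plus null carry-over.
      void Merge(const StatsSketch& o) {
        if (o.has_min_max_values) Update(o.min_value, o.max_value);
        null_count += o.null_count;
      }
    };

    int main() {
      StatsSketch<int> page, row_group;
      page.Update(5);
      page.Update(1);
      page.null_count = 2;  // the page also saw two nulls
      row_group.Merge(page);
      assert(row_group.min_value == 1 && row_group.max_value == 5);
      assert(row_group.null_count == 2);
      return 0;
    }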

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6d5cd617/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h
index 48c1b2e..8681436 100644
--- a/be/src/exec/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet-column-stats.inline.h
@@ -24,15 +24,20 @@
 
 namespace impala {
 
+inline void ColumnStatsBase::Reset() {
+  has_min_max_values_ = false;
+  null_count_ = 0;
+}
+
 template <typename T>
-inline void ColumnStats<T>::Update(const T& v) {
-  if (!has_values_) {
-    has_values_ = true;
-    min_value_ = v;
-    max_value_ = v;
+inline void ColumnStats<T>::Update(const T& min_value, const T& max_value) {
+  if (!has_min_max_values_) {
+    has_min_max_values_ = true;
+    min_value_ = min_value;
+    max_value_ = max_value;
   } else {
-    min_value_ = std::min(min_value_, v);
-    max_value_ = std::max(max_value_, v);
+    min_value_ = std::min(min_value_, min_value);
+    max_value_ = std::max(max_value_, max_value);
   }
 }
 
@@ -40,31 +45,27 @@ template <typename T>
 inline void ColumnStats<T>::Merge(const ColumnStatsBase& other) {
   DCHECK(dynamic_cast<const ColumnStats<T>*>(&other));
   const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other);
-  if (!cs->has_values_) return;
-  if (!has_values_) {
-    has_values_ = true;
-    min_value_ = cs->min_value_;
-    max_value_ = cs->max_value_;
-  } else {
-    min_value_ = std::min(min_value_, cs->min_value_);
-    max_value_ = std::max(max_value_, cs->max_value_);
-  }
+  if (cs->has_min_max_values_) Update(cs->min_value_, cs->max_value_);
+  IncrementNullCount(cs->null_count_);
 }
 
 template <typename T>
 inline int64_t ColumnStats<T>::BytesNeeded() const {
-  return BytesNeeded(min_value_) + BytesNeeded(max_value_);
+  return BytesNeeded(min_value_) + BytesNeeded(max_value_)
+      + ParquetPlainEncoder::ByteSize(null_count_);
 }
 
 template <typename T>
 inline void ColumnStats<T>::EncodeToThrift(parquet::Statistics* out) const {
-  DCHECK(has_values_);
-  std::string min_str;
-  EncodePlainValue(min_value_, BytesNeeded(min_value_), &min_str);
-  out->__set_min_value(move(min_str));
-  std::string max_str;
-  EncodePlainValue(max_value_, BytesNeeded(max_value_), &max_str);
-  out->__set_max_value(move(max_str));
+  if (has_min_max_values_) {
+    std::string min_str;
+    EncodePlainValue(min_value_, BytesNeeded(min_value_), &min_str);
+    out->__set_min_value(move(min_str));
+    std::string max_str;
+    EncodePlainValue(max_value_, BytesNeeded(max_value_), &max_str);
+    out->__set_max_value(move(max_str));
+  }
+  out->__set_null_count(null_count_);
 }
 
 template <typename T>
@@ -145,44 +146,21 @@ inline bool ColumnStats<StringValue>::DecodePlainValue(
 }
 
 template <>
-inline void ColumnStats<StringValue>::Update(const StringValue& v) {
-  if (!has_values_) {
-    has_values_ = true;
-    min_value_ = v;
-    min_buffer_.Clear();
-    max_value_ = v;
-    max_buffer_.Clear();
-  } else {
-    if (v < min_value_) {
-      min_value_ = v;
-      min_buffer_.Clear();
-    }
-    if (v > max_value_) {
-      max_value_ = v;
-      max_buffer_.Clear();
-    }
-  }
-}
-
-template <>
-inline void ColumnStats<StringValue>::Merge(const ColumnStatsBase& other) {
-  DCHECK(dynamic_cast<const ColumnStats<StringValue>*>(&other));
-  const ColumnStats<StringValue>* cs =
-      static_cast<const ColumnStats<StringValue>*>(&other);
-  if (!cs->has_values_) return;
-  if (!has_values_) {
-    has_values_ = true;
-    min_value_ = cs->min_value_;
+inline void ColumnStats<StringValue>::Update(
+    const StringValue& min_value, const StringValue& max_value) {
+  if (!has_min_max_values_) {
+    has_min_max_values_ = true;
+    min_value_ = min_value;
     min_buffer_.Clear();
-    max_value_ = cs->max_value_;
+    max_value_ = max_value;
     max_buffer_.Clear();
   } else {
-    if (cs->min_value_ < min_value_) {
-      min_value_ = cs->min_value_;
+    if (min_value < min_value_) {
+      min_value_ = min_value;
       min_buffer_.Clear();
     }
-    if (cs->max_value_ > max_value_) {
-      max_value_ = cs->max_value_;
+    if (max_value > max_value_) {
+      max_value_ = max_value;
       max_buffer_.Clear();
     }
   }
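
Note the asymmetry the new EncodeToThrift() above introduces: min/max
are emitted only when at least one non-null value was seen, while
null_count is always emitted, so an all-null column chunk still gets a
statistics entry. A hypothetical stand-in struct (not the generated
thrift bindings) makes the behavior concrete:

    #include <cassert>
    #include <cstdint>
    #include <string>

    // Stand-in for parquet::Statistics with explicit __isset-style flags.
    struct ThriftStatsSketch {
      std::string min_value, max_value;
      int64_t null_count = 0;
      bool min_max_set = false;
      bool null_count_set = false;
    };

    struct ColumnStatsSketch {
      bool has_min_max_values = false;
      std::string min_value, max_value;
      int64_t null_count = 0;

      void EncodeToThrift(ThriftStatsSketch* out) const {
        if (has_min_max_values) {  // all-null chunks skip min/max ...
          out->min_value = min_value;
          out->max_value = max_value;
          out->min_max_set = true;
        }
        out->null_count = null_count;  // ... but always report null_count
        out->null_count_set = true;
      }
    };

    int main() {
      ColumnStatsSketch all_nulls;
      all_nulls.null_count = 7;
      ThriftStatsSketch out;
      all_nulls.EncodeToThrift(&out);
      assert(!out.min_max_set && out.null_count_set && out.null_count == 7);
      return 0;
    }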

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6d5cd617/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index c19363f..908e50a 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -67,7 +67,7 @@ class TimeStamp():
     return self.timetuple == other_timetuple
 
 
-ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max'])
+ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max', 'null_count'])
 
 # Test a smaller parquet file size as well
# TODO: these tests take a while so we don't want to go through too many sizes but
@@ -319,15 +319,17 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       if stats is None:
         decoded.append(None)
         continue
+      min_value = None
+      max_value = None
 
-      if stats.min_value is None and stats.max_value is None:
-        decoded.append(None)
-        continue
+      if stats.min_value is not None and stats.max_value is not None:
+        min_value = decode_stats_value(schema, stats.min_value)
+        max_value = decode_stats_value(schema, stats.max_value)
+
+      null_count = stats.null_count
+      assert null_count is not None
 
-      assert stats.min_value is not None and stats.max_value is not None
-      min_value = decode_stats_value(schema, stats.min_value)
-      max_value = decode_stats_value(schema, stats.max_value)
-      decoded.append(ColumnStats(schema.name, min_value, max_value))
+      decoded.append(ColumnStats(schema.name, min_value, max_value, null_count))
 
     assert len(decoded) == len(schemas)
     return decoded
@@ -367,7 +369,7 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
 
     return row_group_stats
 
-  def _validate_min_max_stats(self, hdfs_path, expected_values, skip_col_idxs = None):
+  def _validate_parquet_stats(self, hdfs_path, expected_values, skip_col_idxs = None):
     """Validates that 'hdfs_path' contains exactly one parquet file and that 
the rowgroup
     statistics in that file match the values in 'expected_values'. Columns 
indexed by
     'skip_col_idx' are excluded from the verification of the expected values.
@@ -408,7 +410,7 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         qualified_table_name, source_table)
     vector.get_value('exec_option')['num_nodes'] = 1
     self.execute_query(query, vector.get_value('exec_option'))
-    self._validate_min_max_stats(hdfs_path, expected_values)
+    self._validate_parquet_stats(hdfs_path, expected_values)
 
   def test_write_statistics_alltypes(self, vector, unique_database):
     """Test that writing a parquet file populates the rowgroup statistics with 
the correct
@@ -416,20 +418,20 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     """
     # Expected values for functional.alltypes
     expected_min_max_values = [
-        ColumnStats('id', 0, 7299),
-        ColumnStats('bool_col', False, True),
-        ColumnStats('tinyint_col', 0, 9),
-        ColumnStats('smallint_col', 0, 9),
-        ColumnStats('int_col', 0, 9),
-        ColumnStats('bigint_col', 0, 90),
-        ColumnStats('float_col', 0, RoundFloat(9.9, 1)),
-        ColumnStats('double_col', 0, RoundFloat(90.9, 1)),
-        ColumnStats('date_string_col', '01/01/09', '12/31/10'),
-        ColumnStats('string_col', '0', '9'),
+        ColumnStats('id', 0, 7299, 0),
+        ColumnStats('bool_col', False, True, 0),
+        ColumnStats('tinyint_col', 0, 9, 0),
+        ColumnStats('smallint_col', 0, 9, 0),
+        ColumnStats('int_col', 0, 9, 0),
+        ColumnStats('bigint_col', 0, 90, 0),
+        ColumnStats('float_col', 0, RoundFloat(9.9, 1), 0),
+        ColumnStats('double_col', 0, RoundFloat(90.9, 1), 0),
+        ColumnStats('date_string_col', '01/01/09', '12/31/10', 0),
+        ColumnStats('string_col', '0', '9', 0),
         ColumnStats('timestamp_col', TimeStamp('2009-01-01 00:00:00.0'),
-                    TimeStamp('2010-12-31 05:09:13.860000')),
-        ColumnStats('year', 2009, 2010),
-        ColumnStats('month', 1, 12),
+                    TimeStamp('2010-12-31 05:09:13.860000'), 0),
+        ColumnStats('year', 2009, 2010, 0),
+        ColumnStats('month', 1, 12, 0),
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes",
@@ -441,12 +443,12 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     """
     # Expected values for functional.decimal_tbl
     expected_min_max_values = [
-      ColumnStats('d1', 1234, 132842),
-      ColumnStats('d2', 111, 2222),
-      ColumnStats('d3', Decimal('1.23456789'), Decimal('12345.6789')),
-      ColumnStats('d4', Decimal('0.123456789'), Decimal('0.123456789')),
-      ColumnStats('d5', Decimal('0.1'), Decimal('12345.789')),
-      ColumnStats('d6', 1, 1)
+      ColumnStats('d1', 1234, 132842, 0),
+      ColumnStats('d2', 111, 2222, 0),
+      ColumnStats('d3', Decimal('1.23456789'), Decimal('12345.6789'), 0),
+      ColumnStats('d4', Decimal('0.123456789'), Decimal('0.123456789'), 0),
+      ColumnStats('d5', Decimal('0.1'), Decimal('12345.789'), 0),
+      ColumnStats('d6', 1, 1, 0)
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl",
@@ -458,31 +460,32 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     """
     # Expected values for tpch_parquet.customer
     expected_min_max_values = [
-        ColumnStats('c_custkey', 1, 150000),
-        ColumnStats('c_name', 'Customer#000000001', 'Customer#000150000'),
-        ColumnStats('c_address', '   2uZwVhQvwA', 'zzxGktzXTMKS1BxZlgQ9nqQ'),
-        ColumnStats('c_nationkey', 0, 24),
-        ColumnStats('c_phone', '10-100-106-1617', '34-999-618-6881'),
-        ColumnStats('c_acctbal', Decimal('-999.99'), Decimal('9999.99')),
-        ColumnStats('c_mktsegment', 'AUTOMOBILE', 'MACHINERY'),
+        ColumnStats('c_custkey', 1, 150000, 0),
+        ColumnStats('c_name', 'Customer#000000001', 'Customer#000150000', 0),
+        ColumnStats('c_address', '   2uZwVhQvwA', 'zzxGktzXTMKS1BxZlgQ9nqQ', 0),
+        ColumnStats('c_nationkey', 0, 24, 0),
+        ColumnStats('c_phone', '10-100-106-1617', '34-999-618-6881', 0),
+        ColumnStats('c_acctbal', Decimal('-999.99'), Decimal('9999.99'), 0),
+        ColumnStats('c_mktsegment', 'AUTOMOBILE', 'MACHINERY', 0),
         ColumnStats('c_comment', ' Tiresias according to the slyly blithe instructions '
                     'detect quickly at the slyly express courts. express dinos wake ',
-                    'zzle. blithely regular instructions cajol'),
+                    'zzle. blithely regular instructions cajol', 0),
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer",
                                       expected_min_max_values)
 
   def test_write_statistics_null(self, vector, unique_database):
-    """Test that we don't write min/max statistics for null columns."""
+    """Test that we don't write min/max statistics for null columns. Ensure 
null_count
+    is set for columns with null values."""
     expected_min_max_values = [
-        ColumnStats('a', 'a', 'a'),
-        ColumnStats('b', '', ''),
-        None,
-        None,
-        None,
-        ColumnStats('f', 'a\x00b', 'a\x00b'),
-        ColumnStats('g', '\x00', '\x00')
+        ColumnStats('a', 'a', 'a', 0),
+        ColumnStats('b', '', '', 0),
+        ColumnStats('c', None, None, 1),
+        ColumnStats('d', None, None, 1),
+        ColumnStats('e', None, None, 1),
+        ColumnStats('f', 'a\x00b', 'a\x00b', 0),
+        ColumnStats('g', '\x00', '\x00', 0)
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable",
@@ -503,9 +506,9 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         (cast("xy" as char(3)), "abc banana", "dolor dis 
amet")""".format(qualified_table_name)
     self.execute_query(insert_stmt)
     expected_min_max_values = [
-        ColumnStats('c3', 'abc', 'xy'),
-        ColumnStats('vc', 'abc banana', 'ghj xyz'),
-        ColumnStats('st', 'abc xyz', 'lorem ipsum')
+        ColumnStats('c3', 'abc', 'xy', 0),
+        ColumnStats('vc', 'abc banana', 'ghj xyz', 0),
+        ColumnStats('st', 'abc xyz', 'lorem ipsum', 0)
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
@@ -528,11 +531,11 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     self.execute_query(create_view_stmt)
 
     expected_min_max_values = [
-        ColumnStats('id', -7299, 7298),
-        ColumnStats('int_col', -9, 8),
-        ColumnStats('bigint_col', -90, 80),
-        ColumnStats('float_col', RoundFloat(-9.9, 1), RoundFloat(8.8, 1)),
-        ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1)),
+        ColumnStats('id', -7299, 7298, 0),
+        ColumnStats('int_col', -9, 8, 0),
+        ColumnStats('bigint_col', -90, 80, 0),
+        ColumnStats('float_col', RoundFloat(-9.9, 1), RoundFloat(8.8, 1), 0),
+        ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1), 0),
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, qualified_view_name,
@@ -586,9 +589,26 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     self.execute_query(insert_stmt)
 
     expected_min_max_values = [
-        ColumnStats('f', float('-inf'), float('inf')),
-        ColumnStats('d', float('-inf'), float('inf')),
+        ColumnStats('f', float('-inf'), float('inf'), 0),
+        ColumnStats('d', float('-inf'), float('inf'), 0),
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
                                       expected_min_max_values)
+
+  def test_write_null_count_statistics(self, vector, unique_database):
+    """Test that writing a parquet file populates the rowgroup statistics with 
the correct
+    null_count. This test ensures that the null_count is correct for a table 
with multiple
+    null values."""
+
+    # Expected values for functional_parquet.zipcode_incomes
+    expected_min_max_values = [
+      ColumnStats('id', '8600000US00601', '8600000US999XX', 0),
+      ColumnStats('zip', '00601', '999XX', 0),
+      ColumnStats('description1', '\"00601 5-Digit ZCTA', '\"999XX 5-Digit ZCTA', 0),
+      ColumnStats('description2', ' 006 3-Digit ZCTA\"', ' 999 3-Digit ZCTA\"', 0),
+      ColumnStats('income', 0, 189570, 29),
+    ]
+
+    self._ctas_table_and_verify_stats(vector, unique_database,
+      "functional_parquet.zipcode_incomes", expected_min_max_values)
