Repository: incubator-impala
Updated Branches:
  refs/heads/master 567814b4c -> 3480c892c
IMPALA-5061: Populate null_count in parquet::statistics

The null_count in the statistics field is updated each time the Parquet
table writer encounters a null value. The null_count is written to the
Parquet header if the row group contains one or more null values.

Testing: Modified the existing end-to-end tests in test_insert_parquet.py
to make sure each Parquet header has the appropriate null_count. Verified
the correctness of the nulltable test and added a test which writes the
functional_parquet.zipcode_incomes table to a Parquet file and ensures
that the expected null_count is populated.

Change-Id: I4c49a63af84c2234f0633be63206cb52eb7e8ebb
Reviewed-on: http://gerrit.cloudera.org:8080/7058
Reviewed-by: Lars Volker <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6d5cd617
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6d5cd617
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6d5cd617

Branch: refs/heads/master
Commit: 6d5cd6174ee4e90a9d5dae5bedf02275d3d814a8
Parents: 567814b
Author: poojanilangekar <[email protected]>
Authored: Fri Jun 2 10:29:13 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jun 15 22:00:54 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc  |  14 +--
 be/src/exec/parquet-column-stats.h        |  24 +++--
 be/src/exec/parquet-column-stats.inline.h |  92 +++++++----------
 tests/query_test/test_insert_parquet.py   | 130 ++++++++++++++-----------
 4 files changed, 134 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
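
The statistics lifecycle described above is compact enough to model outside
the C++ sources. The following standalone Python sketch is illustrative
only: the class and member names are invented here, and the null increment
is folded into update() for brevity, whereas in the patch below the writer
calls IncrementNullCount() itself.

    # Toy model: min/max are tracked only once a non-null value arrives,
    # while null_count accumulates independently and survives merges from
    # page statistics into row-group statistics.
    class ColumnStatsModel(object):
        def __init__(self):
            self.reset()

        def reset(self):
            self.has_min_max_values = False
            self.min_value = None
            self.max_value = None
            self.null_count = 0

        def update(self, value):
            if value is None:
                self.null_count += 1  # nulls never touch min/max
                return
            if not self.has_min_max_values:
                self.has_min_max_values = True
                self.min_value = self.max_value = value
            else:
                self.min_value = min(self.min_value, value)
                self.max_value = max(self.max_value, value)

        def merge(self, other):
            if other.has_min_max_values:
                self.update(other.min_value)
                self.update(other.max_value)
            self.null_count += other.null_count

    page_stats = ColumnStatsModel()
    for v in [3, None, 7, None]:
        page_stats.update(v)

    row_group_stats = ColumnStatsModel()
    row_group_stats.merge(page_stats)  # row-group stats absorb page stats
    assert (row_group_stats.min_value, row_group_stats.max_value,
            row_group_stats.null_count) == (3, 7, 2)
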

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6d5cd617/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 72c86b2..a3aa4d3 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -141,8 +141,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // 'meta_data'.
   void EncodeRowGroupStats(ColumnMetaData* meta_data) {
     DCHECK(row_group_stats_base_ != nullptr);
-    if (row_group_stats_base_->has_values()
-        && row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
+    if (row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
       row_group_stats_base_->EncodeToThrift(&meta_data->statistics);
       meta_data->__isset.statistics = true;
     }
@@ -466,8 +465,12 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
   // TODO: Have a clearer set of state transitions here, to make it easier to see that
   // this won't loop forever.
   while (true) {
-    // Nulls don't get encoded.
-    if (value == nullptr) break;
+    // Nulls don't get encoded. Increment the null count of the parquet statistics.
+    if (value == nullptr) {
+      DCHECK(page_stats_base_ != nullptr);
+      page_stats_base_->IncrementNullCount(1);
+      break;
+    }
 
     int64_t bytes_needed = 0;
     if (ProcessValue(value, &bytes_needed)) {
@@ -679,8 +682,7 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
 
   // Build page statistics and add them to the header.
   DCHECK(page_stats_base_ != nullptr);
-  if (page_stats_base_->has_values()
-      && page_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
+  if (page_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
     page_stats_base_->EncodeToThrift(&header.data_page_header.statistics);
     header.data_page_header.__isset.statistics = true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6d5cd617/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
index 53fe548..c068312 100644
--- a/be/src/exec/parquet-column-stats.h
+++ b/be/src/exec/parquet-column-stats.h
@@ -62,7 +62,7 @@ class ColumnStatsBase {
   /// the minimum or maximum value.
   enum class StatsField { MIN, MAX };
 
-  ColumnStatsBase() : has_values_(false) {}
+  ColumnStatsBase() : has_min_max_values_(false), null_count_(0) {}
   virtual ~ColumnStatsBase() {}
 
   /// Decodes the parquet::Statistics from 'col_chunk' and writes the value selected by
@@ -91,17 +91,22 @@ class ColumnStatsBase {
   virtual void EncodeToThrift(parquet::Statistics* out) const = 0;
 
   /// Resets the state of this object.
-  void Reset() { has_values_ = false; }
+  void Reset();
 
-  bool has_values() const { return has_values_; }
+  /// Update the statistics by incrementing the null_count. It is called each time a null
+  /// value is appended to the column or the statistics are merged.
+  void IncrementNullCount(int64_t count) { null_count_ += count; }
 
  protected:
   // Copies the memory of 'value' into 'buffer' and make 'value' point to 'buffer'.
   // 'buffer' is reset before making the copy.
   static void CopyToBuffer(StringBuffer* buffer, StringValue* value);
 
-  /// Stores whether the current object has been initialized with a set of values.
-  bool has_values_;
+  /// Stores whether the min and max values of the current object have been initialized.
+  bool has_min_max_values_;
+
+  // Number of null values since the last call to Reset().
+  int64_t null_count_;
 
  private:
   /// Returns true if we support reading statistics stored in the fields 'min_value' and
@@ -149,10 +154,13 @@ class ColumnStats : public ColumnStatsBase {
       min_buffer_(mem_pool),
       max_buffer_(mem_pool) {}
 
-  /// Updates the statistics based on the value 'v'. If necessary, initializes the
-  /// statistics. It may keep a reference to 'v' until
+  /// Updates the statistics based on the values min_value and max_value. If necessary,
+  /// initializes the statistics. It may keep a reference to either value until
   /// MaterializeStringValuesToInternalBuffers() gets called.
-  void Update(const T& v);
+  void Update(const T& min_value, const T& max_value);
+
+  /// Wrapper to call the Update function which takes in the min_value and max_value.
+  void Update(const T& v) { Update(v, v); }
 
   virtual void Merge(const ColumnStatsBase& other) override;
   virtual void MaterializeStringValuesToInternalBuffers() override {}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6d5cd617/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h
index 48c1b2e..8681436 100644
--- a/be/src/exec/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet-column-stats.inline.h
@@ -24,15 +24,20 @@
 
 namespace impala {
 
+inline void ColumnStatsBase::Reset() {
+  has_min_max_values_ = false;
+  null_count_ = 0;
+}
+
 template <typename T>
-inline void ColumnStats<T>::Update(const T& v) {
-  if (!has_values_) {
-    has_values_ = true;
-    min_value_ = v;
-    max_value_ = v;
+inline void ColumnStats<T>::Update(const T& min_value, const T& max_value) {
+  if (!has_min_max_values_) {
+    has_min_max_values_ = true;
+    min_value_ = min_value;
+    max_value_ = max_value;
   } else {
-    min_value_ = std::min(min_value_, v);
-    max_value_ = std::max(max_value_, v);
+    min_value_ = std::min(min_value_, min_value);
+    max_value_ = std::max(max_value_, max_value);
   }
 }
 
@@ -40,31 +45,27 @@ template <typename T>
 inline void ColumnStats<T>::Merge(const ColumnStatsBase& other) {
   DCHECK(dynamic_cast<const ColumnStats<T>*>(&other));
   const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other);
-  if (!cs->has_values_) return;
-  if (!has_values_) {
-    has_values_ = true;
-    min_value_ = cs->min_value_;
-    max_value_ = cs->max_value_;
-  } else {
-    min_value_ = std::min(min_value_, cs->min_value_);
-    max_value_ = std::max(max_value_, cs->max_value_);
-  }
+  if (cs->has_min_max_values_) Update(cs->min_value_, cs->max_value_);
+  IncrementNullCount(cs->null_count_);
 }
 
 template <typename T>
 inline int64_t ColumnStats<T>::BytesNeeded() const {
-  return BytesNeeded(min_value_) + BytesNeeded(max_value_);
+  return BytesNeeded(min_value_) + BytesNeeded(max_value_)
+      + ParquetPlainEncoder::ByteSize(null_count_);
 }
 
 template <typename T>
 inline void ColumnStats<T>::EncodeToThrift(parquet::Statistics* out) const {
-  DCHECK(has_values_);
-  std::string min_str;
-  EncodePlainValue(min_value_, BytesNeeded(min_value_), &min_str);
-  out->__set_min_value(move(min_str));
-  std::string max_str;
-  EncodePlainValue(max_value_, BytesNeeded(max_value_), &max_str);
-  out->__set_max_value(move(max_str));
+  if (has_min_max_values_) {
+    std::string min_str;
+    EncodePlainValue(min_value_, BytesNeeded(min_value_), &min_str);
+    out->__set_min_value(move(min_str));
+    std::string max_str;
+    EncodePlainValue(max_value_, BytesNeeded(max_value_), &max_str);
+    out->__set_max_value(move(max_str));
+  }
+  out->__set_null_count(null_count_);
 }
 
 template <typename T>
@@ -145,44 +146,21 @@ inline bool ColumnStats<StringValue>::DecodePlainValue(
 }
 
 template <>
-inline void ColumnStats<StringValue>::Update(const StringValue& v) {
-  if (!has_values_) {
-    has_values_ = true;
-    min_value_ = v;
-    min_buffer_.Clear();
-    max_value_ = v;
-    max_buffer_.Clear();
-  } else {
-    if (v < min_value_) {
-      min_value_ = v;
-      min_buffer_.Clear();
-    }
-    if (v > max_value_) {
-      max_value_ = v;
-      max_buffer_.Clear();
-    }
-  }
-}
-
-template <>
-inline void ColumnStats<StringValue>::Merge(const ColumnStatsBase& other) {
-  DCHECK(dynamic_cast<const ColumnStats<StringValue>*>(&other));
-  const ColumnStats<StringValue>* cs =
-      static_cast<const ColumnStats<StringValue>*>(&other);
-  if (!cs->has_values_) return;
-  if (!has_values_) {
-    has_values_ = true;
-    min_value_ = cs->min_value_;
+inline void ColumnStats<StringValue>::Update(
+    const StringValue& min_value, const StringValue& max_value) {
+  if (!has_min_max_values_) {
+    has_min_max_values_ = true;
+    min_value_ = min_value;
     min_buffer_.Clear();
-    max_value_ = cs->max_value_;
+    max_value_ = max_value;
     max_buffer_.Clear();
   } else {
-    if (cs->min_value_ < min_value_) {
-      min_value_ = cs->min_value_;
+    if (min_value < min_value_) {
+      min_value_ = min_value;
       min_buffer_.Clear();
     }
-    if (cs->max_value_ > max_value_) {
-      max_value_ = cs->max_value_;
+    if (max_value > max_value_) {
+      max_value_ = max_value;
       max_buffer_.Clear();
     }
   }
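
To see the effect of these writer changes on disk, the per-column
null_count can be inspected independently of Impala's test utilities. A
minimal sketch using PyArrow (assuming the pyarrow package is available;
the file path is a placeholder, not part of this change):

    # Print the null_count stored for every column chunk of a Parquet file.
    import pyarrow.parquet as pq

    meta = pq.ParquetFile("/tmp/example.parquet").metadata
    for rg in range(meta.num_row_groups):
        for col in range(meta.num_columns):
            stats = meta.row_group(rg).column(col).statistics
            if stats is not None and stats.has_null_count:
                print(rg, meta.schema.column(col).name, stats.null_count)
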

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6d5cd617/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index c19363f..908e50a 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -67,7 +67,7 @@ class TimeStamp():
     return self.timetuple == other_timetuple
 
 
-ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max'])
+ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max', 'null_count'])
 
 # Test a smaller parquet file size as well
 # TODO: these tests take a while so we don't want to go through too many sizes but
@@ -319,15 +319,17 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       if stats is None:
         decoded.append(None)
         continue
+      min_value = None
+      max_value = None
 
-      if stats.min_value is None and stats.max_value is None:
-        decoded.append(None)
-        continue
+      if stats.min_value is not None and stats.max_value is not None:
+        min_value = decode_stats_value(schema, stats.min_value)
+        max_value = decode_stats_value(schema, stats.max_value)
+
+      null_count = stats.null_count
+      assert null_count is not None
 
-      assert stats.min_value is not None and stats.max_value is not None
-      min_value = decode_stats_value(schema, stats.min_value)
-      max_value = decode_stats_value(schema, stats.max_value)
-      decoded.append(ColumnStats(schema.name, min_value, max_value))
+      decoded.append(ColumnStats(schema.name, min_value, max_value, null_count))
 
     assert len(decoded) == len(schemas)
     return decoded
@@ -367,7 +369,7 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
 
     return row_group_stats
 
-  def _validate_min_max_stats(self, hdfs_path, expected_values, skip_col_idxs = None):
+  def _validate_parquet_stats(self, hdfs_path, expected_values, skip_col_idxs = None):
     """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
     statistics in that file match the values in 'expected_values'. Columns indexed by
     'skip_col_idx' are excluded from the verification of the expected values.
@@ -408,7 +410,7 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
                                              qualified_table_name, source_table)
     vector.get_value('exec_option')['num_nodes'] = 1
     self.execute_query(query, vector.get_value('exec_option'))
-    self._validate_min_max_stats(hdfs_path, expected_values)
+    self._validate_parquet_stats(hdfs_path, expected_values)
 
   def test_write_statistics_alltypes(self, vector, unique_database):
     """Test that writing a parquet file populates the rowgroup statistics with the correct
@@ -416,20 +418,20 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     """
     # Expected values for functional.alltypes
     expected_min_max_values = [
-      ColumnStats('id', 0, 7299),
-      ColumnStats('bool_col', False, True),
-      ColumnStats('tinyint_col', 0, 9),
-      ColumnStats('smallint_col', 0, 9),
-      ColumnStats('int_col', 0, 9),
-      ColumnStats('bigint_col', 0, 90),
-      ColumnStats('float_col', 0, RoundFloat(9.9, 1)),
-      ColumnStats('double_col', 0, RoundFloat(90.9, 1)),
-      ColumnStats('date_string_col', '01/01/09', '12/31/10'),
-      ColumnStats('string_col', '0', '9'),
+      ColumnStats('id', 0, 7299, 0),
+      ColumnStats('bool_col', False, True, 0),
+      ColumnStats('tinyint_col', 0, 9, 0),
+      ColumnStats('smallint_col', 0, 9, 0),
+      ColumnStats('int_col', 0, 9, 0),
+      ColumnStats('bigint_col', 0, 90, 0),
+      ColumnStats('float_col', 0, RoundFloat(9.9, 1), 0),
+      ColumnStats('double_col', 0, RoundFloat(90.9, 1), 0),
+      ColumnStats('date_string_col', '01/01/09', '12/31/10', 0),
+      ColumnStats('string_col', '0', '9', 0),
       ColumnStats('timestamp_col', TimeStamp('2009-01-01 00:00:00.0'),
-                  TimeStamp('2010-12-31 05:09:13.860000')),
-      ColumnStats('year', 2009, 2010),
-      ColumnStats('month', 1, 12),
+                  TimeStamp('2010-12-31 05:09:13.860000'), 0),
+      ColumnStats('year', 2009, 2010, 0),
+      ColumnStats('month', 1, 12, 0),
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes",
@@ -441,12 +443,12 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     """
     # Expected values for functional.decimal_tbl
     expected_min_max_values = [
-      ColumnStats('d1', 1234, 132842),
-      ColumnStats('d2', 111, 2222),
-      ColumnStats('d3', Decimal('1.23456789'), Decimal('12345.6789')),
-      ColumnStats('d4', Decimal('0.123456789'), Decimal('0.123456789')),
-      ColumnStats('d5', Decimal('0.1'), Decimal('12345.789')),
-      ColumnStats('d6', 1, 1)
+      ColumnStats('d1', 1234, 132842, 0),
+      ColumnStats('d2', 111, 2222, 0),
+      ColumnStats('d3', Decimal('1.23456789'), Decimal('12345.6789'), 0),
+      ColumnStats('d4', Decimal('0.123456789'), Decimal('0.123456789'), 0),
+      ColumnStats('d5', Decimal('0.1'), Decimal('12345.789'), 0),
+      ColumnStats('d6', 1, 1, 0)
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl",
@@ -458,31 +460,32 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     """
    # Expected values for tpch_parquet.customer
     expected_min_max_values = [
-      ColumnStats('c_custkey', 1, 150000),
-      ColumnStats('c_name', 'Customer#000000001', 'Customer#000150000'),
-      ColumnStats('c_address', ' 2uZwVhQvwA', 'zzxGktzXTMKS1BxZlgQ9nqQ'),
-      ColumnStats('c_nationkey', 0, 24),
-      ColumnStats('c_phone', '10-100-106-1617', '34-999-618-6881'),
-      ColumnStats('c_acctbal', Decimal('-999.99'), Decimal('9999.99')),
-      ColumnStats('c_mktsegment', 'AUTOMOBILE', 'MACHINERY'),
+      ColumnStats('c_custkey', 1, 150000, 0),
+      ColumnStats('c_name', 'Customer#000000001', 'Customer#000150000', 0),
+      ColumnStats('c_address', ' 2uZwVhQvwA', 'zzxGktzXTMKS1BxZlgQ9nqQ', 0),
+      ColumnStats('c_nationkey', 0, 24, 0),
+      ColumnStats('c_phone', '10-100-106-1617',
+                  '34-999-618-6881', 0),
+      ColumnStats('c_acctbal', Decimal('-999.99'), Decimal('9999.99'), 0),
+      ColumnStats('c_mktsegment', 'AUTOMOBILE', 'MACHINERY', 0),
       ColumnStats('c_comment', ' Tiresias according to the slyly blithe instructions '
                   'detect quickly at the slyly express courts. express dinos wake ',
-                  'zzle. blithely regular instructions cajol'),
+                  'zzle. blithely regular instructions cajol', 0),
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer",
                                       expected_min_max_values)
 
   def test_write_statistics_null(self, vector, unique_database):
-    """Test that we don't write min/max statistics for null columns."""
+    """Test that we don't write min/max statistics for null columns. Ensure null_count
+    is set for columns with null values."""
     expected_min_max_values = [
-      ColumnStats('a', 'a', 'a'),
-      ColumnStats('b', '', ''),
-      None,
-      None,
-      None,
-      ColumnStats('f', 'a\x00b', 'a\x00b'),
-      ColumnStats('g', '\x00', '\x00')
+      ColumnStats('a', 'a', 'a', 0),
+      ColumnStats('b', '', '', 0),
+      ColumnStats('c', None, None, 1),
+      ColumnStats('d', None, None, 1),
+      ColumnStats('e', None, None, 1),
+      ColumnStats('f', 'a\x00b', 'a\x00b', 0),
+      ColumnStats('g', '\x00', '\x00', 0)
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable",
@@ -503,9 +506,9 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         (cast("xy" as char(3)), "abc banana", "dolor dis amet")""".format(qualified_table_name)
     self.execute_query(insert_stmt)
     expected_min_max_values = [
-      ColumnStats('c3', 'abc', 'xy'),
-      ColumnStats('vc', 'abc banana', 'ghj xyz'),
-      ColumnStats('st', 'abc xyz', 'lorem ipsum')
+      ColumnStats('c3', 'abc', 'xy', 0),
+      ColumnStats('vc', 'abc banana', 'ghj xyz', 0),
+      ColumnStats('st', 'abc xyz', 'lorem ipsum', 0)
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
@@ -528,11 +531,11 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     self.execute_query(create_view_stmt)
 
     expected_min_max_values = [
-      ColumnStats('id', -7299, 7298),
-      ColumnStats('int_col', -9, 8),
-      ColumnStats('bigint_col', -90, 80),
-      ColumnStats('float_col', RoundFloat(-9.9, 1), RoundFloat(8.8, 1)),
-      ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1)),
+      ColumnStats('id', -7299, 7298, 0),
+      ColumnStats('int_col', -9, 8, 0),
+      ColumnStats('bigint_col', -90, 80, 0),
+      ColumnStats('float_col', RoundFloat(-9.9, 1), RoundFloat(8.8, 1), 0),
+      ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1), 0),
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, qualified_view_name,
@@ -586,9 +589,26 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     self.execute_query(insert_stmt)
 
     expected_min_max_values = [
-      ColumnStats('f', float('-inf'), float('inf')),
-      ColumnStats('d', float('-inf'), float('inf')),
+      ColumnStats('f', float('-inf'), float('inf'), 0),
+      ColumnStats('d', float('-inf'), float('inf'), 0),
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
                                       expected_min_max_values)
+
+  def test_write_null_count_statistics(self, vector, unique_database):
+    """Test that writing a parquet file populates the rowgroup statistics with the correct
+    null_count. This test ensures that the null_count is correct for a table with multiple
+    null values."""
+
+    # Expected values for functional_parquet.zipcode_incomes
+    expected_min_max_values = [
+      ColumnStats('id', '8600000US00601', '8600000US999XX', 0),
+      ColumnStats('zip', '00601', '999XX', 0),
+      ColumnStats('description1', '\"00601 5-Digit ZCTA', '\"999XX 5-Digit ZCTA', 0),
+      ColumnStats('description2', ' 006 3-Digit ZCTA\"', ' 999 3-Digit ZCTA\"', 0),
+      ColumnStats('income', 0, 189570, 29),
+    ]
+
+    self._ctas_table_and_verify_stats(vector, unique_database,
+        "functional_parquet.zipcode_incomes", expected_min_max_values)
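
As a sanity check outside the test harness, the expectation above for the
'income' column (29 nulls) can be verified directly against a Parquet file
written from functional_parquet.zipcode_incomes. Again a PyArrow sketch
with a placeholder file path, not part of this change:

    # Spot-check the zipcode_incomes expectation from the new test.
    import pyarrow.parquet as pq

    expected_null_counts = {'income': 29}  # from test_write_null_count_statistics
    meta = pq.ParquetFile("/tmp/zipcode_incomes.parquet").metadata
    row_group = meta.row_group(0)
    for i in range(row_group.num_columns):
        name = meta.schema.column(i).name
        stats = row_group.column(i).statistics
        if name in expected_null_counts and stats is not None:
            assert stats.null_count == expected_null_counts[name]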
