Repository: impala
Updated Branches:
  refs/heads/master 971cf179f -> 6ce7ba295


IMPALA-6373: Allow primitive type widening on parquet tables

This patch implements support for primitive type widening on parquet
tables. Only widening conversions that cannot lose precision are
supported:
- tinyint (INT32) -> smallint (INT32), int (INT32), bigint (INT64),
                     double (DOUBLE)
- smallint (INT32) -> int (INT32), bigint (INT64), double (DOUBLE)
- int (INT32) -> bigint (INT64), double (DOUBLE)
- float (FLOAT) -> double (DOUBLE)

Testing:
- Added BE test
- Added E2E test
- Ran core tests

Change-Id: If93394b035c64cf6fc5f37b54d29c034cc1f86e4
Reviewed-on: http://gerrit.cloudera.org:8080/11268
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9934b473
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9934b473
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9934b473

Branch: refs/heads/master
Commit: 9934b473b7239b1077dad1f0d308e168b803db6d
Parents: 971cf17
Author: Fredy Wijaya <[email protected]>
Authored: Fri Aug 17 16:24:03 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Aug 23 15:55:53 2018 +0000

----------------------------------------------------------------------
 be/src/exec/parquet-column-readers.cc           |  28 ++++++++++--
 be/src/exec/parquet-column-readers.h            |   7 +++
 be/src/exec/parquet-common.h                    |  28 ++++++++++++
 be/src/exec/parquet-metadata-utils.cc           |   5 ++-
 be/src/exec/parquet-plain-test.cc               |  43 +++++++++++++++++++
 testdata/data/README                            |  14 ++++++
 testdata/data/primitive_type_widening.parquet   | Bin 0 -> 2711 bytes
 .../QueryTest/parquet-type-widening.test        |   9 ++++
 tests/query_test/test_scanners.py               |  11 +++++
 9 files changed, 139 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc 
b/be/src/exec/parquet-column-readers.cc
index 2cb483e..6d8eddd 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -1573,16 +1573,36 @@ ParquetColumnReader* ParquetColumnReader::Create(const 
SchemaNode& node,
             slot_desc);
         break;
       case TYPE_BIGINT:
-        reader = new ScalarColumnReader<int64_t, parquet::Type::INT64, 
true>(parent, node,
-            slot_desc);
+        switch (node.element->type) {
+          case parquet::Type::INT32:
+            reader = new ScalarColumnReader<int64_t, parquet::Type::INT32, 
true>(parent,
+                node, slot_desc);
+            break;
+          default:
+            reader = new ScalarColumnReader<int64_t, parquet::Type::INT64, 
true>(parent,
+                node, slot_desc);
+            break;
+        }
         break;
       case TYPE_FLOAT:
         reader = new ScalarColumnReader<float, parquet::Type::FLOAT, 
true>(parent, node,
             slot_desc);
         break;
       case TYPE_DOUBLE:
-        reader = new ScalarColumnReader<double, parquet::Type::DOUBLE, 
true>(parent, node,
-            slot_desc);
+        switch (node.element->type) {
+          case parquet::Type::INT32:
+            reader = new ScalarColumnReader<double , parquet::Type::INT32, 
true>(parent,
+                node, slot_desc);
+            break;
+          case parquet::Type::FLOAT:
+            reader = new ScalarColumnReader<double, parquet::Type::FLOAT, 
true>(parent,
+                node, slot_desc);
+            break;
+          default:
+            reader = new ScalarColumnReader<double, parquet::Type::DOUBLE, 
true>(parent,
+                node, slot_desc);
+            break;
+        }
         break;
       case TYPE_TIMESTAMP:
         reader = new ScalarColumnReader<TimestampValue, parquet::Type::INT96, 
true>(

http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h 
b/be/src/exec/parquet-column-readers.h
index 022a868..790bde4 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -145,6 +145,13 @@ class ParquetColumnReader {
   /// false if it reads one value per item).  The reader is added to the 
runtime state's
   /// object pool. Does not create child readers for collection readers; these 
must be
   /// added by the caller.
+  ///
+  /// It supports the following primitive type widening that does not have any 
loss of
+  /// precision.
+  /// - tinyint (INT32) -> smallint (INT32), int (INT32), bigint (INT64), 
double (DOUBLE)
+  /// - smallint (INT32) -> int (INT32), bigint (INT64), double (DOUBLE)
+  /// - int (INT32) -> bigint (INT64), double (DOUBLE)
+  /// - float (FLOAT) -> double (DOUBLE)
   static ParquetColumnReader* Create(const SchemaNode& node, bool 
is_collection_field,
       const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/be/src/exec/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h
index f3add14..24aafae 100644
--- a/be/src/exec/parquet-common.h
+++ b/be/src/exec/parquet-common.h
@@ -249,6 +249,34 @@ inline int ParquetPlainEncoder::ByteSize(const 
TimestampValue& v) {
   return 12;
 }
 
+template <typename From, typename To>
+inline int DecodeWithConversion(const uint8_t* buffer, const uint8_t* 
buffer_end, To* v) {
+  int byte_size = sizeof(From);
+  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
+  From dest;
+  memcpy(&dest, buffer, byte_size);
+  *v = dest;
+  return byte_size;
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
int64_t* v) {
+  return DecodeWithConversion<int32_t, int64_t>(buffer, buffer_end, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<double, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
double* v) {
+  return DecodeWithConversion<int32_t, double>(buffer, buffer_end, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<double, parquet::Type::FLOAT>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
double* v) {
+  return DecodeWithConversion<float, double>(buffer, buffer_end, v);
+}
+
 template <>
 inline int ParquetPlainEncoder::Decode<int8_t, parquet::Type::INT32>(
     const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
int8_t* v) {

http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/be/src/exec/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.cc 
b/be/src/exec/parquet-metadata-utils.cc
index d199c6e..26dea5f 100644
--- a/be/src/exec/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet-metadata-utils.cc
@@ -49,9 +49,10 @@ const map<PrimitiveType, set<parquet::Type::type>> 
SUPPORTED_PHYSICAL_TYPES = {
     {PrimitiveType::TYPE_TINYINT, {parquet::Type::INT32}},
     {PrimitiveType::TYPE_SMALLINT, {parquet::Type::INT32}},
     {PrimitiveType::TYPE_INT, {parquet::Type::INT32}},
-    {PrimitiveType::TYPE_BIGINT, {parquet::Type::INT64}},
+    {PrimitiveType::TYPE_BIGINT, {parquet::Type::INT32, parquet::Type::INT64}},
     {PrimitiveType::TYPE_FLOAT, {parquet::Type::FLOAT}},
-    {PrimitiveType::TYPE_DOUBLE, {parquet::Type::DOUBLE}},
+    {PrimitiveType::TYPE_DOUBLE, {parquet::Type::INT32, parquet::Type::FLOAT,
+        parquet::Type::DOUBLE}},
     {PrimitiveType::TYPE_TIMESTAMP, {parquet::Type::INT96}},
     {PrimitiveType::TYPE_STRING, {parquet::Type::BYTE_ARRAY}},
     {PrimitiveType::TYPE_DATE, {parquet::Type::BYTE_ARRAY}},

http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/be/src/exec/parquet-plain-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-plain-test.cc 
b/be/src/exec/parquet-plain-test.cc
index 2bcfa1d..9e42058 100644
--- a/be/src/exec/parquet-plain-test.cc
+++ b/be/src/exec/parquet-plain-test.cc
@@ -97,6 +97,27 @@ void TestTruncate(const InternalType& v, int 
expected_byte_size) {
   }
 }
 
+template <typename InternalType, typename WidenInternalType,
+    parquet::Type::type PARQUET_TYPE>
+void TestTruncate(const InternalType& v, int expected_byte_size) {
+  uint8_t buffer[expected_byte_size];
+  int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
+  EXPECT_EQ(encoded_size, expected_byte_size);
+
+  // Check all possible truncations of the buffer.
+  for (int truncated_size = encoded_size - 1; truncated_size >= 0; 
--truncated_size) {
+    WidenInternalType result;
+    /// Copy to heap-allocated buffer so that ASAN can detect buffer overruns.
+    uint8_t* truncated_buffer = new uint8_t[truncated_size];
+    memcpy(truncated_buffer, buffer, truncated_size);
+    int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType, 
PARQUET_TYPE>(
+        truncated_buffer, truncated_buffer + truncated_size, 
expected_byte_size,
+        &result);
+    EXPECT_EQ(-1, decoded_size);
+    delete[] truncated_buffer;
+  }
+}
+
 template <typename InternalType, parquet::Type::type PARQUET_TYPE>
 void TestType(const InternalType& v, int expected_byte_size) {
   uint8_t buffer[expected_byte_size];
@@ -112,6 +133,23 @@ void TestType(const InternalType& v, int 
expected_byte_size) {
   TestTruncate<InternalType, PARQUET_TYPE>(v, expected_byte_size);
 }
 
+template <typename InternalType, typename WidenInternalType,
+    parquet::Type::type PARQUET_TYPE>
+void TestTypeWidening(const InternalType& v, int expected_byte_size) {
+  uint8_t buffer[expected_byte_size];
+  int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
+  EXPECT_EQ(encoded_size, expected_byte_size);
+
+  WidenInternalType result;
+  int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType, 
PARQUET_TYPE>(
+      buffer, buffer + expected_byte_size, expected_byte_size, &result);
+  EXPECT_EQ(decoded_size, expected_byte_size);
+  EXPECT_EQ(v, result);
+
+  TestTruncate<InternalType, WidenInternalType, PARQUET_TYPE>(
+      v, expected_byte_size);
+}
+
 TEST(PlainEncoding, Basic) {
   int8_t i8 = 12;
   int16_t i16 = 123;
@@ -131,6 +169,11 @@ TEST(PlainEncoding, Basic) {
   TestType<StringValue, parquet::Type::BYTE_ARRAY>(sv, sizeof(int32_t) + 
sv.len);
   TestType<TimestampValue, parquet::Type::INT96>(tv, 12);
 
+  // Test type widening.
+  TestTypeWidening<int32_t, int64_t, parquet::Type::INT32>(i32, 
sizeof(int32_t));
+  TestTypeWidening<int32_t, double, parquet::Type::INT32>(i32, 
sizeof(int32_t));
+  TestTypeWidening<float, double, parquet::Type::FLOAT>(f, sizeof(float));
+
   int test_val = 1234;
   int var_len_decimal_size = sizeof(int32_t)
       + 2 /*min bytes required for storing test_val*/;

http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index ee29090..1d4c14b 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -187,3 +187,17 @@ Impala needs to be able to read such values (IMPALA-5542)
 decimal_stored_as_int64.parquet:
 Parquet file generated by Spark 2.3.1 that contains decimals stored as int64.
 Impala needs to be able to read such values (IMPALA-5542)
+
+primitive_type_widening.parquet:
+Parquet file that contains two rows with the following schema:
+- int32 tinyint_col1
+- int32 tinyint_col2
+- int32 tinyint_col3
+- int32 tinyint_col4
+- int32 smallint_col1
+- int32 smallint_col2
+- int32 smallint_col3
+- int32 int_col1
+- int32 int_col2
+- float float_col
+It is used to test primitive type widening (IMPALA-6373).

http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/testdata/data/primitive_type_widening.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/primitive_type_widening.parquet 
b/testdata/data/primitive_type_widening.parquet
new file mode 100644
index 0000000..57027de
Binary files /dev/null and b/testdata/data/primitive_type_widening.parquet 
differ

http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/testdata/workloads/functional-query/queries/QueryTest/parquet-type-widening.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/parquet-type-widening.test
 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-type-widening.test
new file mode 100644
index 0000000..f0f11c7
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-type-widening.test
@@ -0,0 +1,9 @@
+====
+---- QUERY
+select * from primitive_type_widening;
+---- RESULTS
+1,2,3,4,5,6,7,8,9,123.4560012817383
+10,20,30,40,50,60,70,80,90,1230.4560546875
+---- TYPES
+SMALLINT,INT,BIGINT,DOUBLE,INT,BIGINT,DOUBLE,INT,DOUBLE,DOUBLE
+====
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py 
b/tests/query_test/test_scanners.py
index 1cd883e..c9ad888 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -663,6 +663,17 @@ class TestParquet(ImpalaTestSuite):
         "select * from {0}.{1}".format(unique_database, TABLE_NAME))
     assert(len(result.data) == 33)
 
+  def test_type_widening(self, vector, unique_database):
+    """IMPALA-6373: Test that Impala can read parquet file with column types 
smaller than
+       the schema with larger types"""
+    TABLE_NAME = "primitive_type_widening"
+    create_table_and_copy_files(self.client, """CREATE TABLE {db}.{tbl} (
+        a smallint, b int, c bigint, d double, e int, f bigint, g double, h 
int,
+        i double, j double) STORED AS PARQUET""", unique_database, TABLE_NAME,
+        ["/testdata/data/{0}.parquet".format(TABLE_NAME)])
+
+    self.run_test_case("QueryTest/parquet-type-widening", vector, 
unique_database)
+
 # We use various scan range lengths to exercise corner cases in the HDFS 
scanner more
 # thoroughly. In particular, it will exercise:
 # 1. default scan range

Reply via email to