Repository: impala
Updated Branches:
refs/heads/master 971cf179f -> 6ce7ba295
IMPALA-6373: Allow primitive type widening on parquet tables
This patch implements support for primitive type widening on parquet
tables. Only the following conversions, none of which can lose
precision, are supported:
- tinyint (INT32) -> smallint (INT32), int (INT32), bigint (INT64),
double (DOUBLE)
- smallint (INT32) -> int (INT32), bigint (INT64), double (DOUBLE)
- int (INT32) -> bigint (INT64), double (DOUBLE)
- float (FLOAT) -> double (DOUBLE)
Testing:
- Added BE test
- Added E2E test
- Ran core tests
Change-Id: If93394b035c64cf6fc5f37b54d29c034cc1f86e4
Reviewed-on: http://gerrit.cloudera.org:8080/11268
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9934b473
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9934b473
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9934b473
Branch: refs/heads/master
Commit: 9934b473b7239b1077dad1f0d308e168b803db6d
Parents: 971cf17
Author: Fredy Wijaya <[email protected]>
Authored: Fri Aug 17 16:24:03 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Aug 23 15:55:53 2018 +0000
----------------------------------------------------------------------
be/src/exec/parquet-column-readers.cc | 28 ++++++++++--
be/src/exec/parquet-column-readers.h | 7 +++
be/src/exec/parquet-common.h | 28 ++++++++++++
be/src/exec/parquet-metadata-utils.cc | 5 ++-
be/src/exec/parquet-plain-test.cc | 43 +++++++++++++++++++
testdata/data/README | 14 ++++++
testdata/data/primitive_type_widening.parquet | Bin 0 -> 2711 bytes
.../QueryTest/parquet-type-widening.test | 9 ++++
tests/query_test/test_scanners.py | 11 +++++
9 files changed, 139 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc
b/be/src/exec/parquet-column-readers.cc
index 2cb483e..6d8eddd 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -1573,16 +1573,36 @@ ParquetColumnReader* ParquetColumnReader::Create(const
SchemaNode& node,
slot_desc);
break;
case TYPE_BIGINT:
- reader = new ScalarColumnReader<int64_t, parquet::Type::INT64,
true>(parent, node,
- slot_desc);
+ switch (node.element->type) {
+ case parquet::Type::INT32:
+ reader = new ScalarColumnReader<int64_t, parquet::Type::INT32,
true>(parent,
+ node, slot_desc);
+ break;
+ default:
+ reader = new ScalarColumnReader<int64_t, parquet::Type::INT64,
true>(parent,
+ node, slot_desc);
+ break;
+ }
break;
case TYPE_FLOAT:
reader = new ScalarColumnReader<float, parquet::Type::FLOAT,
true>(parent, node,
slot_desc);
break;
case TYPE_DOUBLE:
- reader = new ScalarColumnReader<double, parquet::Type::DOUBLE,
true>(parent, node,
- slot_desc);
+ switch (node.element->type) {
+ case parquet::Type::INT32:
+ reader = new ScalarColumnReader<double , parquet::Type::INT32,
true>(parent,
+ node, slot_desc);
+ break;
+ case parquet::Type::FLOAT:
+ reader = new ScalarColumnReader<double, parquet::Type::FLOAT,
true>(parent,
+ node, slot_desc);
+ break;
+ default:
+ reader = new ScalarColumnReader<double, parquet::Type::DOUBLE,
true>(parent,
+ node, slot_desc);
+ break;
+ }
break;
case TYPE_TIMESTAMP:
reader = new ScalarColumnReader<TimestampValue, parquet::Type::INT96,
true>(
http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h
b/be/src/exec/parquet-column-readers.h
index 022a868..790bde4 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -145,6 +145,13 @@ class ParquetColumnReader {
/// false if it reads one value per item). The reader is added to the
runtime state's
/// object pool. Does not create child readers for collection readers; these
must be
/// added by the caller.
+ ///
+ /// It supports the following primitive type widening that does not have any
loss of
+ /// precision.
+ /// - tinyint (INT32) -> smallint (INT32), int (INT32), bigint (INT64),
double (DOUBLE)
+ /// - smallint (INT32) -> int (INT32), bigint (INT64), double (DOUBLE)
+ /// - int (INT32) -> bigint (INT64), double (DOUBLE)
+ /// - float (FLOAT) -> double (DOUBLE)
static ParquetColumnReader* Create(const SchemaNode& node, bool
is_collection_field,
const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/be/src/exec/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h
index f3add14..24aafae 100644
--- a/be/src/exec/parquet-common.h
+++ b/be/src/exec/parquet-common.h
@@ -249,6 +249,34 @@ inline int ParquetPlainEncoder::ByteSize(const
TimestampValue& v) {
return 12;
}
+template <typename From, typename To>
+inline int DecodeWithConversion(const uint8_t* buffer, const uint8_t*
buffer_end, To* v) {
+ int byte_size = sizeof(From);
+ if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
+ From dest;
+ memcpy(&dest, buffer, byte_size);
+ *v = dest;
+ return byte_size;
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT32>(
+ const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
int64_t* v) {
+ return DecodeWithConversion<int32_t, int64_t>(buffer, buffer_end, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<double, parquet::Type::INT32>(
+ const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
double* v) {
+ return DecodeWithConversion<int32_t, double>(buffer, buffer_end, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<double, parquet::Type::FLOAT>(
+ const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
double* v) {
+ return DecodeWithConversion<float, double>(buffer, buffer_end, v);
+}
+
template <>
inline int ParquetPlainEncoder::Decode<int8_t, parquet::Type::INT32>(
const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
int8_t* v) {
http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/be/src/exec/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.cc
b/be/src/exec/parquet-metadata-utils.cc
index d199c6e..26dea5f 100644
--- a/be/src/exec/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet-metadata-utils.cc
@@ -49,9 +49,10 @@ const map<PrimitiveType, set<parquet::Type::type>>
SUPPORTED_PHYSICAL_TYPES = {
{PrimitiveType::TYPE_TINYINT, {parquet::Type::INT32}},
{PrimitiveType::TYPE_SMALLINT, {parquet::Type::INT32}},
{PrimitiveType::TYPE_INT, {parquet::Type::INT32}},
- {PrimitiveType::TYPE_BIGINT, {parquet::Type::INT64}},
+ {PrimitiveType::TYPE_BIGINT, {parquet::Type::INT32, parquet::Type::INT64}},
{PrimitiveType::TYPE_FLOAT, {parquet::Type::FLOAT}},
- {PrimitiveType::TYPE_DOUBLE, {parquet::Type::DOUBLE}},
+ {PrimitiveType::TYPE_DOUBLE, {parquet::Type::INT32, parquet::Type::FLOAT,
+ parquet::Type::DOUBLE}},
{PrimitiveType::TYPE_TIMESTAMP, {parquet::Type::INT96}},
{PrimitiveType::TYPE_STRING, {parquet::Type::BYTE_ARRAY}},
{PrimitiveType::TYPE_DATE, {parquet::Type::BYTE_ARRAY}},
http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/be/src/exec/parquet-plain-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-plain-test.cc
b/be/src/exec/parquet-plain-test.cc
index 2bcfa1d..9e42058 100644
--- a/be/src/exec/parquet-plain-test.cc
+++ b/be/src/exec/parquet-plain-test.cc
@@ -97,6 +97,27 @@ void TestTruncate(const InternalType& v, int
expected_byte_size) {
}
}
+template <typename InternalType, typename WidenInternalType,
+ parquet::Type::type PARQUET_TYPE>
+void TestTruncate(const InternalType& v, int expected_byte_size) {
+ uint8_t buffer[expected_byte_size];
+ int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
+ EXPECT_EQ(encoded_size, expected_byte_size);
+
+ // Check all possible truncations of the buffer.
+ for (int truncated_size = encoded_size - 1; truncated_size >= 0;
--truncated_size) {
+ WidenInternalType result;
+ /// Copy to heap-allocated buffer so that ASAN can detect buffer overruns.
+ uint8_t* truncated_buffer = new uint8_t[truncated_size];
+ memcpy(truncated_buffer, buffer, truncated_size);
+ int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType,
PARQUET_TYPE>(
+ truncated_buffer, truncated_buffer + truncated_size,
expected_byte_size,
+ &result);
+ EXPECT_EQ(-1, decoded_size);
+ delete[] truncated_buffer;
+ }
+}
+
template <typename InternalType, parquet::Type::type PARQUET_TYPE>
void TestType(const InternalType& v, int expected_byte_size) {
uint8_t buffer[expected_byte_size];
@@ -112,6 +133,23 @@ void TestType(const InternalType& v, int
expected_byte_size) {
TestTruncate<InternalType, PARQUET_TYPE>(v, expected_byte_size);
}
+template <typename InternalType, typename WidenInternalType,
+ parquet::Type::type PARQUET_TYPE>
+void TestTypeWidening(const InternalType& v, int expected_byte_size) {
+ uint8_t buffer[expected_byte_size];
+ int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
+ EXPECT_EQ(encoded_size, expected_byte_size);
+
+ WidenInternalType result;
+ int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType,
PARQUET_TYPE>(
+ buffer, buffer + expected_byte_size, expected_byte_size, &result);
+ EXPECT_EQ(decoded_size, expected_byte_size);
+ EXPECT_EQ(v, result);
+
+ TestTruncate<InternalType, WidenInternalType, PARQUET_TYPE>(
+ v, expected_byte_size);
+}
+
TEST(PlainEncoding, Basic) {
int8_t i8 = 12;
int16_t i16 = 123;
@@ -131,6 +169,11 @@ TEST(PlainEncoding, Basic) {
TestType<StringValue, parquet::Type::BYTE_ARRAY>(sv, sizeof(int32_t) +
sv.len);
TestType<TimestampValue, parquet::Type::INT96>(tv, 12);
+ // Test type widening.
+ TestTypeWidening<int32_t, int64_t, parquet::Type::INT32>(i32,
sizeof(int32_t));
+ TestTypeWidening<int32_t, double, parquet::Type::INT32>(i32,
sizeof(int32_t));
+ TestTypeWidening<float, double, parquet::Type::FLOAT>(f, sizeof(float));
+
int test_val = 1234;
int var_len_decimal_size = sizeof(int32_t)
+ 2 /*min bytes required for storing test_val*/;
http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index ee29090..1d4c14b 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -187,3 +187,17 @@ Impala needs to be able to read such values (IMPALA-5542)
decimal_stored_as_int64.parquet:
Parquet file generated by Spark 2.3.1 that contains decimals stored as int64.
Impala needs to be able to read such values (IMPALA-5542)
+
+primitive_type_widening.parquet:
+Parquet file that contains two rows with the following schema:
+- int32 tinyint_col1
+- int32 tinyint_col2
+- int32 tinyint_col3
+- int32 tinyint_col4
+- int32 smallint_col1
+- int32 smallint_col2
+- int32 smallint_col3
+- int32 int_col1
+- int32 int_col2
+- float float_col
+It is used to test primitive type widening (IMPALA-6373).
http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/testdata/data/primitive_type_widening.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/primitive_type_widening.parquet
b/testdata/data/primitive_type_widening.parquet
new file mode 100644
index 0000000..57027de
Binary files /dev/null and b/testdata/data/primitive_type_widening.parquet
differ
http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/testdata/workloads/functional-query/queries/QueryTest/parquet-type-widening.test
----------------------------------------------------------------------
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/parquet-type-widening.test
b/testdata/workloads/functional-query/queries/QueryTest/parquet-type-widening.test
new file mode 100644
index 0000000..f0f11c7
--- /dev/null
+++
b/testdata/workloads/functional-query/queries/QueryTest/parquet-type-widening.test
@@ -0,0 +1,9 @@
+====
+---- QUERY
+select * from primitive_type_widening;
+---- RESULTS
+1,2,3,4,5,6,7,8,9,123.4560012817383
+10,20,30,40,50,60,70,80,90,1230.4560546875
+---- TYPES
+SMALLINT,INT,BIGINT,DOUBLE,INT,BIGINT,DOUBLE,INT,DOUBLE,DOUBLE
+====
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/impala/blob/9934b473/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py
b/tests/query_test/test_scanners.py
index 1cd883e..c9ad888 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -663,6 +663,17 @@ class TestParquet(ImpalaTestSuite):
"select * from {0}.{1}".format(unique_database, TABLE_NAME))
assert(len(result.data) == 33)
+ def test_type_widening(self, vector, unique_database):
+ """IMPALA-6373: Test that Impala can read parquet file with column types
smaller than
+ the schema with larger types"""
+ TABLE_NAME = "primitive_type_widening"
+ create_table_and_copy_files(self.client, """CREATE TABLE {db}.{tbl} (
+ a smallint, b int, c bigint, d double, e int, f bigint, g double, h
int,
+ i double, j double) STORED AS PARQUET""", unique_database, TABLE_NAME,
+ ["/testdata/data/{0}.parquet".format(TABLE_NAME)])
+
+ self.run_test_case("QueryTest/parquet-type-widening", vector,
unique_database)
+
# We use various scan range lengths to exercise corner cases in the HDFS
scanner more
# thoroughly. In particular, it will exercise:
# 1. default scan range