This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7f6b074b84 ARROW-16116: [C++] Handle non-nullable fields when reading Parquet
7f6b074b84 is described below
commit 7f6b074b84b1ca519b7c5fc7da318e8d47d44278
Author: David Li <[email protected]>
AuthorDate: Wed May 4 15:19:16 2022 +0200
ARROW-16116: [C++] Handle non-nullable fields when reading Parquet
Closes #12829 from lidavidm/arrow-16116
Authored-by: David Li <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/testing/random.cc | 9 +-
cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 204 +++++++++++++++++++++-
cpp/src/parquet/arrow/reader.cc | 4 +-
cpp/src/parquet/arrow/reader_internal.cc | 176 +++++++++++++------
cpp/src/parquet/arrow/reader_internal.h | 2 +-
5 files changed, 331 insertions(+), 64 deletions(-)
diff --git a/cpp/src/arrow/testing/random.cc b/cpp/src/arrow/testing/random.cc
index 34733ac16e..f42909a8e2 100644
--- a/cpp/src/arrow/testing/random.cc
+++ b/cpp/src/arrow/testing/random.cc
@@ -795,8 +795,13 @@ std::shared_ptr<Array> RandomArrayGenerator::ArrayOf(const Field& field, int64_t
case Type::type::DATE64: {
using c_type = typename Date64Type::c_type;
constexpr c_type kFullDayMillis = 1000 * 60 * 60 * 24;
- constexpr c_type min_value = std::numeric_limits<c_type>::min() /
kFullDayMillis;
- constexpr c_type max_value = std::numeric_limits<c_type>::max() /
kFullDayMillis;
+ constexpr c_type kDefaultMin = std::numeric_limits<c_type>::min() /
kFullDayMillis;
+ constexpr c_type kDefaultMax = std::numeric_limits<c_type>::max() /
kFullDayMillis;
+
+ const c_type min_value =
+ GetMetadata<c_type>(field.metadata().get(), "min", kDefaultMin);
+ const c_type max_value =
+ GetMetadata<c_type>(field.metadata().get(), "max", kDefaultMax);
return *Numeric<Date64Type>(length, min_value, max_value,
null_probability)
->View(field.type());
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 5abc516937..db8b685fa5 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -49,6 +49,7 @@
#include "arrow/util/config.h" // for ARROW_CSV definition
#include "arrow/util/decimal.h"
#include "arrow/util/future.h"
+#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
@@ -3239,6 +3240,197 @@ TEST(TestArrowWrite, CheckChunkSize) {
WriteTable(*table, ::arrow::default_memory_pool(), sink,
chunk_size));
}
+void DoNestedValidate(const std::shared_ptr<::arrow::DataType>& inner_type,
+ const std::shared_ptr<::arrow::Field>& outer_field,
+ const std::shared_ptr<Buffer>& buffer,
+ const std::shared_ptr<::arrow::Table>& table) {
+ std::unique_ptr<FileReader> reader;
+ FileReaderBuilder reader_builder;
+ ASSERT_OK(reader_builder.Open(std::make_shared<BufferReader>(buffer)));
+ ASSERT_OK(reader_builder.Build(&reader));
+ ARROW_SCOPED_TRACE("Parquet schema: ",
+
reader->parquet_reader()->metadata()->schema()->ToString());
+ std::shared_ptr<Table> result;
+ ASSERT_OK_NO_THROW(reader->ReadTable(&result));
+
+ if (inner_type->id() == ::arrow::Type::DATE64 ||
+ inner_type->id() == ::arrow::Type::TIMESTAMP ||
+ inner_type->Equals(*::arrow::time32(::arrow::TimeUnit::SECOND))) {
+ // Encoding is different when written out, cast back
+ ASSERT_OK_AND_ASSIGN(auto casted_array,
+ ::arrow::compute::Cast(result->column(0),
outer_field->type()));
+ result = ::arrow::Table::Make(::arrow::schema({outer_field}),
+ {casted_array.chunked_array()});
+ }
+
+ ASSERT_OK(result->ValidateFull());
+ ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result, false));
+ // Ensure inner array has no nulls
+ for (const auto& chunk : result->column(0)->chunks()) {
+ const auto& arr = checked_cast<const ::arrow::StructArray&>(*chunk);
+ const auto inner_arr = arr.field(0);
+ ASSERT_EQ(inner_arr->null_count(), 0) << inner_arr->ToString();
+ }
+}
+
+void DoNestedRequiredRoundtrip(
+ const std::shared_ptr<::arrow::DataType>& inner_type,
+ const std::shared_ptr<WriterProperties>& writer_properties,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_writer_properties) {
+ // Test ARROW-15961/ARROW-16116
+ ARROW_SCOPED_TRACE("Type: ", inner_type->ToString());
+ std::shared_ptr<::arrow::KeyValueMetadata> metadata;
+ if (inner_type->id() != ::arrow::Type::DICTIONARY) {
+ metadata = ::arrow::key_value_metadata({{"min", "0"}, {"max", "127"}});
+ }
+ auto inner_field =
+ ::arrow::field("inner", inner_type, /*nullable=*/false,
std::move(metadata));
+ auto type = ::arrow::struct_({inner_field});
+ auto field = ::arrow::field("outer", type, /*nullable=*/true);
+
+ auto gen = ::arrow::random::RandomArrayGenerator(/*seed=*/42);
+ auto inner = gen.ArrayOf(*inner_field, /*size=*/4);
+ ASSERT_EQ(inner->null_count(), 0) << inner->ToString();
+
+ ::arrow::TypedBufferBuilder<bool> bitmap_builder;
+ ASSERT_OK(bitmap_builder.Append(2, false));
+ ASSERT_OK(bitmap_builder.Append(2, true));
+ ASSERT_OK_AND_ASSIGN(auto null_bitmap, bitmap_builder.Finish());
+ ASSERT_OK_AND_ASSIGN(auto array,
+ ::arrow::StructArray::Make({inner}, {inner_field},
null_bitmap));
+ auto table = ::arrow::Table::Make(::arrow::schema({field}), {array});
+ ASSERT_OK(table->ValidateFull());
+
+ auto sink = CreateOutputStream();
+ ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
+ /*row_group_size=*/4, writer_properties,
+ arrow_writer_properties));
+ ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+ ASSERT_NO_FATAL_FAILURE(DoNestedValidate(inner_type, field, buffer, table));
+}
+
+TEST(ArrowReadWrite, NestedRequiredOuterOptional) {
+ std::vector<std::shared_ptr<DataType>> types = ::arrow::PrimitiveTypes();
+ types.insert(types.end(), ::arrow::TemporalTypes().begin(),
+ ::arrow::TemporalTypes().end());
+ types.push_back(::arrow::duration(::arrow::TimeUnit::SECOND));
+ types.push_back(::arrow::duration(::arrow::TimeUnit::MILLI));
+ types.push_back(::arrow::duration(::arrow::TimeUnit::MICRO));
+ types.push_back(::arrow::duration(::arrow::TimeUnit::NANO));
+ types.push_back(::arrow::decimal128(3, 2));
+ types.push_back(::arrow::decimal256(3, 2));
+ types.push_back(::arrow::fixed_size_binary(4));
+ // Note large variants of types appear to get converted back to regular on read
+ types.push_back(::arrow::dictionary(::arrow::int32(), ::arrow::binary()));
+ types.push_back(::arrow::dictionary(::arrow::int32(), ::arrow::utf8()));
+
+ for (const auto& inner_type : types) {
+ if (inner_type->id() == ::arrow::Type::NA) continue;
+
+ auto writer_props = WriterProperties::Builder();
+ auto arrow_writer_props = ArrowWriterProperties::Builder();
+ arrow_writer_props.store_schema();
+ if (inner_type->id() == ::arrow::Type::UINT32) {
+ writer_props.version(ParquetVersion::PARQUET_2_4);
+ } else if (inner_type->id() == ::arrow::Type::TIMESTAMP) {
+ // By default ns is coerced to us, override that
+ ::arrow::TimeUnit::type unit =
+ checked_cast<const ::arrow::TimestampType&>(*inner_type).unit();
+ if (unit == ::arrow::TimeUnit::NANO) {
+ writer_props.version(ParquetVersion::PARQUET_2_6);
+ arrow_writer_props.coerce_timestamps(unit);
+ }
+ }
+
+ ASSERT_NO_FATAL_FAILURE(DoNestedRequiredRoundtrip(inner_type,
writer_props.build(),
+
arrow_writer_props.build()));
+
+ if (inner_type->id() == ::arrow::Type::TIMESTAMP) {
+ ARROW_SCOPED_TRACE("enable_deprecated_int96_timestamps = true");
+ arrow_writer_props.enable_deprecated_int96_timestamps();
+ ASSERT_NO_FATAL_FAILURE(DoNestedRequiredRoundtrip(inner_type,
writer_props.build(),
+
arrow_writer_props.build()));
+ }
+ }
+ // NOTE: read_dictionary option only applies to top-level columns,
+ // so we don't address that path here
+}
+
+TEST(ArrowReadWrite, NestedRequiredOuterOptionalDecimal) {
+ // Manually construct files to test decimals encoded as variable-length byte array
+ ::arrow::TypedBufferBuilder<bool> bitmap_builder;
+ ASSERT_OK(bitmap_builder.Append(2, false));
+ ASSERT_OK(bitmap_builder.Append(2, true));
+ ASSERT_OK_AND_ASSIGN(auto null_bitmap, bitmap_builder.Finish());
+
+ const std::vector<int16_t> def_levels = {0, 0, 1, 1};
+ const std::vector<ByteArray> byte_arrays = {
+ ByteArray("\x01\xe2\x40"), // 123456
+ ByteArray("\x0f\x12\x06"), // 987654
+ };
+ const std::vector<int32_t> int32_values = {123456, 987654};
+ const std::vector<int64_t> int64_values = {123456, 987654};
+
+ const auto inner_type = ::arrow::decimal128(6, 3);
+ auto inner_field = ::arrow::field("inner", inner_type, /*nullable=*/false);
+ auto type = ::arrow::struct_({inner_field});
+ auto field = ::arrow::field("outer", type, /*nullable=*/true);
+ auto inner =
+ ArrayFromJSON(inner_type, R"(["000.000", "000.000", "123.456",
"987.654"])");
+ ASSERT_OK_AND_ASSIGN(auto array,
+ ::arrow::StructArray::Make({inner}, {inner_field},
null_bitmap));
+ auto table = ::arrow::Table::Make(::arrow::schema({field}), {array});
+
+ for (const auto& encoding : {Type::BYTE_ARRAY, Type::INT32, Type::INT64}) {
+ // Manually write out file based on encoding type
+ ARROW_SCOPED_TRACE("Encoding decimals as ", encoding);
+ auto parquet_schema = GroupNode::Make(
+ "schema", Repetition::REQUIRED,
+ {GroupNode::Make("outer", Repetition::OPTIONAL,
+ {
+ PrimitiveNode::Make("inner", Repetition::REQUIRED,
+ LogicalType::Decimal(6, 3),
encoding),
+ })});
+
+ auto sink = CreateOutputStream();
+ auto file_writer =
+ ParquetFileWriter::Open(sink,
checked_pointer_cast<GroupNode>(parquet_schema));
+ auto column_writer = file_writer->AppendRowGroup()->NextColumn();
+ ARROW_SCOPED_TRACE("Column descriptor: ",
column_writer->descr()->ToString());
+
+ switch (encoding) {
+ case Type::BYTE_ARRAY: {
+ auto typed_writer =
+ checked_cast<TypedColumnWriter<ByteArrayType>*>(column_writer);
+ typed_writer->WriteBatch(4, def_levels.data(), /*rep_levels=*/nullptr,
+ byte_arrays.data());
+ break;
+ }
+ case Type::INT32: {
+ auto typed_writer = checked_cast<Int32Writer*>(column_writer);
+ typed_writer->WriteBatch(4, def_levels.data(), /*rep_levels=*/nullptr,
+ int32_values.data());
+ break;
+ }
+ case Type::INT64: {
+ auto typed_writer = checked_cast<Int64Writer*>(column_writer);
+ typed_writer->WriteBatch(4, def_levels.data(), /*rep_levels=*/nullptr,
+ int64_values.data());
+ break;
+ }
+ default:
+ FAIL() << "Invalid encoding";
+ return;
+ }
+
+ column_writer->Close();
+ file_writer->Close();
+
+ ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+ ASSERT_NO_FATAL_FAILURE(DoNestedValidate(inner_type, field, buffer,
table));
+ }
+}
+
class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type>
{
protected:
// make it *3 to make it easily divisible by 3
@@ -3516,7 +3708,17 @@ TEST_F(TestNestedSchemaRead, ReadIntoTableFull) {
// validate struct array
ASSERT_NO_FATAL_FAILURE(ValidateArray(*struct_field_array,
NUM_SIMPLE_TEST_ROWS / 3));
// validate leaf1
- ASSERT_NO_FATAL_FAILURE(ValidateColumnArray(*leaf1_array,
NUM_SIMPLE_TEST_ROWS / 3));
+ ASSERT_NO_FATAL_FAILURE(ValidateArray(*leaf1_array, /*expected_nulls=*/0));
+ // Validate values manually here. The child array is non-nullable,
+ // but Parquet does not store null values, so we need to account for
+ // the struct's validity bitmap.
+ {
+ int j = 0;
+ for (int i = 0; i < values_array_->length(); i++) {
+ if (struct_field_array->IsNull(i)) continue;
+ ASSERT_EQ(leaf1_array->Value(i), values_array_->Value(j++));
+ }
+ }
// validate leaf2
ASSERT_NO_FATAL_FAILURE(
ValidateColumnArray(*leaf2_array, NUM_SIMPLE_TEST_ROWS * 2 / 3));
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 4c298a9ce4..03470d4e8f 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -482,8 +482,8 @@ class LeafReader : public ColumnReaderImpl {
NextRowGroup();
}
}
- RETURN_NOT_OK(TransferColumnData(record_reader_.get(), field_->type(),
descr_,
- ctx_->pool, &out_));
+ RETURN_NOT_OK(
+ TransferColumnData(record_reader_.get(), field_, descr_, ctx_->pool,
&out_));
return Status::OK();
END_PARQUET_CATCH_EXCEPTIONS
}
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index 0981a73c44..d8aad0d0f2 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -303,9 +303,23 @@ Status StatisticsAsScalars(const Statistics& statistics,
namespace {
+/// Drop the validity buffer from each chunk.
+///
+/// Used when reading a non-nullable field.
+void ReconstructChunksWithoutNulls(::arrow::ArrayVector* chunks) {
+ for (size_t i = 0; i < chunks->size(); i++) {
+ if ((*chunks)[i]->data()->buffers[0]) {
+ std::shared_ptr<::arrow::ArrayData> data = (*chunks)[i]->data();
+ data->null_count = 0;
+ data->buffers[0] = nullptr;
+ (*chunks)[i] = MakeArray(data);
+ }
+ }
+}
+
template <typename ArrowType, typename ParquetType>
Status TransferInt(RecordReader* reader, MemoryPool* pool,
- const std::shared_ptr<DataType>& type, Datum* out) {
+ const std::shared_ptr<Field>& field, Datum* out) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
int64_t length = reader->values_written();
@@ -315,21 +329,35 @@ Status TransferInt(RecordReader* reader, MemoryPool* pool,
auto values = reinterpret_cast<const ParquetCType*>(reader->values());
auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
std::copy(values, values + length, out_ptr);
- *out = std::make_shared<ArrayType<ArrowType>>(
- type, length, std::move(data), reader->ReleaseIsValid(),
reader->null_count());
+ if (field->nullable()) {
+ *out = std::make_shared<ArrayType<ArrowType>>(field->type(), length,
std::move(data),
+ reader->ReleaseIsValid(),
+ reader->null_count());
+ } else {
+ *out =
+ std::make_shared<ArrayType<ArrowType>>(field->type(), length,
std::move(data),
+ /*null_bitmap=*/nullptr,
/*null_count=*/0);
+ }
return Status::OK();
}
std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
- const std::shared_ptr<DataType>& type)
{
- std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
- reader->ReleaseValues()};
- auto data = std::make_shared<::arrow::ArrayData>(type,
reader->values_written(),
- buffers,
reader->null_count());
+ const std::shared_ptr<Field>& field) {
+ std::shared_ptr<::arrow::ArrayData> data;
+ if (field->nullable()) {
+ std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
+ reader->ReleaseValues()};
+ data = std::make_shared<::arrow::ArrayData>(field->type(),
reader->values_written(),
+ std::move(buffers),
reader->null_count());
+ } else {
+ std::vector<std::shared_ptr<Buffer>> buffers = {nullptr,
reader->ReleaseValues()};
+ data = std::make_shared<::arrow::ArrayData>(field->type(),
reader->values_written(),
+ std::move(buffers),
/*null_count=*/0);
+ }
return ::arrow::MakeArray(data);
}
-Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
+Status TransferBool(RecordReader* reader, bool nullable, MemoryPool* pool,
Datum* out) {
int64_t length = reader->values_written();
const int64_t buffer_size = bit_util::BytesForBits(length);
@@ -346,13 +374,18 @@ Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
}
}
- *out = std::make_shared<BooleanArray>(length, std::move(data),
reader->ReleaseIsValid(),
- reader->null_count());
+ if (nullable) {
+ *out = std::make_shared<BooleanArray>(length, std::move(data),
+ reader->ReleaseIsValid(),
reader->null_count());
+ } else {
+ *out = std::make_shared<BooleanArray>(length, std::move(data),
+ /*null_bitmap=*/nullptr,
/*null_count=*/0);
+ }
return Status::OK();
}
Status TransferInt96(RecordReader* reader, MemoryPool* pool,
- const std::shared_ptr<DataType>& type, Datum* out,
+ const std::shared_ptr<Field>& field, Datum* out,
const ::arrow::TimeUnit::type int96_arrow_time_unit) {
int64_t length = reader->values_written();
auto values = reinterpret_cast<const Int96*>(reader->values());
@@ -381,13 +414,19 @@ Status TransferInt96(RecordReader* reader, MemoryPool* pool,
}
}
}
- *out = std::make_shared<TimestampArray>(type, length, std::move(data),
- reader->ReleaseIsValid(),
reader->null_count());
+ if (field->nullable()) {
+ *out =
+ std::make_shared<TimestampArray>(field->type(), length,
std::move(data),
+ reader->ReleaseIsValid(),
reader->null_count());
+ } else {
+ *out = std::make_shared<TimestampArray>(field->type(), length,
std::move(data),
+ /*null_bitmap=*/nullptr,
/*null_count=*/0);
+ }
return Status::OK();
}
Status TransferDate64(RecordReader* reader, MemoryPool* pool,
- const std::shared_ptr<DataType>& type, Datum* out) {
+ const std::shared_ptr<Field>& field, Datum* out) {
int64_t length = reader->values_written();
auto values = reinterpret_cast<const int32_t*>(reader->values());
@@ -399,8 +438,15 @@ Status TransferDate64(RecordReader* reader, MemoryPool* pool,
*out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
}
- *out = std::make_shared<::arrow::Date64Array>(
- type, length, std::move(data), reader->ReleaseIsValid(),
reader->null_count());
+ if (field->nullable()) {
+ *out = std::make_shared<::arrow::Date64Array>(field->type(), length,
std::move(data),
+ reader->ReleaseIsValid(),
+ reader->null_count());
+ } else {
+ *out =
+ std::make_shared<::arrow::Date64Array>(field->type(), length,
std::move(data),
+ /*null_bitmap=*/nullptr,
/*null_count=*/0);
+ }
return Status::OK();
}
@@ -409,22 +455,28 @@ Status TransferDate64(RecordReader* reader, MemoryPool* pool,
Status TransferDictionary(RecordReader* reader,
const std::shared_ptr<DataType>& logical_value_type,
- std::shared_ptr<ChunkedArray>* out) {
+ bool nullable, std::shared_ptr<ChunkedArray>* out) {
auto dict_reader = dynamic_cast<DictionaryRecordReader*>(reader);
DCHECK(dict_reader);
*out = dict_reader->GetResult();
if (!logical_value_type->Equals(*(*out)->type())) {
ARROW_ASSIGN_OR_RAISE(*out, (*out)->View(logical_value_type));
}
+ if (!nullable) {
+ ::arrow::ArrayVector chunks = (*out)->chunks();
+ ReconstructChunksWithoutNulls(&chunks);
+ *out = std::make_shared<ChunkedArray>(std::move(chunks),
logical_value_type);
+ }
return Status::OK();
}
Status TransferBinary(RecordReader* reader, MemoryPool* pool,
- const std::shared_ptr<DataType>& logical_value_type,
+ const std::shared_ptr<Field>& logical_type_field,
std::shared_ptr<ChunkedArray>* out) {
if (reader->read_dictionary()) {
return TransferDictionary(
- reader, ::arrow::dictionary(::arrow::int32(), logical_value_type),
out);
+ reader, ::arrow::dictionary(::arrow::int32(),
logical_type_field->type()),
+ logical_type_field->nullable(), out);
}
::arrow::compute::ExecContext ctx(pool);
::arrow::compute::CastOptions cast_options;
@@ -434,14 +486,18 @@ Status TransferBinary(RecordReader* reader, MemoryPool* pool,
DCHECK(binary_reader);
auto chunks = binary_reader->GetBuilderChunks();
for (auto& chunk : chunks) {
- if (!chunk->type()->Equals(*logical_value_type)) {
+ if (!chunk->type()->Equals(*logical_type_field->type())) {
// XXX: if a LargeBinary chunk is larger than 2GB, the MSBs of offsets
// will be lost because they are first created as int32 and then cast to int64.
ARROW_ASSIGN_OR_RAISE(
- chunk, ::arrow::compute::Cast(*chunk, logical_value_type,
cast_options, &ctx));
+ chunk,
+ ::arrow::compute::Cast(*chunk, logical_type_field->type(),
cast_options, &ctx));
}
}
- *out = std::make_shared<ChunkedArray>(chunks, logical_value_type);
+ if (!logical_type_field->nullable()) {
+ ReconstructChunksWithoutNulls(&chunks);
+ }
+ *out = std::make_shared<ChunkedArray>(std::move(chunks),
logical_type_field->type());
return Status::OK();
}
@@ -586,7 +642,7 @@ template <
typename = ::arrow::enable_if_t<std::is_same<ParquetIntegerType,
Int32Type>::value ||
std::is_same<ParquetIntegerType,
Int64Type>::value>>
static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
- const std::shared_ptr<DataType>& type,
Datum* out) {
+ const std::shared_ptr<Field>& field,
Datum* out) {
// Decimal128 and Decimal256 are only Arrow constructs. Parquet does not
// specifically distinguish between decimal byte widths.
// Decimal256 isn't relevant here because the Arrow-Parquet C++ bindings never
@@ -595,7 +651,7 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
// way an integer column could be construed as Decimal256 is if an arrow
// schema was stored as metadata in the file indicating the column was
// Decimal256. The current Arrow-Parquet C++ bindings will never do this.
- DCHECK(type->id() == ::arrow::Type::DECIMAL128);
+ DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128);
const int64_t length = reader->values_written();
@@ -606,7 +662,7 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
const auto values = reinterpret_cast<const ElementType*>(reader->values());
- const auto& decimal_type = checked_cast<const ::arrow::DecimalType&>(*type);
+ const auto& decimal_type = checked_cast<const
::arrow::DecimalType&>(*field->type());
const int64_t type_length = decimal_type.byte_width();
ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length *
type_length, pool));
@@ -622,12 +678,12 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
decimal.ToBytes(out_ptr);
}
- if (reader->nullable_values()) {
+ if (reader->nullable_values() && field->nullable()) {
std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
- *out = std::make_shared<Decimal128Array>(type, length, std::move(data),
is_valid,
- reader->null_count());
+ *out = std::make_shared<Decimal128Array>(field->type(), length,
std::move(data),
+ is_valid, reader->null_count());
} else {
- *out = std::make_shared<Decimal128Array>(type, length, std::move(data));
+ *out = std::make_shared<Decimal128Array>(field->type(), length,
std::move(data));
}
return Status::OK();
}
@@ -640,43 +696,47 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
/// representing the high and low bits of each decimal value.
template <typename DecimalArrayType, typename ParquetType>
Status TransferDecimal(RecordReader* reader, MemoryPool* pool,
- const std::shared_ptr<DataType>& type, Datum* out) {
+ const std::shared_ptr<Field>& field, Datum* out) {
auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader);
DCHECK(binary_reader);
::arrow::ArrayVector chunks = binary_reader->GetBuilderChunks();
for (size_t i = 0; i < chunks.size(); ++i) {
std::shared_ptr<Array> chunk_as_decimal;
auto fn = &DecimalConverter<DecimalArrayType,
ParquetType>::ConvertToDecimal;
- RETURN_NOT_OK(fn(*chunks[i], type, pool, &chunk_as_decimal));
+ RETURN_NOT_OK(fn(*chunks[i], field->type(), pool, &chunk_as_decimal));
// Replace the chunk, which will hopefully also free memory as we go
chunks[i] = chunk_as_decimal;
}
- *out = std::make_shared<ChunkedArray>(chunks, type);
+ if (!field->nullable()) {
+ ReconstructChunksWithoutNulls(&chunks);
+ }
+ *out = std::make_shared<ChunkedArray>(chunks, field->type());
return Status::OK();
}
} // namespace
-#define TRANSFER_INT32(ENUM, ArrowType)
\
- case ::arrow::Type::ENUM: {
\
- Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_type,
&result); \
- RETURN_NOT_OK(s);
\
+#define TRANSFER_INT32(ENUM, ArrowType)
\
+ case ::arrow::Type::ENUM: {
\
+ Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_field,
&result); \
+ RETURN_NOT_OK(s);
\
} break;
-#define TRANSFER_INT64(ENUM, ArrowType)
\
- case ::arrow::Type::ENUM: {
\
- Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_type,
&result); \
- RETURN_NOT_OK(s);
\
+#define TRANSFER_INT64(ENUM, ArrowType)
\
+ case ::arrow::Type::ENUM: {
\
+ Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_field,
&result); \
+ RETURN_NOT_OK(s);
\
} break;
-Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType>
value_type,
+Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>&
value_field,
const ColumnDescriptor* descr, MemoryPool* pool,
std::shared_ptr<ChunkedArray>* out) {
Datum result;
std::shared_ptr<ChunkedArray> chunked_result;
- switch (value_type->id()) {
+ switch (value_field->type()->id()) {
case ::arrow::Type::DICTIONARY: {
- RETURN_NOT_OK(TransferDictionary(reader, value_type, &chunked_result));
+ RETURN_NOT_OK(TransferDictionary(reader, value_field->type(),
+ value_field->nullable(),
&chunked_result));
result = chunked_result;
} break;
case ::arrow::Type::NA: {
@@ -687,10 +747,10 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_
case ::arrow::Type::INT64:
case ::arrow::Type::FLOAT:
case ::arrow::Type::DOUBLE:
- result = TransferZeroCopy(reader, value_type);
+ result = TransferZeroCopy(reader, value_field);
break;
case ::arrow::Type::BOOL:
- RETURN_NOT_OK(TransferBool(reader, pool, &result));
+ RETURN_NOT_OK(TransferBool(reader, value_field->nullable(), pool,
&result));
break;
TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
TRANSFER_INT32(INT8, ::arrow::Int8Type);
@@ -703,33 +763,33 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_
TRANSFER_INT64(TIME64, ::arrow::Time64Type);
TRANSFER_INT64(DURATION, ::arrow::DurationType);
case ::arrow::Type::DATE64:
- RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result));
+ RETURN_NOT_OK(TransferDate64(reader, pool, value_field, &result));
break;
case ::arrow::Type::FIXED_SIZE_BINARY:
case ::arrow::Type::BINARY:
case ::arrow::Type::STRING:
case ::arrow::Type::LARGE_BINARY:
case ::arrow::Type::LARGE_STRING: {
- RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result));
+ RETURN_NOT_OK(TransferBinary(reader, pool, value_field,
&chunked_result));
result = chunked_result;
} break;
case ::arrow::Type::DECIMAL128: {
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<Int32Type>;
- RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+ RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = &DecimalIntegerTransfer<Int64Type>;
- RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+ RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal128Array, ByteArrayType>;
- RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+ RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal128Array, FLBAType>;
- RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+ RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
default:
return Status::Invalid(
@@ -741,11 +801,11 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_
switch (descr->physical_type()) {
case ::parquet::Type::BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>;
- RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+ RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal256Array, FLBAType>;
- RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+ RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
default:
return Status::Invalid(
@@ -755,16 +815,16 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_
case ::arrow::Type::TIMESTAMP: {
const ::arrow::TimestampType& timestamp_type =
- checked_cast<::arrow::TimestampType&>(*value_type);
+ checked_cast<::arrow::TimestampType&>(*value_field->type());
if (descr->physical_type() == ::parquet::Type::INT96) {
RETURN_NOT_OK(
- TransferInt96(reader, pool, value_type, &result,
timestamp_type.unit()));
+ TransferInt96(reader, pool, value_field, &result,
timestamp_type.unit()));
} else {
switch (timestamp_type.unit()) {
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO:
case ::arrow::TimeUnit::NANO:
- result = TransferZeroCopy(reader, value_type);
+ result = TransferZeroCopy(reader, value_field);
break;
default:
return Status::NotImplemented("TimeUnit not supported");
@@ -773,7 +833,7 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_
} break;
default:
return Status::NotImplemented("No support for reading columns of type ",
- value_type->ToString());
+ value_field->type()->ToString());
}
if (result.kind() == Datum::ARRAY) {
diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h
index ad0b781576..cf9dbb8657 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -99,7 +99,7 @@ using FileColumnIteratorFactory =
std::function<FileColumnIterator*(int, ParquetFileReader*)>;
Status TransferColumnData(::parquet::internal::RecordReader* reader,
- std::shared_ptr<::arrow::DataType> value_type,
+ const std::shared_ptr<::arrow::Field>& value_field,
const ColumnDescriptor* descr, ::arrow::MemoryPool*
pool,
std::shared_ptr<::arrow::ChunkedArray>* out);