This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f6b074b84 ARROW-16116: [C++] Handle non-nullable fields when reading Parquet
7f6b074b84 is described below

commit 7f6b074b84b1ca519b7c5fc7da318e8d47d44278
Author: David Li <[email protected]>
AuthorDate: Wed May 4 15:19:16 2022 +0200

    ARROW-16116: [C++] Handle non-nullable fields when reading Parquet
    
    Closes #12829 from lidavidm/arrow-16116
    
    Authored-by: David Li <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/testing/random.cc                   |   9 +-
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 204 +++++++++++++++++++++-
 cpp/src/parquet/arrow/reader.cc                   |   4 +-
 cpp/src/parquet/arrow/reader_internal.cc          | 176 +++++++++++++------
 cpp/src/parquet/arrow/reader_internal.h           |   2 +-
 5 files changed, 331 insertions(+), 64 deletions(-)

diff --git a/cpp/src/arrow/testing/random.cc b/cpp/src/arrow/testing/random.cc
index 34733ac16e..f42909a8e2 100644
--- a/cpp/src/arrow/testing/random.cc
+++ b/cpp/src/arrow/testing/random.cc
@@ -795,8 +795,13 @@ std::shared_ptr<Array> RandomArrayGenerator::ArrayOf(const 
Field& field, int64_t
     case Type::type::DATE64: {
       using c_type = typename Date64Type::c_type;
       constexpr c_type kFullDayMillis = 1000 * 60 * 60 * 24;
-      constexpr c_type min_value = std::numeric_limits<c_type>::min() / 
kFullDayMillis;
-      constexpr c_type max_value = std::numeric_limits<c_type>::max() / 
kFullDayMillis;
+      constexpr c_type kDefaultMin = std::numeric_limits<c_type>::min() / 
kFullDayMillis;
+      constexpr c_type kDefaultMax = std::numeric_limits<c_type>::max() / 
kFullDayMillis;
+
+      const c_type min_value =
+          GetMetadata<c_type>(field.metadata().get(), "min", kDefaultMin);
+      const c_type max_value =
+          GetMetadata<c_type>(field.metadata().get(), "max", kDefaultMax);
 
       return *Numeric<Date64Type>(length, min_value, max_value, 
null_probability)
                   ->View(field.type());
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc 
b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 5abc516937..db8b685fa5 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -49,6 +49,7 @@
 #include "arrow/util/config.h"  // for ARROW_CSV definition
 #include "arrow/util/decimal.h"
 #include "arrow/util/future.h"
+#include "arrow/util/key_value_metadata.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/range.h"
 
@@ -3239,6 +3240,197 @@ TEST(TestArrowWrite, CheckChunkSize) {
                 WriteTable(*table, ::arrow::default_memory_pool(), sink, 
chunk_size));
 }
 
+void DoNestedValidate(const std::shared_ptr<::arrow::DataType>& inner_type,
+                      const std::shared_ptr<::arrow::Field>& outer_field,
+                      const std::shared_ptr<Buffer>& buffer,
+                      const std::shared_ptr<::arrow::Table>& table) {
+  std::unique_ptr<FileReader> reader;
+  FileReaderBuilder reader_builder;
+  ASSERT_OK(reader_builder.Open(std::make_shared<BufferReader>(buffer)));
+  ASSERT_OK(reader_builder.Build(&reader));
+  ARROW_SCOPED_TRACE("Parquet schema: ",
+                     
reader->parquet_reader()->metadata()->schema()->ToString());
+  std::shared_ptr<Table> result;
+  ASSERT_OK_NO_THROW(reader->ReadTable(&result));
+
+  if (inner_type->id() == ::arrow::Type::DATE64 ||
+      inner_type->id() == ::arrow::Type::TIMESTAMP ||
+      inner_type->Equals(*::arrow::time32(::arrow::TimeUnit::SECOND))) {
+    // Encoding is different when written out, cast back
+    ASSERT_OK_AND_ASSIGN(auto casted_array,
+                         ::arrow::compute::Cast(result->column(0), 
outer_field->type()));
+    result = ::arrow::Table::Make(::arrow::schema({outer_field}),
+                                  {casted_array.chunked_array()});
+  }
+
+  ASSERT_OK(result->ValidateFull());
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result, false));
+  // Ensure inner array has no nulls
+  for (const auto& chunk : result->column(0)->chunks()) {
+    const auto& arr = checked_cast<const ::arrow::StructArray&>(*chunk);
+    const auto inner_arr = arr.field(0);
+    ASSERT_EQ(inner_arr->null_count(), 0) << inner_arr->ToString();
+  }
+}
+
+void DoNestedRequiredRoundtrip(
+    const std::shared_ptr<::arrow::DataType>& inner_type,
+    const std::shared_ptr<WriterProperties>& writer_properties,
+    const std::shared_ptr<ArrowWriterProperties>& arrow_writer_properties) {
+  // Test ARROW-15961/ARROW-16116
+  ARROW_SCOPED_TRACE("Type: ", inner_type->ToString());
+  std::shared_ptr<::arrow::KeyValueMetadata> metadata;
+  if (inner_type->id() != ::arrow::Type::DICTIONARY) {
+    metadata = ::arrow::key_value_metadata({{"min", "0"}, {"max", "127"}});
+  }
+  auto inner_field =
+      ::arrow::field("inner", inner_type, /*nullable=*/false, 
std::move(metadata));
+  auto type = ::arrow::struct_({inner_field});
+  auto field = ::arrow::field("outer", type, /*nullable=*/true);
+
+  auto gen = ::arrow::random::RandomArrayGenerator(/*seed=*/42);
+  auto inner = gen.ArrayOf(*inner_field, /*size=*/4);
+  ASSERT_EQ(inner->null_count(), 0) << inner->ToString();
+
+  ::arrow::TypedBufferBuilder<bool> bitmap_builder;
+  ASSERT_OK(bitmap_builder.Append(2, false));
+  ASSERT_OK(bitmap_builder.Append(2, true));
+  ASSERT_OK_AND_ASSIGN(auto null_bitmap, bitmap_builder.Finish());
+  ASSERT_OK_AND_ASSIGN(auto array,
+                       ::arrow::StructArray::Make({inner}, {inner_field}, 
null_bitmap));
+  auto table = ::arrow::Table::Make(::arrow::schema({field}), {array});
+  ASSERT_OK(table->ValidateFull());
+
+  auto sink = CreateOutputStream();
+  ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
+                                /*row_group_size=*/4, writer_properties,
+                                arrow_writer_properties));
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+  ASSERT_NO_FATAL_FAILURE(DoNestedValidate(inner_type, field, buffer, table));
+}
+
+TEST(ArrowReadWrite, NestedRequiredOuterOptional) {
+  std::vector<std::shared_ptr<DataType>> types = ::arrow::PrimitiveTypes();
+  types.insert(types.end(), ::arrow::TemporalTypes().begin(),
+               ::arrow::TemporalTypes().end());
+  types.push_back(::arrow::duration(::arrow::TimeUnit::SECOND));
+  types.push_back(::arrow::duration(::arrow::TimeUnit::MILLI));
+  types.push_back(::arrow::duration(::arrow::TimeUnit::MICRO));
+  types.push_back(::arrow::duration(::arrow::TimeUnit::NANO));
+  types.push_back(::arrow::decimal128(3, 2));
+  types.push_back(::arrow::decimal256(3, 2));
+  types.push_back(::arrow::fixed_size_binary(4));
+  // Note large variants of types appear to get converted back to regular on 
read
+  types.push_back(::arrow::dictionary(::arrow::int32(), ::arrow::binary()));
+  types.push_back(::arrow::dictionary(::arrow::int32(), ::arrow::utf8()));
+
+  for (const auto& inner_type : types) {
+    if (inner_type->id() == ::arrow::Type::NA) continue;
+
+    auto writer_props = WriterProperties::Builder();
+    auto arrow_writer_props = ArrowWriterProperties::Builder();
+    arrow_writer_props.store_schema();
+    if (inner_type->id() == ::arrow::Type::UINT32) {
+      writer_props.version(ParquetVersion::PARQUET_2_4);
+    } else if (inner_type->id() == ::arrow::Type::TIMESTAMP) {
+      // By default ns is coerced to us, override that
+      ::arrow::TimeUnit::type unit =
+          checked_cast<const ::arrow::TimestampType&>(*inner_type).unit();
+      if (unit == ::arrow::TimeUnit::NANO) {
+        writer_props.version(ParquetVersion::PARQUET_2_6);
+        arrow_writer_props.coerce_timestamps(unit);
+      }
+    }
+
+    ASSERT_NO_FATAL_FAILURE(DoNestedRequiredRoundtrip(inner_type, 
writer_props.build(),
+                                                      
arrow_writer_props.build()));
+
+    if (inner_type->id() == ::arrow::Type::TIMESTAMP) {
+      ARROW_SCOPED_TRACE("enable_deprecated_int96_timestamps = true");
+      arrow_writer_props.enable_deprecated_int96_timestamps();
+      ASSERT_NO_FATAL_FAILURE(DoNestedRequiredRoundtrip(inner_type, 
writer_props.build(),
+                                                        
arrow_writer_props.build()));
+    }
+  }
+  // NOTE: read_dictionary option only applies to top-level columns,
+  // so we don't address that path here
+}
+
+TEST(ArrowReadWrite, NestedRequiredOuterOptionalDecimal) {
+  // Manually construct files to test decimals encoded as variable-length byte 
array
+  ::arrow::TypedBufferBuilder<bool> bitmap_builder;
+  ASSERT_OK(bitmap_builder.Append(2, false));
+  ASSERT_OK(bitmap_builder.Append(2, true));
+  ASSERT_OK_AND_ASSIGN(auto null_bitmap, bitmap_builder.Finish());
+
+  const std::vector<int16_t> def_levels = {0, 0, 1, 1};
+  const std::vector<ByteArray> byte_arrays = {
+      ByteArray("\x01\xe2\x40"),  // 123456
+      ByteArray("\x0f\x12\x06"),  // 987654
+  };
+  const std::vector<int32_t> int32_values = {123456, 987654};
+  const std::vector<int64_t> int64_values = {123456, 987654};
+
+  const auto inner_type = ::arrow::decimal128(6, 3);
+  auto inner_field = ::arrow::field("inner", inner_type, /*nullable=*/false);
+  auto type = ::arrow::struct_({inner_field});
+  auto field = ::arrow::field("outer", type, /*nullable=*/true);
+  auto inner =
+      ArrayFromJSON(inner_type, R"(["000.000", "000.000", "123.456", 
"987.654"])");
+  ASSERT_OK_AND_ASSIGN(auto array,
+                       ::arrow::StructArray::Make({inner}, {inner_field}, 
null_bitmap));
+  auto table = ::arrow::Table::Make(::arrow::schema({field}), {array});
+
+  for (const auto& encoding : {Type::BYTE_ARRAY, Type::INT32, Type::INT64}) {
+    // Manually write out file based on encoding type
+    ARROW_SCOPED_TRACE("Encoding decimals as ", encoding);
+    auto parquet_schema = GroupNode::Make(
+        "schema", Repetition::REQUIRED,
+        {GroupNode::Make("outer", Repetition::OPTIONAL,
+                         {
+                             PrimitiveNode::Make("inner", Repetition::REQUIRED,
+                                                 LogicalType::Decimal(6, 3), 
encoding),
+                         })});
+
+    auto sink = CreateOutputStream();
+    auto file_writer =
+        ParquetFileWriter::Open(sink, 
checked_pointer_cast<GroupNode>(parquet_schema));
+    auto column_writer = file_writer->AppendRowGroup()->NextColumn();
+    ARROW_SCOPED_TRACE("Column descriptor: ", 
column_writer->descr()->ToString());
+
+    switch (encoding) {
+      case Type::BYTE_ARRAY: {
+        auto typed_writer =
+            checked_cast<TypedColumnWriter<ByteArrayType>*>(column_writer);
+        typed_writer->WriteBatch(4, def_levels.data(), /*rep_levels=*/nullptr,
+                                 byte_arrays.data());
+        break;
+      }
+      case Type::INT32: {
+        auto typed_writer = checked_cast<Int32Writer*>(column_writer);
+        typed_writer->WriteBatch(4, def_levels.data(), /*rep_levels=*/nullptr,
+                                 int32_values.data());
+        break;
+      }
+      case Type::INT64: {
+        auto typed_writer = checked_cast<Int64Writer*>(column_writer);
+        typed_writer->WriteBatch(4, def_levels.data(), /*rep_levels=*/nullptr,
+                                 int64_values.data());
+        break;
+      }
+      default:
+        FAIL() << "Invalid encoding";
+        return;
+    }
+
+    column_writer->Close();
+    file_writer->Close();
+
+    ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+    ASSERT_NO_FATAL_FAILURE(DoNestedValidate(inner_type, field, buffer, 
table));
+  }
+}
+
 class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> 
{
  protected:
   // make it *3 to make it easily divisible by 3
@@ -3516,7 +3708,17 @@ TEST_F(TestNestedSchemaRead, ReadIntoTableFull) {
   // validate struct array
   ASSERT_NO_FATAL_FAILURE(ValidateArray(*struct_field_array, 
NUM_SIMPLE_TEST_ROWS / 3));
   // validate leaf1
-  ASSERT_NO_FATAL_FAILURE(ValidateColumnArray(*leaf1_array, 
NUM_SIMPLE_TEST_ROWS / 3));
+  ASSERT_NO_FATAL_FAILURE(ValidateArray(*leaf1_array, /*expected_nulls=*/0));
+  // Validate values manually here. The child array is non-nullable,
+  // but Parquet does not store null values, so we need to account for
+  // the struct's validity bitmap.
+  {
+    int j = 0;
+    for (int i = 0; i < values_array_->length(); i++) {
+      if (struct_field_array->IsNull(i)) continue;
+      ASSERT_EQ(leaf1_array->Value(i), values_array_->Value(j++));
+    }
+  }
   // validate leaf2
   ASSERT_NO_FATAL_FAILURE(
       ValidateColumnArray(*leaf2_array, NUM_SIMPLE_TEST_ROWS * 2 / 3));
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 4c298a9ce4..03470d4e8f 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -482,8 +482,8 @@ class LeafReader : public ColumnReaderImpl {
         NextRowGroup();
       }
     }
-    RETURN_NOT_OK(TransferColumnData(record_reader_.get(), field_->type(), 
descr_,
-                                     ctx_->pool, &out_));
+    RETURN_NOT_OK(
+        TransferColumnData(record_reader_.get(), field_, descr_, ctx_->pool, 
&out_));
     return Status::OK();
     END_PARQUET_CATCH_EXCEPTIONS
   }
diff --git a/cpp/src/parquet/arrow/reader_internal.cc 
b/cpp/src/parquet/arrow/reader_internal.cc
index 0981a73c44..d8aad0d0f2 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -303,9 +303,23 @@ Status StatisticsAsScalars(const Statistics& statistics,
 
 namespace {
 
+/// Drop the validity buffer from each chunk.
+///
+/// Used when reading a non-nullable field.
+void ReconstructChunksWithoutNulls(::arrow::ArrayVector* chunks) {
+  for (size_t i = 0; i < chunks->size(); i++) {
+    if ((*chunks)[i]->data()->buffers[0]) {
+      std::shared_ptr<::arrow::ArrayData> data = (*chunks)[i]->data();
+      data->null_count = 0;
+      data->buffers[0] = nullptr;
+      (*chunks)[i] = MakeArray(data);
+    }
+  }
+}
+
 template <typename ArrowType, typename ParquetType>
 Status TransferInt(RecordReader* reader, MemoryPool* pool,
-                   const std::shared_ptr<DataType>& type, Datum* out) {
+                   const std::shared_ptr<Field>& field, Datum* out) {
   using ArrowCType = typename ArrowType::c_type;
   using ParquetCType = typename ParquetType::c_type;
   int64_t length = reader->values_written();
@@ -315,21 +329,35 @@ Status TransferInt(RecordReader* reader, MemoryPool* pool,
   auto values = reinterpret_cast<const ParquetCType*>(reader->values());
   auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
   std::copy(values, values + length, out_ptr);
-  *out = std::make_shared<ArrayType<ArrowType>>(
-      type, length, std::move(data), reader->ReleaseIsValid(), 
reader->null_count());
+  if (field->nullable()) {
+    *out = std::make_shared<ArrayType<ArrowType>>(field->type(), length, 
std::move(data),
+                                                  reader->ReleaseIsValid(),
+                                                  reader->null_count());
+  } else {
+    *out =
+        std::make_shared<ArrayType<ArrowType>>(field->type(), length, 
std::move(data),
+                                               /*null_bitmap=*/nullptr, 
/*null_count=*/0);
+  }
   return Status::OK();
 }
 
 std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
-                                        const std::shared_ptr<DataType>& type) 
{
-  std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
-                                                  reader->ReleaseValues()};
-  auto data = std::make_shared<::arrow::ArrayData>(type, 
reader->values_written(),
-                                                   buffers, 
reader->null_count());
+                                        const std::shared_ptr<Field>& field) {
+  std::shared_ptr<::arrow::ArrayData> data;
+  if (field->nullable()) {
+    std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
+                                                    reader->ReleaseValues()};
+    data = std::make_shared<::arrow::ArrayData>(field->type(), 
reader->values_written(),
+                                                std::move(buffers), 
reader->null_count());
+  } else {
+    std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, 
reader->ReleaseValues()};
+    data = std::make_shared<::arrow::ArrayData>(field->type(), 
reader->values_written(),
+                                                std::move(buffers), 
/*null_count=*/0);
+  }
   return ::arrow::MakeArray(data);
 }
 
-Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
+Status TransferBool(RecordReader* reader, bool nullable, MemoryPool* pool, 
Datum* out) {
   int64_t length = reader->values_written();
 
   const int64_t buffer_size = bit_util::BytesForBits(length);
@@ -346,13 +374,18 @@ Status TransferBool(RecordReader* reader, MemoryPool* 
pool, Datum* out) {
     }
   }
 
-  *out = std::make_shared<BooleanArray>(length, std::move(data), 
reader->ReleaseIsValid(),
-                                        reader->null_count());
+  if (nullable) {
+    *out = std::make_shared<BooleanArray>(length, std::move(data),
+                                          reader->ReleaseIsValid(), 
reader->null_count());
+  } else {
+    *out = std::make_shared<BooleanArray>(length, std::move(data),
+                                          /*null_bitmap=*/nullptr, 
/*null_count=*/0);
+  }
   return Status::OK();
 }
 
 Status TransferInt96(RecordReader* reader, MemoryPool* pool,
-                     const std::shared_ptr<DataType>& type, Datum* out,
+                     const std::shared_ptr<Field>& field, Datum* out,
                      const ::arrow::TimeUnit::type int96_arrow_time_unit) {
   int64_t length = reader->values_written();
   auto values = reinterpret_cast<const Int96*>(reader->values());
@@ -381,13 +414,19 @@ Status TransferInt96(RecordReader* reader, MemoryPool* 
pool,
       }
     }
   }
-  *out = std::make_shared<TimestampArray>(type, length, std::move(data),
-                                          reader->ReleaseIsValid(), 
reader->null_count());
+  if (field->nullable()) {
+    *out =
+        std::make_shared<TimestampArray>(field->type(), length, 
std::move(data),
+                                         reader->ReleaseIsValid(), 
reader->null_count());
+  } else {
+    *out = std::make_shared<TimestampArray>(field->type(), length, 
std::move(data),
+                                            /*null_bitmap=*/nullptr, 
/*null_count=*/0);
+  }
   return Status::OK();
 }
 
 Status TransferDate64(RecordReader* reader, MemoryPool* pool,
-                      const std::shared_ptr<DataType>& type, Datum* out) {
+                      const std::shared_ptr<Field>& field, Datum* out) {
   int64_t length = reader->values_written();
   auto values = reinterpret_cast<const int32_t*>(reader->values());
 
@@ -399,8 +438,15 @@ Status TransferDate64(RecordReader* reader, MemoryPool* 
pool,
     *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
   }
 
-  *out = std::make_shared<::arrow::Date64Array>(
-      type, length, std::move(data), reader->ReleaseIsValid(), 
reader->null_count());
+  if (field->nullable()) {
+    *out = std::make_shared<::arrow::Date64Array>(field->type(), length, 
std::move(data),
+                                                  reader->ReleaseIsValid(),
+                                                  reader->null_count());
+  } else {
+    *out =
+        std::make_shared<::arrow::Date64Array>(field->type(), length, 
std::move(data),
+                                               /*null_bitmap=*/nullptr, 
/*null_count=*/0);
+  }
   return Status::OK();
 }
 
@@ -409,22 +455,28 @@ Status TransferDate64(RecordReader* reader, MemoryPool* 
pool,
 
 Status TransferDictionary(RecordReader* reader,
                           const std::shared_ptr<DataType>& logical_value_type,
-                          std::shared_ptr<ChunkedArray>* out) {
+                          bool nullable, std::shared_ptr<ChunkedArray>* out) {
   auto dict_reader = dynamic_cast<DictionaryRecordReader*>(reader);
   DCHECK(dict_reader);
   *out = dict_reader->GetResult();
   if (!logical_value_type->Equals(*(*out)->type())) {
     ARROW_ASSIGN_OR_RAISE(*out, (*out)->View(logical_value_type));
   }
+  if (!nullable) {
+    ::arrow::ArrayVector chunks = (*out)->chunks();
+    ReconstructChunksWithoutNulls(&chunks);
+    *out = std::make_shared<ChunkedArray>(std::move(chunks), 
logical_value_type);
+  }
   return Status::OK();
 }
 
 Status TransferBinary(RecordReader* reader, MemoryPool* pool,
-                      const std::shared_ptr<DataType>& logical_value_type,
+                      const std::shared_ptr<Field>& logical_type_field,
                       std::shared_ptr<ChunkedArray>* out) {
   if (reader->read_dictionary()) {
     return TransferDictionary(
-        reader, ::arrow::dictionary(::arrow::int32(), logical_value_type), 
out);
+        reader, ::arrow::dictionary(::arrow::int32(), 
logical_type_field->type()),
+        logical_type_field->nullable(), out);
   }
   ::arrow::compute::ExecContext ctx(pool);
   ::arrow::compute::CastOptions cast_options;
@@ -434,14 +486,18 @@ Status TransferBinary(RecordReader* reader, MemoryPool* 
pool,
   DCHECK(binary_reader);
   auto chunks = binary_reader->GetBuilderChunks();
   for (auto& chunk : chunks) {
-    if (!chunk->type()->Equals(*logical_value_type)) {
+    if (!chunk->type()->Equals(*logical_type_field->type())) {
       // XXX: if a LargeBinary chunk is larger than 2GB, the MSBs of offsets
       // will be lost because they are first created as int32 and then cast to 
int64.
       ARROW_ASSIGN_OR_RAISE(
-          chunk, ::arrow::compute::Cast(*chunk, logical_value_type, 
cast_options, &ctx));
+          chunk,
+          ::arrow::compute::Cast(*chunk, logical_type_field->type(), 
cast_options, &ctx));
     }
   }
-  *out = std::make_shared<ChunkedArray>(chunks, logical_value_type);
+  if (!logical_type_field->nullable()) {
+    ReconstructChunksWithoutNulls(&chunks);
+  }
+  *out = std::make_shared<ChunkedArray>(std::move(chunks), 
logical_type_field->type());
   return Status::OK();
 }
 
@@ -586,7 +642,7 @@ template <
     typename = ::arrow::enable_if_t<std::is_same<ParquetIntegerType, 
Int32Type>::value ||
                                     std::is_same<ParquetIntegerType, 
Int64Type>::value>>
 static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
-                                     const std::shared_ptr<DataType>& type, 
Datum* out) {
+                                     const std::shared_ptr<Field>& field, 
Datum* out) {
   // Decimal128 and Decimal256 are only Arrow constructs.  Parquet does not
   // specifically distinguish between decimal byte widths.
   // Decimal256 isn't relevant here because the Arrow-Parquet C++ bindings 
never
@@ -595,7 +651,7 @@ static Status DecimalIntegerTransfer(RecordReader* reader, 
MemoryPool* pool,
   // way an integer column could be construed as Decimal256 is if an arrow
   // schema was stored as metadata in the file indicating the column was
   // Decimal256. The current Arrow-Parquet C++ bindings will never do this.
-  DCHECK(type->id() == ::arrow::Type::DECIMAL128);
+  DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128);
 
   const int64_t length = reader->values_written();
 
@@ -606,7 +662,7 @@ static Status DecimalIntegerTransfer(RecordReader* reader, 
MemoryPool* pool,
 
   const auto values = reinterpret_cast<const ElementType*>(reader->values());
 
-  const auto& decimal_type = checked_cast<const ::arrow::DecimalType&>(*type);
+  const auto& decimal_type = checked_cast<const 
::arrow::DecimalType&>(*field->type());
   const int64_t type_length = decimal_type.byte_width();
 
   ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * 
type_length, pool));
@@ -622,12 +678,12 @@ static Status DecimalIntegerTransfer(RecordReader* 
reader, MemoryPool* pool,
     decimal.ToBytes(out_ptr);
   }
 
-  if (reader->nullable_values()) {
+  if (reader->nullable_values() && field->nullable()) {
     std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
-    *out = std::make_shared<Decimal128Array>(type, length, std::move(data), 
is_valid,
-                                             reader->null_count());
+    *out = std::make_shared<Decimal128Array>(field->type(), length, 
std::move(data),
+                                             is_valid, reader->null_count());
   } else {
-    *out = std::make_shared<Decimal128Array>(type, length, std::move(data));
+    *out = std::make_shared<Decimal128Array>(field->type(), length, 
std::move(data));
   }
   return Status::OK();
 }
@@ -640,43 +696,47 @@ static Status DecimalIntegerTransfer(RecordReader* 
reader, MemoryPool* pool,
 ///    representing the high and low bits of each decimal value.
 template <typename DecimalArrayType, typename ParquetType>
 Status TransferDecimal(RecordReader* reader, MemoryPool* pool,
-                       const std::shared_ptr<DataType>& type, Datum* out) {
+                       const std::shared_ptr<Field>& field, Datum* out) {
   auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader);
   DCHECK(binary_reader);
   ::arrow::ArrayVector chunks = binary_reader->GetBuilderChunks();
   for (size_t i = 0; i < chunks.size(); ++i) {
     std::shared_ptr<Array> chunk_as_decimal;
     auto fn = &DecimalConverter<DecimalArrayType, 
ParquetType>::ConvertToDecimal;
-    RETURN_NOT_OK(fn(*chunks[i], type, pool, &chunk_as_decimal));
+    RETURN_NOT_OK(fn(*chunks[i], field->type(), pool, &chunk_as_decimal));
     // Replace the chunk, which will hopefully also free memory as we go
     chunks[i] = chunk_as_decimal;
   }
-  *out = std::make_shared<ChunkedArray>(chunks, type);
+  if (!field->nullable()) {
+    ReconstructChunksWithoutNulls(&chunks);
+  }
+  *out = std::make_shared<ChunkedArray>(chunks, field->type());
   return Status::OK();
 }
 
 }  // namespace
 
-#define TRANSFER_INT32(ENUM, ArrowType)                                        
      \
-  case ::arrow::Type::ENUM: {                                                  
      \
-    Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_type, 
&result); \
-    RETURN_NOT_OK(s);                                                          
      \
+#define TRANSFER_INT32(ENUM, ArrowType)                                        
       \
+  case ::arrow::Type::ENUM: {                                                  
       \
+    Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_field, 
&result); \
+    RETURN_NOT_OK(s);                                                          
       \
   } break;
 
-#define TRANSFER_INT64(ENUM, ArrowType)                                        
      \
-  case ::arrow::Type::ENUM: {                                                  
      \
-    Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_type, 
&result); \
-    RETURN_NOT_OK(s);                                                          
      \
+#define TRANSFER_INT64(ENUM, ArrowType)                                        
       \
+  case ::arrow::Type::ENUM: {                                                  
       \
+    Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_field, 
&result); \
+    RETURN_NOT_OK(s);                                                          
       \
   } break;
 
-Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> 
value_type,
+Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& 
value_field,
                           const ColumnDescriptor* descr, MemoryPool* pool,
                           std::shared_ptr<ChunkedArray>* out) {
   Datum result;
   std::shared_ptr<ChunkedArray> chunked_result;
-  switch (value_type->id()) {
+  switch (value_field->type()->id()) {
     case ::arrow::Type::DICTIONARY: {
-      RETURN_NOT_OK(TransferDictionary(reader, value_type, &chunked_result));
+      RETURN_NOT_OK(TransferDictionary(reader, value_field->type(),
+                                       value_field->nullable(), 
&chunked_result));
       result = chunked_result;
     } break;
     case ::arrow::Type::NA: {
@@ -687,10 +747,10 @@ Status TransferColumnData(RecordReader* reader, 
std::shared_ptr<DataType> value_
     case ::arrow::Type::INT64:
     case ::arrow::Type::FLOAT:
     case ::arrow::Type::DOUBLE:
-      result = TransferZeroCopy(reader, value_type);
+      result = TransferZeroCopy(reader, value_field);
       break;
     case ::arrow::Type::BOOL:
-      RETURN_NOT_OK(TransferBool(reader, pool, &result));
+      RETURN_NOT_OK(TransferBool(reader, value_field->nullable(), pool, 
&result));
       break;
       TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
       TRANSFER_INT32(INT8, ::arrow::Int8Type);
@@ -703,33 +763,33 @@ Status TransferColumnData(RecordReader* reader, 
std::shared_ptr<DataType> value_
       TRANSFER_INT64(TIME64, ::arrow::Time64Type);
       TRANSFER_INT64(DURATION, ::arrow::DurationType);
     case ::arrow::Type::DATE64:
-      RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result));
+      RETURN_NOT_OK(TransferDate64(reader, pool, value_field, &result));
       break;
     case ::arrow::Type::FIXED_SIZE_BINARY:
     case ::arrow::Type::BINARY:
     case ::arrow::Type::STRING:
     case ::arrow::Type::LARGE_BINARY:
     case ::arrow::Type::LARGE_STRING: {
-      RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result));
+      RETURN_NOT_OK(TransferBinary(reader, pool, value_field, 
&chunked_result));
       result = chunked_result;
     } break;
     case ::arrow::Type::DECIMAL128: {
       switch (descr->physical_type()) {
         case ::parquet::Type::INT32: {
           auto fn = DecimalIntegerTransfer<Int32Type>;
-          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
         } break;
         case ::parquet::Type::INT64: {
           auto fn = &DecimalIntegerTransfer<Int64Type>;
-          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
         } break;
         case ::parquet::Type::BYTE_ARRAY: {
           auto fn = &TransferDecimal<Decimal128Array, ByteArrayType>;
-          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
         } break;
         case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
           auto fn = &TransferDecimal<Decimal128Array, FLBAType>;
-          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
         } break;
         default:
           return Status::Invalid(
@@ -741,11 +801,11 @@ Status TransferColumnData(RecordReader* reader, 
std::shared_ptr<DataType> value_
       switch (descr->physical_type()) {
         case ::parquet::Type::BYTE_ARRAY: {
           auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>;
-          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
         } break;
         case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
           auto fn = &TransferDecimal<Decimal256Array, FLBAType>;
-          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+          RETURN_NOT_OK(fn(reader, pool, value_field, &result));
         } break;
         default:
           return Status::Invalid(
@@ -755,16 +815,16 @@ Status TransferColumnData(RecordReader* reader, 
std::shared_ptr<DataType> value_
 
     case ::arrow::Type::TIMESTAMP: {
       const ::arrow::TimestampType& timestamp_type =
-          checked_cast<::arrow::TimestampType&>(*value_type);
+          checked_cast<::arrow::TimestampType&>(*value_field->type());
       if (descr->physical_type() == ::parquet::Type::INT96) {
         RETURN_NOT_OK(
-            TransferInt96(reader, pool, value_type, &result, 
timestamp_type.unit()));
+            TransferInt96(reader, pool, value_field, &result, 
timestamp_type.unit()));
       } else {
         switch (timestamp_type.unit()) {
           case ::arrow::TimeUnit::MILLI:
           case ::arrow::TimeUnit::MICRO:
           case ::arrow::TimeUnit::NANO:
-            result = TransferZeroCopy(reader, value_type);
+            result = TransferZeroCopy(reader, value_field);
             break;
           default:
             return Status::NotImplemented("TimeUnit not supported");
@@ -773,7 +833,7 @@ Status TransferColumnData(RecordReader* reader, 
std::shared_ptr<DataType> value_
     } break;
     default:
       return Status::NotImplemented("No support for reading columns of type ",
-                                    value_type->ToString());
+                                    value_field->type()->ToString());
   }
 
   if (result.kind() == Datum::ARRAY) {
diff --git a/cpp/src/parquet/arrow/reader_internal.h 
b/cpp/src/parquet/arrow/reader_internal.h
index ad0b781576..cf9dbb8657 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -99,7 +99,7 @@ using FileColumnIteratorFactory =
     std::function<FileColumnIterator*(int, ParquetFileReader*)>;
 
 Status TransferColumnData(::parquet::internal::RecordReader* reader,
-                          std::shared_ptr<::arrow::DataType> value_type,
+                          const std::shared_ptr<::arrow::Field>& value_field,
                           const ColumnDescriptor* descr, ::arrow::MemoryPool* 
pool,
                           std::shared_ptr<::arrow::ChunkedArray>* out);
 

Reply via email to