emkornfield commented on a change in pull request #8177:
URL: https://github.com/apache/arrow/pull/8177#discussion_r489138986



##########
File path: cpp/src/parquet/level_conversion_inc.h
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "parquet/level_conversion.h"
+
+#include <algorithm>
+#include <limits>
+
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/simd.h"
+#include "parquet/exception.h"
+#include "parquet/level_comparison.h"
+
+namespace parquet {
+namespace internal {
+#ifndef PARQUET_IMPL_NAMESPACE
+#error "PARQUET_IMPL_NAMESPACE must be defined"
+#endif
+namespace PARQUET_IMPL_NAMESPACE {
+
+/// Algorithm to simulate pext using BitRunReader for cases where all bits
+/// not set or set.
+uint64_t RunBasedExtractMixed(uint64_t bitmap, uint64_t select_bitmap) {
+  bitmap = arrow::BitUtil::FromLittleEndian(bitmap);
+  uint64_t new_bitmap = 0;
+  ::arrow::internal::BitRunReader 
selection(reinterpret_cast<uint8_t*>(&select_bitmap),
+                                            /*start_offset=*/0, /*length=*/64);
+  ::arrow::internal::BitRun run = selection.NextRun();
+  int64_t selected_bits = 0;
+  while (run.length != 0) {
+    if (run.set) {
+      new_bitmap |= (bitmap & 
::arrow::BitUtil::LeastSignficantBitMask(run.length))
+                    << selected_bits;
+      selected_bits += run.length;
+    }
+    bitmap = bitmap >> run.length;
+    run = selection.NextRun();
+  }
+  return arrow::BitUtil::ToLittleEndian(new_bitmap);
+}
+
+inline uint64_t RunBasedExtractImpl(uint64_t bitmap, uint64_t select_bitmap) {
+  /// These checks should be inline and are likely to be common cases.
+  if (select_bitmap == ~uint64_t{0}) {
+    return bitmap;
+  } else if (select_bitmap == 0) {
+    return 0;
+  }
+  /// Fallback to the slow method.
+  return RunBasedExtractMixed(bitmap, select_bitmap);
+}
+
+inline uint64_t ExtractBits(uint64_t bitmap, uint64_t select_bitmap) {
+#if defined(ARROW_HAVE_BMI2) && !defined(__MINGW32__)

Review comment:
       done.

##########
File path: cpp/cmake_modules/SetupCxxFlags.cmake
##########
@@ -50,7 +50,7 @@ if(ARROW_CPU_FLAG STREQUAL "x86")
     # skylake-avx512 consists of AVX512F,AVX512BW,AVX512VL,AVX512CD,AVX512DQ
     set(ARROW_AVX512_FLAG "-march=skylake-avx512 -mbmi2")
     # Append the avx2/avx512 subset option also, fix issue ARROW-9877 for 
homebrew-cpp
-    set(ARROW_AVX2_FLAG "${ARROW_AVX2_FLAG} -mavx2")
+    set(ARROW_AVX2_FLAG "${ARROW_AVX2_FLAG} -mavx2 -mbmi2")

Review comment:
       OK, I removed.  and place  -mbmi2 specifically only for the parquet 
file.  I think this should be safe because it is guarded via runtime dispatch. 
   
   I might not have been clear, but MSVC doesn't have any way of distinguishing 
these things so if we ever turn on AVX2 by default we have the same issue.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)

Review comment:
       removed.  it is an upper bound. renamed it. and removed comment.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    
RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], 
out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, 
ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, 
validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());

Review comment:
       I move this to above to avoid recursively calling things multiple times, 
but i think we should be validating at least for structs (and not validating 
full) since rep/def level information could be inconsistent within them.  It 
felt easier to call validate (and not too expensive) then writing custom logic 
for this.
   
   I'm open to removing them, but it feels like there should be a contract here 
that someplace in this code for an underlying library we validate consistency.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),

Review comment:
       I was thinking there was some issue with with list arrays that always 
required two elements.  I couldn't find the issue though.

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -19,6 +19,7 @@
 
 #include <cstdint>
 
+#include "arrow/util/bitmap.h"

Review comment:
       nope.

##########
File path: cpp/src/parquet/level_conversion.cc
##########
@@ -18,176 +18,205 @@
 
 #include <algorithm>
 #include <limits>
-#if defined(ARROW_HAVE_BMI2)
-#include <x86intrin.h>
-#endif
 
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/cpu_info.h"
 #include "arrow/util/logging.h"
 #include "parquet/exception.h"
 
+#include "parquet/level_comparison.h"
+#define PARQUET_IMPL_NAMESPACE standard
+#include "parquet/level_conversion_inc.h"
+#undef PARQUET_IMPL_NAMESPACE
+
 namespace parquet {
 namespace internal {
 namespace {
-inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
-                            const int16_t max_expected_level) {
-  int16_t min_level = std::numeric_limits<int16_t>::max();
-  int16_t max_level = std::numeric_limits<int16_t>::min();
-  for (int x = 0; x < num_levels; x++) {
-    min_level = std::min(levels[x], min_level);
-    max_level = std::max(levels[x], max_level);
-  }
-  if (ARROW_PREDICT_FALSE(num_levels > 0 &&
-                          (min_level < 0 || max_level > max_expected_level))) {
-    throw ParquetException("definition level exceeds maximum");
-  }
-}
 
-#if !defined(ARROW_HAVE_AVX512)
-
-inline void DefinitionLevelsToBitmapScalar(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t 
max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* 
null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  // We assume here that valid_bits is large enough to accommodate the
-  // additional definition levels and the ones that have already been written
-  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, 
valid_bits_offset,
-                                                    num_def_levels);
-
-  // TODO(itaiin): As an interim solution we are splitting the code path here
-  // between repeated+flat column reads, and non-repeated+nested reads.
-  // Those paths need to be merged in the future
-  for (int i = 0; i < num_def_levels; ++i) {
-    if (def_levels[i] == max_definition_level) {
+using ::arrow::internal::CpuInfo;
+
+#if !defined(ARROW_HAVE_RUNTIME_BMI2)
+void DefLevelsToBitmapScalar(const int16_t* def_levels, int64_t num_def_levels,
+                             LevelInfo level_info, ValidityBitmapInputOutput* 
output) {
+  ::arrow::internal::FirstTimeBitmapWriter valid_bits_writer(
+      output->valid_bits,
+      /*start_offset=*/output->valid_bits_offset,
+      /*length=*/num_def_levels);
+  for (int x = 0; x < num_def_levels; x++) {
+    // This indicates that a parent repeated element has zero
+    // length so the def level is not applicable to this column.
+    if (def_levels[x] < level_info.repeated_ancestor_def_level) {
+      continue;
+    }
+    if (ARROW_PREDICT_FALSE(valid_bits_writer.position() >=
+                            output->values_read_upper_bound)) {
+      std::stringstream ss;
+      ss << "Definition levels exceeded upper bound: " << 
output->values_read_upper_bound;
+      throw ParquetException(ss.str());
+    }
+    if (def_levels[x] >= level_info.def_level) {
       valid_bits_writer.Set();
-    } else if (max_repetition_level > 0) {
-      // repetition+flat case
-      if (def_levels[i] == (max_definition_level - 1)) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        continue;
-      }
     } else {
-      // non-repeated+nested case
-      if (def_levels[i] < max_definition_level) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        throw ParquetException("definition level exceeds maximum");
-      }
+      valid_bits_writer.Clear();
+      output->null_count += 1;
     }
-
     valid_bits_writer.Next();
   }
   valid_bits_writer.Finish();
-  *values_read = valid_bits_writer.position();
+  output->values_read = valid_bits_writer.position();
+  if (output->null_count > 0 && level_info.null_slot_usage > 1) {
+    throw ParquetException(
+        "Null values with null_slot_usage > 1 not supported."
+        "(i.e. FixedSizeLists with null values are not supported");
+  }
 }
 #endif
 
-template <bool has_repeated_parent>
-int64_t DefinitionLevelsBatchToBitmap(const int16_t* def_levels, const int64_t 
batch_size,
-                                      const int16_t required_definition_level,
-                                      
::arrow::internal::FirstTimeBitmapWriter* writer) {
-  CheckLevelRange(def_levels, batch_size, required_definition_level);
-  uint64_t defined_bitmap =
-      internal::GreaterThanBitmap(def_levels, batch_size, 
required_definition_level - 1);
-
-  DCHECK_LE(batch_size, 64);
-  if (has_repeated_parent) {
-#if defined(ARROW_HAVE_BMI2)
-    // This is currently a specialized code path assuming only (nested) lists
-    // present through the leaf (i.e. no structs). Upper level code only calls
-    // this method when the leaf-values are nullable (otherwise no spacing is 
needed),
-    // Because only nested lists exists it is sufficient to know that the field
-    // was either null or included it (i.e. definition level > 
max_definitation_level
-    // -2) If there where structs mixed in, we need to know the def_level of 
the
-    // repeated parent so we can check for def_level > "def level of repeated 
parent".
-    uint64_t present_bitmap = internal::GreaterThanBitmap(def_levels, 
batch_size,
-                                                          
required_definition_level - 2);
-    uint64_t selected_bits = _pext_u64(defined_bitmap, present_bitmap);
-    writer->AppendWord(selected_bits, 
::arrow::BitUtil::PopCount(present_bitmap));
-    return ::arrow::BitUtil::PopCount(selected_bits);
-#else
-    assert(false && "must not execute this without BMI2");
-#endif
-  } else {
-    writer->AppendWord(defined_bitmap, batch_size);
-    return ::arrow::BitUtil::PopCount(defined_bitmap);
+template <typename OffsetType>
+void DefRepLevelsToListInfo(const int16_t* def_levels, const int16_t* 
rep_levels,
+                            int64_t num_def_levels, LevelInfo level_info,
+                            ValidityBitmapInputOutput* output, OffsetType* 
offsets) {
+  OffsetType* orig_pos = offsets;
+  std::unique_ptr<::arrow::internal::FirstTimeBitmapWriter> valid_bits_writer;
+  if (output->valid_bits) {
+    valid_bits_writer.reset(new ::arrow::internal::FirstTimeBitmapWriter(
+        output->valid_bits, output->valid_bits_offset, num_def_levels));
   }
-}
+  for (int x = 0; x < num_def_levels; x++) {
+    // Skip items that belong to empty or null ancestor lists and further 
nested lists.
+    if (def_levels[x] < level_info.repeated_ancestor_def_level ||
+        rep_levels[x] > level_info.rep_level) {
+      continue;
+    }
 
-template <bool has_repeated_parent>
-void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t 
num_def_levels,
-                                  const int16_t required_definition_level,
-                                  int64_t* values_read, int64_t* null_count,
-                                  uint8_t* valid_bits, int64_t 
valid_bits_offset) {
-  constexpr int64_t kBitMaskSize = 64;
-  ::arrow::internal::FirstTimeBitmapWriter writer(valid_bits,
-                                                  
/*start_offset=*/valid_bits_offset,
-                                                  /*length=*/num_def_levels);
-  int64_t set_count = 0;
-  *values_read = 0;
-  while (num_def_levels > kBitMaskSize) {
-    set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
-        def_levels, kBitMaskSize, required_definition_level, &writer);
-    def_levels += kBitMaskSize;
-    num_def_levels -= kBitMaskSize;
-  }
-  set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
-      def_levels, num_def_levels, required_definition_level, &writer);
+    if (rep_levels[x] == level_info.rep_level) {
+      // A continuation of an existing list.
+      // offsets can be null for structs with repeated children (we don't need 
to know
+      // offsets until we get to the children).
+      if (offsets != nullptr) {
+        if (ARROW_PREDICT_FALSE(*offsets == 
std::numeric_limits<OffsetType>::max())) {
+          throw ParquetException("List index overflow.");
+        }
+        *offsets += 1;
+      }
+    } else {
+      if (ARROW_PREDICT_FALSE(
+              (valid_bits_writer != nullptr &&
+               valid_bits_writer->position() >= 
output->values_read_upper_bound) ||
+              (offsets - orig_pos) >= output->values_read_upper_bound)) {
+        std::stringstream ss;
+        ss << "Definition levels exceeded upper bound: "
+           << output->values_read_upper_bound;
+        throw ParquetException(ss.str());
+      }
 
-  *values_read = writer.position();
-  *null_count += *values_read - set_count;
-  writer.Finish();
+      // current_rep < list rep_level i.e. start of a list (ancestor empty 
lists are
+      // filtered out above).
+      // offsets can be null for structs with repeated children (we don't need 
to know
+      // offsets until we get to the children).
+      if (offsets != nullptr) {
+        ++offsets;
+        // Use cumulative offsets because variable size lists are more common 
then
+        // fixed size lists so it should be cheaper to make these cumulative 
and
+        // subtract when validating fixed size lists.
+        *offsets = *(offsets - 1);
+        if (def_levels[x] >= level_info.def_level) {
+          if (ARROW_PREDICT_FALSE(*offsets == 
std::numeric_limits<OffsetType>::max())) {
+            throw ParquetException("List index overflow.");
+          }
+          *offsets += 1;
+        }
+      }
+
+      if (valid_bits_writer != nullptr) {
+        // the level_info def level for lists reflects element present level.
+        // the prior level distinguishes between empty lists.
+        if (def_levels[x] >= level_info.def_level - 1) {
+          valid_bits_writer->Set();
+        } else {
+          output->null_count++;
+          valid_bits_writer->Clear();
+        }
+        valid_bits_writer->Next();
+      }
+    }
+  }
+  if (valid_bits_writer != nullptr) {
+    valid_bits_writer->Finish();
+  }
+  if (offsets != nullptr) {
+    output->values_read = offsets - orig_pos;
+  } else if (valid_bits_writer != nullptr) {
+    output->values_read = valid_bits_writer->position();
+  }
+  if (output->null_count > 0 && level_info.null_slot_usage > 1) {
+    throw ParquetException(
+        "Null values with null_slot_usage > 1 not supported."
+        "(i.e. FixedSizeLists with null values are not supported)");
+  }
 }
 
-void DefinitionLevelsToBitmapLittleEndian(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t 
max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* 
null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  if (max_repetition_level > 0) {
-// This is a short term hack to prevent using the pext BMI2 instructions
-// on non-intel platforms where performance is subpar.
-// In the medium term we will hopefully be able to runtime dispatch
-// to use this on intel only platforms that support pext.
-#if defined(ARROW_HAVE_AVX512)
-    // BMI2 is required for efficient bit extraction.
-    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/true>(
-        def_levels, num_def_levels, max_definition_level, values_read, 
null_count,
-        valid_bits, valid_bits_offset);
-#else
-    DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, 
max_definition_level,
-                                   max_repetition_level, values_read, 
null_count,
-                                   valid_bits, valid_bits_offset);
-#endif  // ARROW_HAVE_BMI2
+}  // namespace
 
+#if defined(ARROW_HAVE_RUNTIME_BMI2)
+// defined in level_conversion_bmi2.cc for dynamic dispatch.
+void DefLevelsToBitmapBmi2WithRepeatedParent(const int16_t* def_levels,
+                                             int64_t num_def_levels, LevelInfo 
level_info,
+                                             ValidityBitmapInputOutput* 
output);
+#endif
+
+void DefLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
+                       LevelInfo level_info, ValidityBitmapInputOutput* 
output) {
+  // It is simpler to rely on rep_level here until PARQUET-1899 is done and 
the code
+  // is deleted in a follow-up release.
+  if (level_info.rep_level > 0) {
+#if defined(ARROW_HAVE_RUNTIME_BMI2)
+    using FunctionType = decltype(&standard::DefLevelsToBitmapSimd<true>);
+    static FunctionType fn =
+        CpuInfo::GetInstance()->HasEfficientBmi2()
+            ? DefLevelsToBitmapBmi2WithRepeatedParent
+            : standard::DefLevelsToBitmapSimd</*has_repeated_parent=*/true>;
+    fn(def_levels, num_def_levels, level_info, output);
+#else
+    // This indicates we are likely on a big-endian platformat which don't 
have a

Review comment:
       add a specific case for little endian.
   
   I added a comment below, but when there is no repeated parent, all platfoms 
should have good SIMD options for converting to bitmap.

##########
File path: cpp/src/parquet/level_conversion_test.cc
##########
@@ -108,22 +114,245 @@ TEST(GreaterThanBitmap, GeneratesExpectedBitmasks) {
 
 TEST(DefinitionLevelsToBitmap, WithRepetitionLevelFiltersOutEmptyListValues) {
   std::vector<uint8_t> validity_bitmap(/*count*/ 8, 0);
-  int64_t null_count = 5;
-  int64_t values_read = 1;
 
+  ValidityBitmapInputOutput io;
+  io.values_read_upper_bound = 64;
+  io.values_read = 1;
+  io.null_count = 5;
+  io.valid_bits = validity_bitmap.data();
+  io.valid_bits_offset = 1;
+
+  LevelInfo level_info;
+  level_info.repeated_ancestor_def_level = 1;
+  level_info.def_level = 2;
+  level_info.rep_level = 1;
   // All zeros should be ignored, ones should be unset in the bitmp and 2 
should be set.
   std::vector<int16_t> def_levels = {0, 0, 0, 2, 2, 1, 0, 2};
-  DefinitionLevelsToBitmap(
-      def_levels.data(), def_levels.size(), /*max_definition_level=*/2,
-      /*max_repetition_level=*/1, &values_read, &null_count, 
validity_bitmap.data(),
-      /*valid_bits_offset=*/1);
+  DefinitionLevelsToBitmap(def_levels.data(), def_levels.size(), level_info, 
&io);
 
   EXPECT_EQ(BitmapToString(validity_bitmap, /*bit_count=*/8), "01101000");
   for (size_t x = 1; x < validity_bitmap.size(); x++) {
     EXPECT_EQ(validity_bitmap[x], 0) << "index: " << x;
   }
-  EXPECT_EQ(null_count, /*5 + 1 =*/6);
-  EXPECT_EQ(values_read, 4);  // value should get overwritten.
+  EXPECT_EQ(io.null_count, /*5 + 1 =*/6);
+  EXPECT_EQ(io.values_read, 4);  // value should get overwritten.
+}
+
+class MultiLevelTestData {
+ public:
+  // Triply nested list values borrow from write_path
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  std::vector<int16_t> def_levels_{2, 7, 6, 7, 5, 3,  // first row
+                                   5, 5, 7, 7, 2, 7,  // second row
+                                   0,                 // third row
+                                   1};
+  std::vector<int16_t> rep_levels_{0, 1, 3, 3, 2, 1,  // first row
+                                   0, 1, 2, 3, 1, 1,  // second row
+                                   0, 0};
+};
+
+template <typename ConverterType>
+class NestedListTest : public testing::Test {
+ public:
+  MultiLevelTestData test_data_;
+  ConverterType converter_;
+};
+
+template <typename ListType>
+struct RepDefLevelConverter {
+  using ListLengthType = ListType;
+  ListLengthType* ComputeListInfo(const MultiLevelTestData& test_data,
+                                  LevelInfo level_info, 
ValidityBitmapInputOutput* output,
+                                  ListType* lengths) {
+    ConvertDefRepLevelsToList(test_data.def_levels_.data(), 
test_data.rep_levels_.data(),
+                              test_data.def_levels_.size(), level_info, 
output, lengths);
+    return lengths + output->values_read;
+  }
+};
+
+using ConverterTypes =
+    ::testing::Types<RepDefLevelConverter</*list_length_type=*/int32_t>,
+                     RepDefLevelConverter</*list_length_type=*/int64_t>>;
+TYPED_TEST_CASE(NestedListTest, ConverterTypes);
+
+TYPED_TEST(NestedListTest, OuterMostTest) {
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> 4 outer most lists (len(3), len(4), null, len(0))
+  LevelInfo level_info;
+  level_info.rep_level = 1;
+  level_info.def_level = 2;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(5, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 4;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = 
this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 4);
+  EXPECT_THAT(lengths, testing::ElementsAre(0, 3, 7, 7, 7));
+
+  EXPECT_EQ(validity_io.values_read, 4);
+  EXPECT_EQ(validity_io.null_count, 1);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/4), "1101");
+}
+
+TYPED_TEST(NestedListTest, MiddleListTest) {
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> middle lists (null, len(2), len(0),
+  //                  len(1), len(2), null, len(1),
+  //                  N/A,
+  //                  N/A
+  LevelInfo level_info;
+  level_info.rep_level = 2;
+  level_info.def_level = 4;
+  level_info.repeated_ancestor_def_level = 2;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(8, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 7;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = 
this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 7);
+  EXPECT_THAT(lengths, testing::ElementsAre(0, 0, 2, 2, 3, 5, 5, 6));
+
+  EXPECT_EQ(validity_io.values_read, 7);
+  EXPECT_EQ(validity_io.null_count, 2);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/7), "0111101");
+}
+
+TYPED_TEST(NestedListTest, InnerMostListTest) {
+  // [null, [[1, null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> 4 inner lists (N/A, [len(3), len(0)], N/A
+  //                        len(0), [len(0), len(2)], N/A, len(1),
+  //                        N/A,
+  //                        N/A
+  LevelInfo level_info;
+  level_info.rep_level = 3;
+  level_info.def_level = 6;
+  level_info.repeated_ancestor_def_level = 4;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(7, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 6;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = 
this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 6);
+  EXPECT_THAT(lengths, testing::ElementsAre(0, 3, 3, 3, 3, 5, 6));
+
+  EXPECT_EQ(validity_io.values_read, 6);
+  EXPECT_EQ(validity_io.null_count, 0);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/6), "111111");
+}
+
+TYPED_TEST(NestedListTest, SimpleLongList) {
+  LevelInfo level_info;
+  level_info.rep_level = 1;
+  level_info.def_level = 2;
+  level_info.repeated_ancestor_def_level = 0;
+
+  // No empty lists.
+  this->test_data_.def_levels_ = std::vector<int16_t>(65 * 9, 2);
+  this->test_data_.rep_levels_.clear();
+  for (int x = 0; x < 65; x++) {
+    this->test_data_.rep_levels_.push_back(0);
+    this->test_data_.rep_levels_.insert(this->test_data_.rep_levels_.end(), 8,
+                                        /*rep_level=*/1);
+  }
+
+  std::vector<typename TypeParam::ListLengthType> lengths(66, 0);
+  std::vector<typename TypeParam::ListLengthType> expected_lengths(66, 0);
+  for (size_t x = 1; x < expected_lengths.size(); x++) {
+    expected_lengths[x] = x * 9;
+  }
+  std::vector<uint8_t> validity_output(9, 0);
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 65;
+  validity_io.valid_bits = validity_output.data();
+  typename TypeParam::ListLengthType* next_position = 
this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 65);
+  EXPECT_THAT(lengths, testing::ElementsAreArray(expected_lengths));
+
+  EXPECT_EQ(validity_io.values_read, 65);
+  EXPECT_EQ(validity_io.null_count, 0);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/65),
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "1");
+}
+
+TYPED_TEST(NestedListTest, TestOverflow) {
+  LevelInfo level_info;
+  level_info.rep_level = 1;
+  level_info.def_level = 2;
+  level_info.repeated_ancestor_def_level = 0;
+
+  // No empty lists.
+  this->test_data_.def_levels_ = std::vector<int16_t>{2};
+  this->test_data_.rep_levels_ = std::vector<int16_t>{0};
+
+  std::vector<typename TypeParam::ListLengthType> lengths(
+      2, std::numeric_limits<typename TypeParam::ListLengthType>::max());

Review comment:
       done.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());

Review comment:
       done.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,

Review comment:
       yep.  done.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,

Review comment:
       changed to resize.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    
RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], 
out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, 
ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),

Review comment:
       yes, I thought this was causing problems with type inference at some 
pont.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    
RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], 
out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, 
ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, 
validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());
     *out = std::make_shared<ChunkedArray>(result);
     return Status::OK();
   }
 
   const std::shared_ptr<Field> field() override { return field_; }
 
-  const ColumnDescriptor* descr() const override { return nullptr; }
-
-  ReaderType type() const override { return LIST; }
-
  private:
   std::shared_ptr<ReaderContext> ctx_;
   std::shared_ptr<Field> field_;
-  int16_t max_definition_level_;
-  int16_t max_repetition_level_;
+  ::parquet::internal::LevelInfo level_info_;
   std::unique_ptr<ColumnReaderImpl> item_reader_;
 };
 
 class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
  public:
   explicit StructReader(std::shared_ptr<ReaderContext> ctx,
-                        const SchemaField& schema_field,
                         std::shared_ptr<Field> filtered_field,
-                        std::vector<std::unique_ptr<ColumnReaderImpl>>&& 
children)
+                        ::parquet::internal::LevelInfo level_info,
+                        std::vector<std::unique_ptr<ColumnReaderImpl>> 
children)
       : ctx_(std::move(ctx)),
-        schema_field_(schema_field),
         filtered_field_(std::move(filtered_field)),
-        struct_def_level_(schema_field.level_info.def_level),
-        children_(std::move(children)) {}
+        level_info_(level_info),
+        children_(std::move(children)) {
+    // There could be a mix of children some might be repeated some might not 
be.
+    // If possible use one that isn't since that will be guaranteed to have 
the least
+    // number of rep/def levels.
+    auto result = std::find_if(children_.begin(), children_.end(),
+                               [](const std::unique_ptr<ColumnReaderImpl>& 
child) {
+                                 return !child->IsOrHasRepeatedChild();
+                               });
+    if (result != children_.end()) {
+      def_rep_level_child_ = result->get();
+      has_repeated_child_ = false;
+    } else if (!children_.empty()) {
+      def_rep_level_child_ = children_.front().get();
+      has_repeated_child_ = true;
+    }
+  }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
+  bool IsOrHasRepeatedChild() const final { return has_repeated_child_; }
+
+  Status LoadBatch(int64_t records_to_read) override {
+    for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
+      RETURN_NOT_OK(reader->LoadBatch(records_to_read));
+    }
+    return Status::OK();
+  }
+  Status BuildArray(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
   Status GetDefLevels(const int16_t** data, int64_t* length) override;
   Status GetRepLevels(const int16_t** data, int64_t* length) override;
   const std::shared_ptr<Field> field() override { return filtered_field_; }
-  const ColumnDescriptor* descr() const override { return nullptr; }
-  ReaderType type() const override { return STRUCT; }
 
  private:
-  std::shared_ptr<ReaderContext> ctx_;
-  SchemaField schema_field_;
-  std::shared_ptr<Field> filtered_field_;
-  int16_t struct_def_level_;
-  std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
-  std::shared_ptr<ResizableBuffer> def_levels_buffer_;
-  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* 
null_count);
+  const std::shared_ptr<ReaderContext> ctx_;
+  const std::shared_ptr<Field> filtered_field_;
+  const ::parquet::internal::LevelInfo level_info_;
+  const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
+  ColumnReaderImpl* def_rep_level_child_ = nullptr;
+  bool has_repeated_child_;
 };
 
-Status StructReader::DefLevelsToNullArray(std::shared_ptr<Buffer>* 
null_bitmap_out,
-                                          int64_t* null_count_out) {
-  auto null_count = 0;
-  const int16_t* def_levels_data;
-  int64_t def_levels_length;
-  RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
-  ARROW_ASSIGN_OR_RAISE(auto null_bitmap,
-                        AllocateEmptyBitmap(def_levels_length, ctx_->pool));
-  uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
-  for (int64_t i = 0; i < def_levels_length; i++) {
-    if (def_levels_data[i] < struct_def_level_) {
-      // Mark null
-      null_count += 1;
-    } else {
-      DCHECK_EQ(def_levels_data[i], struct_def_level_);
-      ::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
-    }
-  }
-
-  *null_count_out = null_count;
-  *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap;
-  return Status::OK();
-}
-
-// TODO(itaiin): Consider caching the results of this calculation -
-//   note that this is only used once for each read for now
 Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
   *data = nullptr;
   if (children_.size() == 0) {
-    // Empty struct
     *length = 0;
-    return Status::OK();
+    return Status::Invalid("StructReader had no childre");
   }
 
-  // We have at least one child
-  const int16_t* child_def_levels;
-  int64_t child_length = 0;
-  bool found_nullable_child = false;
-  int16_t* result_levels = nullptr;
-
-  int child_index = 0;
-  while (child_index < static_cast<int>(children_.size())) {
-    if (!children_[child_index]->field()->nullable()) {
-      ++child_index;
-      continue;
-    }
-    RETURN_NOT_OK(children_[child_index]->GetDefLevels(&child_def_levels, 
&child_length));
-    auto size = child_length * sizeof(int16_t);
-    ARROW_ASSIGN_OR_RAISE(def_levels_buffer_, AllocateResizableBuffer(size, 
ctx_->pool));
-    // Initialize with the minimal def level
-    std::memset(def_levels_buffer_->mutable_data(), -1, size);
-    result_levels = 
reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
-    found_nullable_child = true;
-    break;
-  }
+  // This method should only be called when this struct or one of its parents
+  // are optional/repeated or it has a repeated child.
+  // Meaning all children must have rep/def levels associated
+  // with them.
+  RETURN_NOT_OK(def_rep_level_child_->GetDefLevels(data, length));
 
-  if (!found_nullable_child) {
+  if (*data == nullptr) {
+    // Only happens if there are actually 0 rows available.
     *data = nullptr;

Review comment:
       Yes I think it is, removed.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    
RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], 
out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, 
ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, 
validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());
     *out = std::make_shared<ChunkedArray>(result);
     return Status::OK();
   }
 
   const std::shared_ptr<Field> field() override { return field_; }
 
-  const ColumnDescriptor* descr() const override { return nullptr; }
-
-  ReaderType type() const override { return LIST; }
-
  private:
   std::shared_ptr<ReaderContext> ctx_;
   std::shared_ptr<Field> field_;
-  int16_t max_definition_level_;
-  int16_t max_repetition_level_;
+  ::parquet::internal::LevelInfo level_info_;
   std::unique_ptr<ColumnReaderImpl> item_reader_;
 };
 
 class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
  public:
   explicit StructReader(std::shared_ptr<ReaderContext> ctx,
-                        const SchemaField& schema_field,
                         std::shared_ptr<Field> filtered_field,
-                        std::vector<std::unique_ptr<ColumnReaderImpl>>&& 
children)
+                        ::parquet::internal::LevelInfo level_info,
+                        std::vector<std::unique_ptr<ColumnReaderImpl>> 
children)
       : ctx_(std::move(ctx)),
-        schema_field_(schema_field),
         filtered_field_(std::move(filtered_field)),
-        struct_def_level_(schema_field.level_info.def_level),
-        children_(std::move(children)) {}
+        level_info_(level_info),
+        children_(std::move(children)) {
+    // There could be a mix of children some might be repeated some might not 
be.
+    // If possible use one that isn't since that will be guaranteed to have 
the least
+    // number of rep/def levels.
+    auto result = std::find_if(children_.begin(), children_.end(),
+                               [](const std::unique_ptr<ColumnReaderImpl>& 
child) {
+                                 return !child->IsOrHasRepeatedChild();
+                               });
+    if (result != children_.end()) {
+      def_rep_level_child_ = result->get();
+      has_repeated_child_ = false;
+    } else if (!children_.empty()) {
+      def_rep_level_child_ = children_.front().get();
+      has_repeated_child_ = true;
+    }
+  }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
+  bool IsOrHasRepeatedChild() const final { return has_repeated_child_; }
+
+  Status LoadBatch(int64_t records_to_read) override {
+    for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
+      RETURN_NOT_OK(reader->LoadBatch(records_to_read));
+    }
+    return Status::OK();
+  }
+  Status BuildArray(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
   Status GetDefLevels(const int16_t** data, int64_t* length) override;
   Status GetRepLevels(const int16_t** data, int64_t* length) override;
   const std::shared_ptr<Field> field() override { return filtered_field_; }
-  const ColumnDescriptor* descr() const override { return nullptr; }
-  ReaderType type() const override { return STRUCT; }
 
  private:
-  std::shared_ptr<ReaderContext> ctx_;
-  SchemaField schema_field_;
-  std::shared_ptr<Field> filtered_field_;
-  int16_t struct_def_level_;
-  std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
-  std::shared_ptr<ResizableBuffer> def_levels_buffer_;
-  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* 
null_count);
+  const std::shared_ptr<ReaderContext> ctx_;
+  const std::shared_ptr<Field> filtered_field_;
+  const ::parquet::internal::LevelInfo level_info_;
+  const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
+  ColumnReaderImpl* def_rep_level_child_ = nullptr;
+  bool has_repeated_child_;
 };
 
-Status StructReader::DefLevelsToNullArray(std::shared_ptr<Buffer>* 
null_bitmap_out,
-                                          int64_t* null_count_out) {
-  auto null_count = 0;
-  const int16_t* def_levels_data;
-  int64_t def_levels_length;
-  RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
-  ARROW_ASSIGN_OR_RAISE(auto null_bitmap,
-                        AllocateEmptyBitmap(def_levels_length, ctx_->pool));
-  uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
-  for (int64_t i = 0; i < def_levels_length; i++) {
-    if (def_levels_data[i] < struct_def_level_) {
-      // Mark null
-      null_count += 1;
-    } else {
-      DCHECK_EQ(def_levels_data[i], struct_def_level_);
-      ::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
-    }
-  }
-
-  *null_count_out = null_count;
-  *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap;
-  return Status::OK();
-}
-
-// TODO(itaiin): Consider caching the results of this calculation -
-//   note that this is only used once for each read for now
 Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
   *data = nullptr;
   if (children_.size() == 0) {
-    // Empty struct
     *length = 0;
-    return Status::OK();
+    return Status::Invalid("StructReader had no childre");
   }
 
-  // We have at least one child
-  const int16_t* child_def_levels;
-  int64_t child_length = 0;
-  bool found_nullable_child = false;
-  int16_t* result_levels = nullptr;
-
-  int child_index = 0;
-  while (child_index < static_cast<int>(children_.size())) {
-    if (!children_[child_index]->field()->nullable()) {
-      ++child_index;
-      continue;
-    }
-    RETURN_NOT_OK(children_[child_index]->GetDefLevels(&child_def_levels, 
&child_length));
-    auto size = child_length * sizeof(int16_t);
-    ARROW_ASSIGN_OR_RAISE(def_levels_buffer_, AllocateResizableBuffer(size, 
ctx_->pool));
-    // Initialize with the minimal def level
-    std::memset(def_levels_buffer_->mutable_data(), -1, size);
-    result_levels = 
reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
-    found_nullable_child = true;
-    break;
-  }
+  // This method should only be called when this struct or one of its parents
+  // are optional/repeated or it has a repeated child.
+  // Meaning all children must have rep/def levels associated
+  // with them.
+  RETURN_NOT_OK(def_rep_level_child_->GetDefLevels(data, length));
 
-  if (!found_nullable_child) {
+  if (*data == nullptr) {
+    // Only happens if there are actually 0 rows available.
     *data = nullptr;
     *length = 0;
-    return Status::OK();
   }
+  return Status::OK();
+}
 
-  // Look at the rest of the children
-
-  // When a struct is defined, all of its children def levels are at least at
-  // nesting level, and def level equals nesting level.
-  // When a struct is not defined, all of its children def levels are less than
-  // the nesting level, and the def level equals max(children def levels)
-  // All other possibilities are malformed definition data.
-  for (; child_index < static_cast<int>(children_.size()); ++child_index) {
-    // Child is non-nullable, and therefore has no definition levels
-    if (!children_[child_index]->field()->nullable()) {
-      continue;
-    }
+Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
+  *data = nullptr;
+  if (children_.size() == 0) {
+    *length = 0;
+    return Status::Invalid("StructReader had no childre");
+  }
 
-    auto& child = children_[child_index];
-    int64_t current_child_length;
-    RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, 
&current_child_length));
-
-    if (child_length != current_child_length) {
-      std::stringstream ss;
-      ss << "Parquet struct decoding error. Expected to decode " << 
child_length
-         << " definition levels"
-         << " from child field \"" << child->field()->ToString() << "\" in 
parent \""
-         << this->field()->ToString() << "\" but was only able to decode "
-         << current_child_length;
-      return Status::IOError(ss.str());
-    }
+  // This method should only be called when this struct or one of its parents
+  // are optional/repeated or it has repeated child.
+  // Meaning all children must have rep/def levels associated
+  // with them.
+  RETURN_NOT_OK(def_rep_level_child_->GetRepLevels(data, length));
 
-    DCHECK_EQ(child_length, current_child_length);
-    for (int64_t i = 0; i < child_length; i++) {
-      // Check that value is either uninitialized, or current
-      // and previous children def levels agree on the struct level
-      DCHECK((result_levels[i] == -1) || ((result_levels[i] >= 
struct_def_level_) ==
-                                          (child_def_levels[i] >= 
struct_def_level_)));
-      result_levels[i] =
-          std::max(result_levels[i], std::min(child_def_levels[i], 
struct_def_level_));
-    }
+  if (data == nullptr) {
+    // Only happens if there are actually 0 rows available.
+    *data = nullptr;

Review comment:
       yeah, removed.  we shouldn't need this.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    
RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], 
out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, 
ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, 
validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());
     *out = std::make_shared<ChunkedArray>(result);
     return Status::OK();
   }
 
   const std::shared_ptr<Field> field() override { return field_; }
 
-  const ColumnDescriptor* descr() const override { return nullptr; }
-
-  ReaderType type() const override { return LIST; }
-
  private:
   std::shared_ptr<ReaderContext> ctx_;
   std::shared_ptr<Field> field_;
-  int16_t max_definition_level_;
-  int16_t max_repetition_level_;
+  ::parquet::internal::LevelInfo level_info_;
   std::unique_ptr<ColumnReaderImpl> item_reader_;
 };
 
 class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
  public:
   explicit StructReader(std::shared_ptr<ReaderContext> ctx,
-                        const SchemaField& schema_field,
                         std::shared_ptr<Field> filtered_field,
-                        std::vector<std::unique_ptr<ColumnReaderImpl>>&& 
children)
+                        ::parquet::internal::LevelInfo level_info,
+                        std::vector<std::unique_ptr<ColumnReaderImpl>> 
children)
       : ctx_(std::move(ctx)),
-        schema_field_(schema_field),
         filtered_field_(std::move(filtered_field)),
-        struct_def_level_(schema_field.level_info.def_level),
-        children_(std::move(children)) {}
+        level_info_(level_info),
+        children_(std::move(children)) {
+    // There could be a mix of children some might be repeated some might not 
be.
+    // If possible use one that isn't since that will be guaranteed to have 
the least
+    // number of rep/def levels.

Review comment:
       rephrased a little bit.  There is always an equal number of repetition 
and definition levels for any particular leaf.    

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    
RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], 
out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, 
ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, 
validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());
     *out = std::make_shared<ChunkedArray>(result);
     return Status::OK();
   }
 
   const std::shared_ptr<Field> field() override { return field_; }
 
-  const ColumnDescriptor* descr() const override { return nullptr; }
-
-  ReaderType type() const override { return LIST; }
-
  private:
   std::shared_ptr<ReaderContext> ctx_;
   std::shared_ptr<Field> field_;
-  int16_t max_definition_level_;
-  int16_t max_repetition_level_;
+  ::parquet::internal::LevelInfo level_info_;
   std::unique_ptr<ColumnReaderImpl> item_reader_;
 };
 
 class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
  public:
   explicit StructReader(std::shared_ptr<ReaderContext> ctx,
-                        const SchemaField& schema_field,
                         std::shared_ptr<Field> filtered_field,
-                        std::vector<std::unique_ptr<ColumnReaderImpl>>&& 
children)
+                        ::parquet::internal::LevelInfo level_info,
+                        std::vector<std::unique_ptr<ColumnReaderImpl>> 
children)
       : ctx_(std::move(ctx)),
-        schema_field_(schema_field),
         filtered_field_(std::move(filtered_field)),
-        struct_def_level_(schema_field.level_info.def_level),
-        children_(std::move(children)) {}
+        level_info_(level_info),
+        children_(std::move(children)) {
+    // There could be a mix of children some might be repeated some might not 
be.
+    // If possible use one that isn't since that will be guaranteed to have 
the least
+    // number of rep/def levels.
+    auto result = std::find_if(children_.begin(), children_.end(),
+                               [](const std::unique_ptr<ColumnReaderImpl>& 
child) {
+                                 return !child->IsOrHasRepeatedChild();
+                               });
+    if (result != children_.end()) {
+      def_rep_level_child_ = result->get();
+      has_repeated_child_ = false;
+    } else if (!children_.empty()) {
+      def_rep_level_child_ = children_.front().get();
+      has_repeated_child_ = true;
+    }
+  }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
+  bool IsOrHasRepeatedChild() const final { return has_repeated_child_; }
+
+  Status LoadBatch(int64_t records_to_read) override {
+    for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
+      RETURN_NOT_OK(reader->LoadBatch(records_to_read));
+    }
+    return Status::OK();
+  }
+  Status BuildArray(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
   Status GetDefLevels(const int16_t** data, int64_t* length) override;
   Status GetRepLevels(const int16_t** data, int64_t* length) override;
   const std::shared_ptr<Field> field() override { return filtered_field_; }
-  const ColumnDescriptor* descr() const override { return nullptr; }
-  ReaderType type() const override { return STRUCT; }
 
  private:
-  std::shared_ptr<ReaderContext> ctx_;
-  SchemaField schema_field_;
-  std::shared_ptr<Field> filtered_field_;
-  int16_t struct_def_level_;
-  std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
-  std::shared_ptr<ResizableBuffer> def_levels_buffer_;
-  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* 
null_count);
+  const std::shared_ptr<ReaderContext> ctx_;
+  const std::shared_ptr<Field> filtered_field_;
+  const ::parquet::internal::LevelInfo level_info_;
+  const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
+  ColumnReaderImpl* def_rep_level_child_ = nullptr;
+  bool has_repeated_child_;
 };
 
-Status StructReader::DefLevelsToNullArray(std::shared_ptr<Buffer>* 
null_bitmap_out,
-                                          int64_t* null_count_out) {
-  auto null_count = 0;
-  const int16_t* def_levels_data;
-  int64_t def_levels_length;
-  RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
-  ARROW_ASSIGN_OR_RAISE(auto null_bitmap,
-                        AllocateEmptyBitmap(def_levels_length, ctx_->pool));
-  uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
-  for (int64_t i = 0; i < def_levels_length; i++) {
-    if (def_levels_data[i] < struct_def_level_) {
-      // Mark null
-      null_count += 1;
-    } else {
-      DCHECK_EQ(def_levels_data[i], struct_def_level_);
-      ::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
-    }
-  }
-
-  *null_count_out = null_count;
-  *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap;
-  return Status::OK();
-}
-
-// TODO(itaiin): Consider caching the results of this calculation -
-//   note that this is only used once for each read for now
 Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
   *data = nullptr;
   if (children_.size() == 0) {
-    // Empty struct
     *length = 0;
-    return Status::OK();
+    return Status::Invalid("StructReader had no childre");

Review comment:
       done.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    
RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], 
out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, 
ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, 
validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());
     *out = std::make_shared<ChunkedArray>(result);
     return Status::OK();
   }
 
   const std::shared_ptr<Field> field() override { return field_; }
 
-  const ColumnDescriptor* descr() const override { return nullptr; }
-
-  ReaderType type() const override { return LIST; }
-
  private:
   std::shared_ptr<ReaderContext> ctx_;
   std::shared_ptr<Field> field_;
-  int16_t max_definition_level_;
-  int16_t max_repetition_level_;
+  ::parquet::internal::LevelInfo level_info_;
   std::unique_ptr<ColumnReaderImpl> item_reader_;
 };
 
 class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
  public:
   explicit StructReader(std::shared_ptr<ReaderContext> ctx,
-                        const SchemaField& schema_field,
                         std::shared_ptr<Field> filtered_field,
-                        std::vector<std::unique_ptr<ColumnReaderImpl>>&& 
children)
+                        ::parquet::internal::LevelInfo level_info,
+                        std::vector<std::unique_ptr<ColumnReaderImpl>> 
children)
       : ctx_(std::move(ctx)),
-        schema_field_(schema_field),
         filtered_field_(std::move(filtered_field)),
-        struct_def_level_(schema_field.level_info.def_level),
-        children_(std::move(children)) {}
+        level_info_(level_info),
+        children_(std::move(children)) {
+    // There could be a mix of children some might be repeated some might not 
be.
+    // If possible use one that isn't since that will be guaranteed to have 
the least
+    // number of rep/def levels.
+    auto result = std::find_if(children_.begin(), children_.end(),
+                               [](const std::unique_ptr<ColumnReaderImpl>& 
child) {
+                                 return !child->IsOrHasRepeatedChild();
+                               });
+    if (result != children_.end()) {
+      def_rep_level_child_ = result->get();
+      has_repeated_child_ = false;
+    } else if (!children_.empty()) {
+      def_rep_level_child_ = children_.front().get();
+      has_repeated_child_ = true;
+    }
+  }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
+  bool IsOrHasRepeatedChild() const final { return has_repeated_child_; }
+
+  Status LoadBatch(int64_t records_to_read) override {
+    for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
+      RETURN_NOT_OK(reader->LoadBatch(records_to_read));
+    }
+    return Status::OK();
+  }
+  Status BuildArray(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
   Status GetDefLevels(const int16_t** data, int64_t* length) override;
   Status GetRepLevels(const int16_t** data, int64_t* length) override;
   const std::shared_ptr<Field> field() override { return filtered_field_; }
-  const ColumnDescriptor* descr() const override { return nullptr; }
-  ReaderType type() const override { return STRUCT; }
 
  private:
-  std::shared_ptr<ReaderContext> ctx_;
-  SchemaField schema_field_;
-  std::shared_ptr<Field> filtered_field_;
-  int16_t struct_def_level_;
-  std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
-  std::shared_ptr<ResizableBuffer> def_levels_buffer_;
-  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* 
null_count);
+  const std::shared_ptr<ReaderContext> ctx_;
+  const std::shared_ptr<Field> filtered_field_;
+  const ::parquet::internal::LevelInfo level_info_;
+  const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
+  ColumnReaderImpl* def_rep_level_child_ = nullptr;
+  bool has_repeated_child_;
 };
 
-Status StructReader::DefLevelsToNullArray(std::shared_ptr<Buffer>* 
null_bitmap_out,
-                                          int64_t* null_count_out) {
-  auto null_count = 0;
-  const int16_t* def_levels_data;
-  int64_t def_levels_length;
-  RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
-  ARROW_ASSIGN_OR_RAISE(auto null_bitmap,
-                        AllocateEmptyBitmap(def_levels_length, ctx_->pool));
-  uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
-  for (int64_t i = 0; i < def_levels_length; i++) {
-    if (def_levels_data[i] < struct_def_level_) {
-      // Mark null
-      null_count += 1;
-    } else {
-      DCHECK_EQ(def_levels_data[i], struct_def_level_);
-      ::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
-    }
-  }
-
-  *null_count_out = null_count;
-  *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap;
-  return Status::OK();
-}
-
-// TODO(itaiin): Consider caching the results of this calculation -
-//   note that this is only used once for each read for now
 Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
   *data = nullptr;
   if (children_.size() == 0) {
-    // Empty struct
     *length = 0;
-    return Status::OK();
+    return Status::Invalid("StructReader had no childre");
   }
 
-  // We have at least one child
-  const int16_t* child_def_levels;
-  int64_t child_length = 0;
-  bool found_nullable_child = false;
-  int16_t* result_levels = nullptr;
-
-  int child_index = 0;
-  while (child_index < static_cast<int>(children_.size())) {
-    if (!children_[child_index]->field()->nullable()) {
-      ++child_index;
-      continue;
-    }
-    RETURN_NOT_OK(children_[child_index]->GetDefLevels(&child_def_levels, 
&child_length));
-    auto size = child_length * sizeof(int16_t);
-    ARROW_ASSIGN_OR_RAISE(def_levels_buffer_, AllocateResizableBuffer(size, 
ctx_->pool));
-    // Initialize with the minimal def level
-    std::memset(def_levels_buffer_->mutable_data(), -1, size);
-    result_levels = 
reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
-    found_nullable_child = true;
-    break;
-  }
+  // This method should only be called when this struct or one of its parents
+  // are optional/repeated or it has a repeated child.
+  // Meaning all children must have rep/def levels associated
+  // with them.
+  RETURN_NOT_OK(def_rep_level_child_->GetDefLevels(data, length));
 
-  if (!found_nullable_child) {
+  if (*data == nullptr) {
+    // Only happens if there are actually 0 rows available.
     *data = nullptr;
     *length = 0;
-    return Status::OK();
   }
+  return Status::OK();
+}
 
-  // Look at the rest of the children
-
-  // When a struct is defined, all of its children def levels are at least at
-  // nesting level, and def level equals nesting level.
-  // When a struct is not defined, all of its children def levels are less than
-  // the nesting level, and the def level equals max(children def levels)
-  // All other possibilities are malformed definition data.
-  for (; child_index < static_cast<int>(children_.size()); ++child_index) {
-    // Child is non-nullable, and therefore has no definition levels
-    if (!children_[child_index]->field()->nullable()) {
-      continue;
-    }
+Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
+  *data = nullptr;
+  if (children_.size() == 0) {
+    *length = 0;
+    return Status::Invalid("StructReader had no childre");
+  }
 
-    auto& child = children_[child_index];
-    int64_t current_child_length;
-    RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, 
&current_child_length));
-
-    if (child_length != current_child_length) {
-      std::stringstream ss;
-      ss << "Parquet struct decoding error. Expected to decode " << 
child_length
-         << " definition levels"
-         << " from child field \"" << child->field()->ToString() << "\" in 
parent \""
-         << this->field()->ToString() << "\" but was only able to decode "
-         << current_child_length;
-      return Status::IOError(ss.str());
-    }
+  // This method should only be called when this struct or one of its parents
+  // are optional/repeated or it has repeated child.
+  // Meaning all children must have rep/def levels associated
+  // with them.
+  RETURN_NOT_OK(def_rep_level_child_->GetRepLevels(data, length));
 
-    DCHECK_EQ(child_length, current_child_length);
-    for (int64_t i = 0; i < child_length; i++) {
-      // Check that value is either uninitialized, or current
-      // and previous children def levels agree on the struct level
-      DCHECK((result_levels[i] == -1) || ((result_levels[i] >= 
struct_def_level_) ==
-                                          (child_def_levels[i] >= 
struct_def_level_)));
-      result_levels[i] =
-          std::max(result_levels[i], std::min(child_def_levels[i], 
struct_def_level_));
-    }
+  if (data == nullptr) {
+    // Only happens if there are actually 0 rows available.
+    *data = nullptr;
+    *length = 0;
   }
-  *data = reinterpret_cast<const int16_t*>(def_levels_buffer_->data());
-  *length = static_cast<int64_t>(child_length);
   return Status::OK();
 }
 
-Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
-  return Status::NotImplemented("GetRepLevels is not implemented for struct");
-}
-
-Status StructReader::NextBatch(int64_t records_to_read,
-                               std::shared_ptr<ChunkedArray>* out) {
-  std::vector<std::shared_ptr<Array>> children_arrays;
+Status StructReader::BuildArray(int64_t records_to_read,
+                                std::shared_ptr<ChunkedArray>* out) {
+  std::vector<std::shared_ptr<ArrayData>> children_array_data;
   std::shared_ptr<Buffer> null_bitmap;
-  int64_t null_count;
 
+  ::parquet::internal::ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = records_to_read;
+  validity_io.values_read = records_to_read;
+
+  BEGIN_PARQUET_CATCH_EXCEPTIONS
+  const int16_t* def_levels;
+  const int16_t* rep_levels;
+  int64_t num_levels;
+
+  if (has_repeated_child_) {
+    ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(records_to_read, 
ctx_->pool));
+    validity_io.valid_bits = null_bitmap->mutable_data();
+    RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels));
+    RETURN_NOT_OK(GetRepLevels(&rep_levels, &num_levels));
+    DefRepLevelsToBitmap(def_levels, rep_levels, num_levels, level_info_, 
&validity_io);
+  } else if (filtered_field_->nullable()) {
+    ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(records_to_read, 
ctx_->pool));
+    validity_io.valid_bits = null_bitmap->mutable_data();
+    RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels));
+    DefLevelsToBitmap(def_levels, num_levels, level_info_, &validity_io);
+  }
+
+  // Ensure all values are initialized.

Review comment:
       I didn't think about using Resizable buffers.  changed all of the places 
that were filled to it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to