pitrou commented on a change in pull request #8177:
URL: https://github.com/apache/arrow/pull/8177#discussion_r488817917



##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, ::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, max_definition_level_,
-                                        max_repetition_level_, def_levels, rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, number_of_slots + 1),

Review comment:
       Why `2`?
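
       For context (not an answer from the author): the floor of `2` only changes the result when `number_of_slots` is 0, since for any positive slot count `number_of_slots + 1` already wins the `max`. A standalone sketch of that arithmetic, assuming nothing beyond plain `std::max`:

       ```cpp
       #include <algorithm>
       #include <cstdint>

       // The 2 only matters for an empty batch: with 0 slots the buffer still
       // gets room for two offset entries instead of one.
       static_assert(std::max(int64_t{2}, int64_t{0} + 1) == 2, "empty batch -> 2 entries");
       static_assert(std::max(int64_t{2}, int64_t{1} + 1) == 2, "1 slot -> 2 entries");
       static_assert(std::max(int64_t{2}, int64_t{5} + 1) == 6, "5 slots -> 6 entries");
       ```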

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,

Review comment:
       "offsets" rather than "lengths"

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());

Review comment:
       "offset_data" or "offsets_data"

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : std::shared_ptr<Buffer>(),

Review comment:
       Can just write `nullptr`, I think.
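
       A minimal standalone check (not Arrow-specific) that the bare `nullptr` form compiles: `std::shared_ptr` has an implicit constructor from `std::nullptr_t`, so the conditional's common type remains the `shared_ptr`:

       ```cpp
       #include <memory>

       // Equivalent to `cond ? buf : std::shared_ptr<int>()`; the nullptr
       // branch converts implicitly to an empty shared_ptr.
       std::shared_ptr<int> pick(bool cond, std::shared_ptr<int> buf) {
         return cond ? buf : nullptr;
       }
       ```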

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());

Review comment:
       (the tests should use ValidateFull, anyway)
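
       A sketch of the test-side check being suggested, assuming the usual Arrow gtest helpers (`ASSERT_OK`) and `Array::ValidateFull()`, which also verifies buffer contents such as list offsets, unlike the cheaper structural `Validate()`:

       ```cpp
       // In the reader unit tests (hypothetical placement):
       ASSERT_OK(result->ValidateFull());
       ```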

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,

Review comment:
       Why do you need to write some values past the offsets end?

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a file)

Review comment:
       I'm not sure what this means, shouldn't you know up front the number of 
values? Do you mean the file was truncated before the row group end (is that 
supported)?

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, 
validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());
     *out = std::make_shared<ChunkedArray>(result);
     return Status::OK();
   }
 
   const std::shared_ptr<Field> field() override { return field_; }
 
-  const ColumnDescriptor* descr() const override { return nullptr; }
-
-  ReaderType type() const override { return LIST; }
-
  private:
   std::shared_ptr<ReaderContext> ctx_;
   std::shared_ptr<Field> field_;
-  int16_t max_definition_level_;
-  int16_t max_repetition_level_;
+  ::parquet::internal::LevelInfo level_info_;
   std::unique_ptr<ColumnReaderImpl> item_reader_;
 };
 
 class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
  public:
   explicit StructReader(std::shared_ptr<ReaderContext> ctx,
-                        const SchemaField& schema_field,
                         std::shared_ptr<Field> filtered_field,
-                        std::vector<std::unique_ptr<ColumnReaderImpl>>&& 
children)
+                        ::parquet::internal::LevelInfo level_info,
+                        std::vector<std::unique_ptr<ColumnReaderImpl>> 
children)
       : ctx_(std::move(ctx)),
-        schema_field_(schema_field),
         filtered_field_(std::move(filtered_field)),
-        struct_def_level_(schema_field.level_info.def_level),
-        children_(std::move(children)) {}
+        level_info_(level_info),
+        children_(std::move(children)) {
+    // There could be a mix of children some might be repeated some might not be.
+    // If possible use one that isn't since that will be guaranteed to have the least
+    // number of rep/def levels.

Review comment:
       You mean "of rep levels"? You could have arbitrarily nested structs with 
a lot of dep levels?
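
       A small illustration of the distinction (not from the PR): nesting optional groups keeps adding definition levels while the repetition level stays at zero, so choosing a non-repeated child only bounds the repetition side:

       ```
       message m {
         optional group a {          // max def 1, max rep 0
           optional group b {        // max def 2, max rep 0
             optional int32 leaf;    // max def 3, max rep 0
           }
         }
       }
       ```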

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, 
validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());

Review comment:
       I'm not sure we want to validate here. Let the user (and the tests) do 
it?

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, 
validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());
     *out = std::make_shared<ChunkedArray>(result);
     return Status::OK();
   }
 
   const std::shared_ptr<Field> field() override { return field_; }
 
-  const ColumnDescriptor* descr() const override { return nullptr; }
-
-  ReaderType type() const override { return LIST; }
-
  private:
   std::shared_ptr<ReaderContext> ctx_;
   std::shared_ptr<Field> field_;
-  int16_t max_definition_level_;
-  int16_t max_repetition_level_;
+  ::parquet::internal::LevelInfo level_info_;
   std::unique_ptr<ColumnReaderImpl> item_reader_;
 };
 
 class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
  public:
   explicit StructReader(std::shared_ptr<ReaderContext> ctx,
-                        const SchemaField& schema_field,
                         std::shared_ptr<Field> filtered_field,
-                        std::vector<std::unique_ptr<ColumnReaderImpl>>&& 
children)
+                        ::parquet::internal::LevelInfo level_info,
+                        std::vector<std::unique_ptr<ColumnReaderImpl>> 
children)
       : ctx_(std::move(ctx)),
-        schema_field_(schema_field),
         filtered_field_(std::move(filtered_field)),
-        struct_def_level_(schema_field.level_info.def_level),
-        children_(std::move(children)) {}
+        level_info_(level_info),
+        children_(std::move(children)) {
+    // There could be a mix of children some might be repeated some might not 
be.
+    // If possible use one that isn't since that will be guaranteed to have 
the least
+    // number of rep/def levels.
+    auto result = std::find_if(children_.begin(), children_.end(),
+                               [](const std::unique_ptr<ColumnReaderImpl>& 
child) {
+                                 return !child->IsOrHasRepeatedChild();
+                               });
+    if (result != children_.end()) {
+      def_rep_level_child_ = result->get();
+      has_repeated_child_ = false;
+    } else if (!children_.empty()) {
+      def_rep_level_child_ = children_.front().get();
+      has_repeated_child_ = true;
+    }
+  }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
+  bool IsOrHasRepeatedChild() const final { return has_repeated_child_; }
+
+  Status LoadBatch(int64_t records_to_read) override {
+    for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
+      RETURN_NOT_OK(reader->LoadBatch(records_to_read));
+    }
+    return Status::OK();
+  }
+  Status BuildArray(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
   Status GetDefLevels(const int16_t** data, int64_t* length) override;
   Status GetRepLevels(const int16_t** data, int64_t* length) override;
   const std::shared_ptr<Field> field() override { return filtered_field_; }
-  const ColumnDescriptor* descr() const override { return nullptr; }
-  ReaderType type() const override { return STRUCT; }
 
  private:
-  std::shared_ptr<ReaderContext> ctx_;
-  SchemaField schema_field_;
-  std::shared_ptr<Field> filtered_field_;
-  int16_t struct_def_level_;
-  std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
-  std::shared_ptr<ResizableBuffer> def_levels_buffer_;
-  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* 
null_count);
+  const std::shared_ptr<ReaderContext> ctx_;
+  const std::shared_ptr<Field> filtered_field_;
+  const ::parquet::internal::LevelInfo level_info_;
+  const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
+  ColumnReaderImpl* def_rep_level_child_ = nullptr;
+  bool has_repeated_child_;
 };
 
-Status StructReader::DefLevelsToNullArray(std::shared_ptr<Buffer>* 
null_bitmap_out,
-                                          int64_t* null_count_out) {
-  auto null_count = 0;
-  const int16_t* def_levels_data;
-  int64_t def_levels_length;
-  RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
-  ARROW_ASSIGN_OR_RAISE(auto null_bitmap,
-                        AllocateEmptyBitmap(def_levels_length, ctx_->pool));
-  uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
-  for (int64_t i = 0; i < def_levels_length; i++) {
-    if (def_levels_data[i] < struct_def_level_) {
-      // Mark null
-      null_count += 1;
-    } else {
-      DCHECK_EQ(def_levels_data[i], struct_def_level_);
-      ::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
-    }
-  }
-
-  *null_count_out = null_count;
-  *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap;
-  return Status::OK();
-}
-
-// TODO(itaiin): Consider caching the results of this calculation -
-//   note that this is only used once for each read for now
 Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
   *data = nullptr;
   if (children_.size() == 0) {
-    // Empty struct
     *length = 0;
-    return Status::OK();
+    return Status::Invalid("StructReader had no childre");

Review comment:
       "children"

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // ensure zero initialization in case we have reached a zero length list 
(and
+    // because first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return less then the requested slot (i.e. reaching an end of a 
file)
+    // ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1  because arrays lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, 
validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());
     *out = std::make_shared<ChunkedArray>(result);
     return Status::OK();
   }
 
   const std::shared_ptr<Field> field() override { return field_; }
 
-  const ColumnDescriptor* descr() const override { return nullptr; }
-
-  ReaderType type() const override { return LIST; }
-
  private:
   std::shared_ptr<ReaderContext> ctx_;
   std::shared_ptr<Field> field_;
-  int16_t max_definition_level_;
-  int16_t max_repetition_level_;
+  ::parquet::internal::LevelInfo level_info_;
   std::unique_ptr<ColumnReaderImpl> item_reader_;
 };
 
 class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
  public:
   explicit StructReader(std::shared_ptr<ReaderContext> ctx,
-                        const SchemaField& schema_field,
                         std::shared_ptr<Field> filtered_field,
-                        std::vector<std::unique_ptr<ColumnReaderImpl>>&& 
children)
+                        ::parquet::internal::LevelInfo level_info,
+                        std::vector<std::unique_ptr<ColumnReaderImpl>> 
children)
       : ctx_(std::move(ctx)),
-        schema_field_(schema_field),
         filtered_field_(std::move(filtered_field)),
-        struct_def_level_(schema_field.level_info.def_level),
-        children_(std::move(children)) {}
+        level_info_(level_info),
+        children_(std::move(children)) {
+    // There could be a mix of children some might be repeated some might not 
be.
+    // If possible use one that isn't since that will be guaranteed to have 
the least
+    // number of rep/def levels.
+    auto result = std::find_if(children_.begin(), children_.end(),
+                               [](const std::unique_ptr<ColumnReaderImpl>& 
child) {
+                                 return !child->IsOrHasRepeatedChild();
+                               });
+    if (result != children_.end()) {
+      def_rep_level_child_ = result->get();
+      has_repeated_child_ = false;
+    } else if (!children_.empty()) {
+      def_rep_level_child_ = children_.front().get();
+      has_repeated_child_ = true;
+    }
+  }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
+  bool IsOrHasRepeatedChild() const final { return has_repeated_child_; }
+
+  Status LoadBatch(int64_t records_to_read) override {
+    for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
+      RETURN_NOT_OK(reader->LoadBatch(records_to_read));
+    }
+    return Status::OK();
+  }
+  Status BuildArray(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
   Status GetDefLevels(const int16_t** data, int64_t* length) override;
   Status GetRepLevels(const int16_t** data, int64_t* length) override;
   const std::shared_ptr<Field> field() override { return filtered_field_; }
-  const ColumnDescriptor* descr() const override { return nullptr; }
-  ReaderType type() const override { return STRUCT; }
 
  private:
-  std::shared_ptr<ReaderContext> ctx_;
-  SchemaField schema_field_;
-  std::shared_ptr<Field> filtered_field_;
-  int16_t struct_def_level_;
-  std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
-  std::shared_ptr<ResizableBuffer> def_levels_buffer_;
-  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* 
null_count);
+  const std::shared_ptr<ReaderContext> ctx_;
+  const std::shared_ptr<Field> filtered_field_;
+  const ::parquet::internal::LevelInfo level_info_;
+  const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
+  ColumnReaderImpl* def_rep_level_child_ = nullptr;
+  bool has_repeated_child_;
 };
 
-Status StructReader::DefLevelsToNullArray(std::shared_ptr<Buffer>* 
null_bitmap_out,
-                                          int64_t* null_count_out) {
-  auto null_count = 0;
-  const int16_t* def_levels_data;
-  int64_t def_levels_length;
-  RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
-  ARROW_ASSIGN_OR_RAISE(auto null_bitmap,
-                        AllocateEmptyBitmap(def_levels_length, ctx_->pool));
-  uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
-  for (int64_t i = 0; i < def_levels_length; i++) {
-    if (def_levels_data[i] < struct_def_level_) {
-      // Mark null
-      null_count += 1;
-    } else {
-      DCHECK_EQ(def_levels_data[i], struct_def_level_);
-      ::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
-    }
-  }
-
-  *null_count_out = null_count;
-  *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap;
-  return Status::OK();
-}
-
-// TODO(itaiin): Consider caching the results of this calculation -
-//   note that this is only used once for each read for now
 Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
   *data = nullptr;
   if (children_.size() == 0) {
-    // Empty struct
     *length = 0;
-    return Status::OK();
+    return Status::Invalid("StructReader had no childre");
   }
 
-  // We have at least one child
-  const int16_t* child_def_levels;
-  int64_t child_length = 0;
-  bool found_nullable_child = false;
-  int16_t* result_levels = nullptr;
-
-  int child_index = 0;
-  while (child_index < static_cast<int>(children_.size())) {
-    if (!children_[child_index]->field()->nullable()) {
-      ++child_index;
-      continue;
-    }
-    RETURN_NOT_OK(children_[child_index]->GetDefLevels(&child_def_levels, 
&child_length));
-    auto size = child_length * sizeof(int16_t);
-    ARROW_ASSIGN_OR_RAISE(def_levels_buffer_, AllocateResizableBuffer(size, 
ctx_->pool));
-    // Initialize with the minimal def level
-    std::memset(def_levels_buffer_->mutable_data(), -1, size);
-    result_levels = 
reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
-    found_nullable_child = true;
-    break;
-  }
+  // This method should only be called when this struct or one of its parents
+  // are optional/repeated or it has a repeated child.
+  // Meaning all children must have rep/def levels associated
+  // with them.
+  RETURN_NOT_OK(def_rep_level_child_->GetDefLevels(data, length));
 
-  if (!found_nullable_child) {
+  if (*data == nullptr) {
+    // Only happens if there are actually 0 rows available.
     *data = nullptr;
     *length = 0;
-    return Status::OK();
   }
+  return Status::OK();
+}
 
-  // Look at the rest of the children
-
-  // When a struct is defined, all of its children def levels are at least at
-  // nesting level, and def level equals nesting level.
-  // When a struct is not defined, all of its children def levels are less than
-  // the nesting level, and the def level equals max(children def levels)
-  // All other possibilities are malformed definition data.
-  for (; child_index < static_cast<int>(children_.size()); ++child_index) {
-    // Child is non-nullable, and therefore has no definition levels
-    if (!children_[child_index]->field()->nullable()) {
-      continue;
-    }
+Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
+  *data = nullptr;
+  if (children_.size() == 0) {
+    *length = 0;
+    return Status::Invalid("StructReader had no childre");
+  }
 
-    auto& child = children_[child_index];
-    int64_t current_child_length;
-    RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, 
&current_child_length));
-
-    if (child_length != current_child_length) {
-      std::stringstream ss;
-      ss << "Parquet struct decoding error. Expected to decode " << 
child_length
-         << " definition levels"
-         << " from child field \"" << child->field()->ToString() << "\" in 
parent \""
-         << this->field()->ToString() << "\" but was only able to decode "
-         << current_child_length;
-      return Status::IOError(ss.str());
-    }
+  // This method should only be called when this struct or one of its parents
+  // are optional/repeated or it has repeated child.
+  // Meaning all children must have rep/def levels associated
+  // with them.
+  RETURN_NOT_OK(def_rep_level_child_->GetRepLevels(data, length));
 
-    DCHECK_EQ(child_length, current_child_length);
-    for (int64_t i = 0; i < child_length; i++) {
-      // Check that value is either uninitialized, or current
-      // and previous children def levels agree on the struct level
-      DCHECK((result_levels[i] == -1) || ((result_levels[i] >= 
struct_def_level_) ==
-                                          (child_def_levels[i] >= 
struct_def_level_)));
-      result_levels[i] =
-          std::max(result_levels[i], std::min(child_def_levels[i], 
struct_def_level_));
-    }
+  if (data == nullptr) {
+    // Only happens if there are actually 0 rows available.
+    *data = nullptr;

Review comment:
       You're dereferencing a null pointer (see `if` condition above).
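
       For concreteness, presumably the intent mirrors the `GetDefLevels` branch above, i.e. test the pointee rather than the out-parameter itself; a hedged sketch of that fix:

       ```cpp
       if (*data == nullptr) {
         // Only happens if there are actually 0 rows available.
         *length = 0;
       }
       ```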

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // Ensure zero-initialization in case we have reached a zero-length list (and
+    // because the first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return fewer than the requested slots (i.e. on reaching the end of a file),

Review comment:
       Or is `number_of_slots` just an upper bound?

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // Ensure zero-initialization in case we have reached a zero-length list (and
+    // because the first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return fewer than the requested slots (i.e. on reaching the end of a file),
+    // so ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1 because array lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    
RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], 
out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, 
ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, 
validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());
     *out = std::make_shared<ChunkedArray>(result);
     return Status::OK();
   }
 
   const std::shared_ptr<Field> field() override { return field_; }
 
-  const ColumnDescriptor* descr() const override { return nullptr; }
-
-  ReaderType type() const override { return LIST; }
-
  private:
   std::shared_ptr<ReaderContext> ctx_;
   std::shared_ptr<Field> field_;
-  int16_t max_definition_level_;
-  int16_t max_repetition_level_;
+  ::parquet::internal::LevelInfo level_info_;
   std::unique_ptr<ColumnReaderImpl> item_reader_;
 };
 
 class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
  public:
   explicit StructReader(std::shared_ptr<ReaderContext> ctx,
-                        const SchemaField& schema_field,
                         std::shared_ptr<Field> filtered_field,
-                        std::vector<std::unique_ptr<ColumnReaderImpl>>&& 
children)
+                        ::parquet::internal::LevelInfo level_info,
+                        std::vector<std::unique_ptr<ColumnReaderImpl>> 
children)
       : ctx_(std::move(ctx)),
-        schema_field_(schema_field),
         filtered_field_(std::move(filtered_field)),
-        struct_def_level_(schema_field.level_info.def_level),
-        children_(std::move(children)) {}
+        level_info_(level_info),
+        children_(std::move(children)) {
+    // There could be a mix of children: some might be repeated, some might not be.
+    // If possible, use one that isn't, since that one is guaranteed to have the
+    // fewest rep/def levels.
+    auto result = std::find_if(children_.begin(), children_.end(),
+                               [](const std::unique_ptr<ColumnReaderImpl>& 
child) {
+                                 return !child->IsOrHasRepeatedChild();
+                               });
+    if (result != children_.end()) {
+      def_rep_level_child_ = result->get();
+      has_repeated_child_ = false;
+    } else if (!children_.empty()) {
+      def_rep_level_child_ = children_.front().get();
+      has_repeated_child_ = true;
+    }
+  }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
+  bool IsOrHasRepeatedChild() const final { return has_repeated_child_; }
+
+  Status LoadBatch(int64_t records_to_read) override {
+    for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
+      RETURN_NOT_OK(reader->LoadBatch(records_to_read));
+    }
+    return Status::OK();
+  }
+  Status BuildArray(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
   Status GetDefLevels(const int16_t** data, int64_t* length) override;
   Status GetRepLevels(const int16_t** data, int64_t* length) override;
   const std::shared_ptr<Field> field() override { return filtered_field_; }
-  const ColumnDescriptor* descr() const override { return nullptr; }
-  ReaderType type() const override { return STRUCT; }
 
  private:
-  std::shared_ptr<ReaderContext> ctx_;
-  SchemaField schema_field_;
-  std::shared_ptr<Field> filtered_field_;
-  int16_t struct_def_level_;
-  std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
-  std::shared_ptr<ResizableBuffer> def_levels_buffer_;
-  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* 
null_count);
+  const std::shared_ptr<ReaderContext> ctx_;
+  const std::shared_ptr<Field> filtered_field_;
+  const ::parquet::internal::LevelInfo level_info_;
+  const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
+  ColumnReaderImpl* def_rep_level_child_ = nullptr;
+  bool has_repeated_child_;
 };
 
-Status StructReader::DefLevelsToNullArray(std::shared_ptr<Buffer>* 
null_bitmap_out,
-                                          int64_t* null_count_out) {
-  auto null_count = 0;
-  const int16_t* def_levels_data;
-  int64_t def_levels_length;
-  RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
-  ARROW_ASSIGN_OR_RAISE(auto null_bitmap,
-                        AllocateEmptyBitmap(def_levels_length, ctx_->pool));
-  uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
-  for (int64_t i = 0; i < def_levels_length; i++) {
-    if (def_levels_data[i] < struct_def_level_) {
-      // Mark null
-      null_count += 1;
-    } else {
-      DCHECK_EQ(def_levels_data[i], struct_def_level_);
-      ::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
-    }
-  }
-
-  *null_count_out = null_count;
-  *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap;
-  return Status::OK();
-}
-
-// TODO(itaiin): Consider caching the results of this calculation -
-//   note that this is only used once for each read for now
 Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
   *data = nullptr;
   if (children_.size() == 0) {
-    // Empty struct
     *length = 0;
-    return Status::OK();
+    return Status::Invalid("StructReader had no childre");
   }
 
-  // We have at least one child
-  const int16_t* child_def_levels;
-  int64_t child_length = 0;
-  bool found_nullable_child = false;
-  int16_t* result_levels = nullptr;
-
-  int child_index = 0;
-  while (child_index < static_cast<int>(children_.size())) {
-    if (!children_[child_index]->field()->nullable()) {
-      ++child_index;
-      continue;
-    }
-    RETURN_NOT_OK(children_[child_index]->GetDefLevels(&child_def_levels, 
&child_length));
-    auto size = child_length * sizeof(int16_t);
-    ARROW_ASSIGN_OR_RAISE(def_levels_buffer_, AllocateResizableBuffer(size, 
ctx_->pool));
-    // Initialize with the minimal def level
-    std::memset(def_levels_buffer_->mutable_data(), -1, size);
-    result_levels = 
reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
-    found_nullable_child = true;
-    break;
-  }
+  // This method should only be called when this struct or one of its parents
+  // is optional/repeated, or when it has a repeated child.
+  // This means all children must have rep/def levels associated
+  // with them.
+  RETURN_NOT_OK(def_rep_level_child_->GetDefLevels(data, length));
 
-  if (!found_nullable_child) {
+  if (*data == nullptr) {
+    // Only happens if there are actually 0 rows available.
     *data = nullptr;
     *length = 0;
-    return Status::OK();
   }
+  return Status::OK();
+}
 
-  // Look at the rest of the children
-
-  // When a struct is defined, all of its children def levels are at least at
-  // nesting level, and def level equals nesting level.
-  // When a struct is not defined, all of its children def levels are less than
-  // the nesting level, and the def level equals max(children def levels)
-  // All other possibilities are malformed definition data.
-  for (; child_index < static_cast<int>(children_.size()); ++child_index) {
-    // Child is non-nullable, and therefore has no definition levels
-    if (!children_[child_index]->field()->nullable()) {
-      continue;
-    }
+Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
+  *data = nullptr;
+  if (children_.size() == 0) {
+    *length = 0;
+    return Status::Invalid("StructReader had no childre");
+  }
 
-    auto& child = children_[child_index];
-    int64_t current_child_length;
-    RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, 
&current_child_length));
-
-    if (child_length != current_child_length) {
-      std::stringstream ss;
-      ss << "Parquet struct decoding error. Expected to decode " << 
child_length
-         << " definition levels"
-         << " from child field \"" << child->field()->ToString() << "\" in 
parent \""
-         << this->field()->ToString() << "\" but was only able to decode "
-         << current_child_length;
-      return Status::IOError(ss.str());
-    }
+  // This method should only be called when this struct or one of its parents
+  // is optional/repeated, or when it has a repeated child.
+  // This means all children must have rep/def levels associated
+  // with them.
+  RETURN_NOT_OK(def_rep_level_child_->GetRepLevels(data, length));
 
-    DCHECK_EQ(child_length, current_child_length);
-    for (int64_t i = 0; i < child_length; i++) {
-      // Check that value is either uninitialized, or current
-      // and previous children def levels agree on the struct level
-      DCHECK((result_levels[i] == -1) || ((result_levels[i] >= 
struct_def_level_) ==
-                                          (child_def_levels[i] >= 
struct_def_level_)));
-      result_levels[i] =
-          std::max(result_levels[i], std::min(child_def_levels[i], 
struct_def_level_));
-    }
+  if (data == nullptr) {
+    // Only happens if there are actually 0 rows available.
+    *data = nullptr;
+    *length = 0;
   }
-  *data = reinterpret_cast<const int16_t*>(def_levels_buffer_->data());
-  *length = static_cast<int64_t>(child_length);
   return Status::OK();
 }
 
-Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
-  return Status::NotImplemented("GetRepLevels is not implemented for struct");
-}
-
-Status StructReader::NextBatch(int64_t records_to_read,
-                               std::shared_ptr<ChunkedArray>* out) {
-  std::vector<std::shared_ptr<Array>> children_arrays;
+Status StructReader::BuildArray(int64_t records_to_read,
+                                std::shared_ptr<ChunkedArray>* out) {
+  std::vector<std::shared_ptr<ArrayData>> children_array_data;
   std::shared_ptr<Buffer> null_bitmap;
-  int64_t null_count;
 
+  ::parquet::internal::ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = records_to_read;
+  validity_io.values_read = records_to_read;
+
+  BEGIN_PARQUET_CATCH_EXCEPTIONS
+  const int16_t* def_levels;
+  const int16_t* rep_levels;
+  int64_t num_levels;
+
+  if (has_repeated_child_) {
+    ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(records_to_read, 
ctx_->pool));
+    validity_io.valid_bits = null_bitmap->mutable_data();
+    RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels));
+    RETURN_NOT_OK(GetRepLevels(&rep_levels, &num_levels));
+    DefRepLevelsToBitmap(def_levels, rep_levels, num_levels, level_info_, 
&validity_io);
+  } else if (filtered_field_->nullable()) {
+    ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(records_to_read, 
ctx_->pool));
+    validity_io.valid_bits = null_bitmap->mutable_data();
+    RETURN_NOT_OK(GetDefLevels(&def_levels, &num_levels));
+    DefLevelsToBitmap(def_levels, num_levels, level_info_, &validity_io);
+  }
+
+  // Ensure all values are initialized.

Review comment:
       Why? Shouldn't you resize the buffer instead?
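
       For illustration, one possible shape of that alternative (a rough,
       untested sketch; it assumes a `ResizableBuffer` is acceptable for the
       validity bitmap here and that shrinking after the levels are decoded is
       sufficient):

```cpp
// Allocate a resizable bitmap sized for the upper bound of records.
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<ResizableBuffer> null_bitmap,
    AllocateResizableBuffer(BitUtil::BytesForBits(records_to_read), ctx_->pool));
validity_io.valid_bits = null_bitmap->mutable_data();

// ... populate via DefRepLevelsToBitmap / DefLevelsToBitmap as in the diff ...

// Shrink to the slots actually read instead of zero-filling the tail.
RETURN_NOT_OK(null_bitmap->Resize(BitUtil::BytesForBits(validity_io.values_read),
                                  /*shrink_to_fit=*/true));
```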

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -453,229 +483,226 @@ class NestedListReader : public ColumnReaderImpl {
     return item_reader_->GetRepLevels(data, length);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override {
-    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
-      return Status::Invalid("Mix of struct and list types not yet supported");
-    }
-
-    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+  bool IsOrHasRepeatedChild() const final { return true; }
 
-    std::shared_ptr<Array> item_chunk;
-    switch ((*out)->num_chunks()) {
-      case 0: {
-        ARROW_ASSIGN_OR_RAISE(item_chunk, 
::arrow::MakeArrayOfNull((*out)->type(), 0));
-        break;
-      }
-      case 1:
-        item_chunk = (*out)->chunk(0);
-        break;
-      default:
-        // ARROW-3762(wesm): If item reader yields a chunked array, we reject 
as
-        // this is not yet implemented
-        return Status::NotImplemented(
-            "Nested data conversions not implemented for chunked array 
outputs");
-    }
+  Status LoadBatch(int64_t number_of_records) final {
+    return item_reader_->LoadBatch(number_of_records);
+  }
 
+  Status BuildArray(int64_t number_of_slots,
+                    std::shared_ptr<ChunkedArray>* out) override {
     const int16_t* def_levels;
     const int16_t* rep_levels;
     int64_t num_levels;
     RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
     RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
-    std::shared_ptr<Array> result;
-    RETURN_NOT_OK(ReconstructNestedList(item_chunk, field_, 
max_definition_level_,
-                                        max_repetition_level_, def_levels, 
rep_levels,
-                                        num_levels, ctx_->pool, &result));
+    std::shared_ptr<Buffer> validity_buffer;
+    ::parquet::internal::ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = number_of_slots;
+    if (field_->nullable()) {
+      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(number_of_slots, 
ctx_->pool));
+
+      validity_io.valid_bits = validity_buffer->mutable_data();
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> lengths_buffer,
+        AllocateBuffer(sizeof(IndexType) * std::max(int64_t{2}, 
number_of_slots + 1),
+                       ctx_->pool));
+    // Ensure zero-initialization in case we have reached a zero-length list (and
+    // because the first entry is always zero).
+    IndexType* length_data = 
reinterpret_cast<IndexType*>(lengths_buffer->mutable_data());
+    std::fill(length_data, length_data + 2, 0);
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    ::parquet::internal::DefRepLevelsToList(def_levels, rep_levels, num_levels,
+                                            level_info_, &validity_io, 
length_data);
+    END_PARQUET_CATCH_EXCEPTIONS
+    // We might return fewer than the requested slots (i.e. on reaching the end of a file),
+    // so ensure we've set all the bits here.
+    if (validity_io.values_read < number_of_slots) {
+      // + 1 because array lengths are values + 1
+      std::fill(length_data + validity_io.values_read + 1,
+                length_data + number_of_slots + 1, 0);
+      if (validity_io.valid_bits != nullptr &&
+          BitUtil::BytesForBits(validity_io.values_read) <
+              BitUtil::BytesForBits(number_of_slots)) {
+        std::fill(validity_io.valid_bits + 
BitUtil::BytesForBits(validity_io.values_read),
+                  validity_io.valid_bits + 
BitUtil::BytesForBits(number_of_slots), 0);
+      }
+    }
+
+    
RETURN_NOT_OK(item_reader_->BuildArray(length_data[validity_io.values_read], 
out));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> item_chunk, 
ChunksToSingle(**out));
+
+    std::vector<std::shared_ptr<Buffer>> buffers{
+        validity_io.null_count > 0 ? validity_buffer : 
std::shared_ptr<Buffer>(),
+        lengths_buffer};
+    auto data = std::make_shared<ArrayData>(
+        field_->type(),
+        /*length=*/validity_io.values_read, std::move(buffers),
+        std::vector<std::shared_ptr<ArrayData>>{item_chunk}, 
validity_io.null_count);
+
+    std::shared_ptr<Array> result = ::arrow::MakeArray(data);
+    RETURN_NOT_OK(result->Validate());
     *out = std::make_shared<ChunkedArray>(result);
     return Status::OK();
   }
 
   const std::shared_ptr<Field> field() override { return field_; }
 
-  const ColumnDescriptor* descr() const override { return nullptr; }
-
-  ReaderType type() const override { return LIST; }
-
  private:
   std::shared_ptr<ReaderContext> ctx_;
   std::shared_ptr<Field> field_;
-  int16_t max_definition_level_;
-  int16_t max_repetition_level_;
+  ::parquet::internal::LevelInfo level_info_;
   std::unique_ptr<ColumnReaderImpl> item_reader_;
 };
 
 class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
  public:
   explicit StructReader(std::shared_ptr<ReaderContext> ctx,
-                        const SchemaField& schema_field,
                         std::shared_ptr<Field> filtered_field,
-                        std::vector<std::unique_ptr<ColumnReaderImpl>>&& 
children)
+                        ::parquet::internal::LevelInfo level_info,
+                        std::vector<std::unique_ptr<ColumnReaderImpl>> 
children)
       : ctx_(std::move(ctx)),
-        schema_field_(schema_field),
         filtered_field_(std::move(filtered_field)),
-        struct_def_level_(schema_field.level_info.def_level),
-        children_(std::move(children)) {}
+        level_info_(level_info),
+        children_(std::move(children)) {
+    // There could be a mix of children: some might be repeated, some might not be.
+    // If possible, use one that isn't, since that one is guaranteed to have the
+    // fewest rep/def levels.
+    auto result = std::find_if(children_.begin(), children_.end(),
+                               [](const std::unique_ptr<ColumnReaderImpl>& 
child) {
+                                 return !child->IsOrHasRepeatedChild();
+                               });
+    if (result != children_.end()) {
+      def_rep_level_child_ = result->get();
+      has_repeated_child_ = false;
+    } else if (!children_.empty()) {
+      def_rep_level_child_ = children_.front().get();
+      has_repeated_child_ = true;
+    }
+  }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
+  bool IsOrHasRepeatedChild() const final { return has_repeated_child_; }
+
+  Status LoadBatch(int64_t records_to_read) override {
+    for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
+      RETURN_NOT_OK(reader->LoadBatch(records_to_read));
+    }
+    return Status::OK();
+  }
+  Status BuildArray(int64_t records_to_read, std::shared_ptr<ChunkedArray>* 
out) override;
   Status GetDefLevels(const int16_t** data, int64_t* length) override;
   Status GetRepLevels(const int16_t** data, int64_t* length) override;
   const std::shared_ptr<Field> field() override { return filtered_field_; }
-  const ColumnDescriptor* descr() const override { return nullptr; }
-  ReaderType type() const override { return STRUCT; }
 
  private:
-  std::shared_ptr<ReaderContext> ctx_;
-  SchemaField schema_field_;
-  std::shared_ptr<Field> filtered_field_;
-  int16_t struct_def_level_;
-  std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
-  std::shared_ptr<ResizableBuffer> def_levels_buffer_;
-  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* 
null_count);
+  const std::shared_ptr<ReaderContext> ctx_;
+  const std::shared_ptr<Field> filtered_field_;
+  const ::parquet::internal::LevelInfo level_info_;
+  const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
+  ColumnReaderImpl* def_rep_level_child_ = nullptr;
+  bool has_repeated_child_;
 };
 
-Status StructReader::DefLevelsToNullArray(std::shared_ptr<Buffer>* 
null_bitmap_out,
-                                          int64_t* null_count_out) {
-  auto null_count = 0;
-  const int16_t* def_levels_data;
-  int64_t def_levels_length;
-  RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
-  ARROW_ASSIGN_OR_RAISE(auto null_bitmap,
-                        AllocateEmptyBitmap(def_levels_length, ctx_->pool));
-  uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
-  for (int64_t i = 0; i < def_levels_length; i++) {
-    if (def_levels_data[i] < struct_def_level_) {
-      // Mark null
-      null_count += 1;
-    } else {
-      DCHECK_EQ(def_levels_data[i], struct_def_level_);
-      ::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
-    }
-  }
-
-  *null_count_out = null_count;
-  *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap;
-  return Status::OK();
-}
-
-// TODO(itaiin): Consider caching the results of this calculation -
-//   note that this is only used once for each read for now
 Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
   *data = nullptr;
   if (children_.size() == 0) {
-    // Empty struct
     *length = 0;
-    return Status::OK();
+    return Status::Invalid("StructReader had no childre");
   }
 
-  // We have at least one child
-  const int16_t* child_def_levels;
-  int64_t child_length = 0;
-  bool found_nullable_child = false;
-  int16_t* result_levels = nullptr;
-
-  int child_index = 0;
-  while (child_index < static_cast<int>(children_.size())) {
-    if (!children_[child_index]->field()->nullable()) {
-      ++child_index;
-      continue;
-    }
-    RETURN_NOT_OK(children_[child_index]->GetDefLevels(&child_def_levels, 
&child_length));
-    auto size = child_length * sizeof(int16_t);
-    ARROW_ASSIGN_OR_RAISE(def_levels_buffer_, AllocateResizableBuffer(size, 
ctx_->pool));
-    // Initialize with the minimal def level
-    std::memset(def_levels_buffer_->mutable_data(), -1, size);
-    result_levels = 
reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
-    found_nullable_child = true;
-    break;
-  }
+  // This method should only be called when this struct or one of its parents
+  // is optional/repeated, or when it has a repeated child.
+  // This means all children must have rep/def levels associated
+  // with them.
+  RETURN_NOT_OK(def_rep_level_child_->GetDefLevels(data, length));
 
-  if (!found_nullable_child) {
+  if (*data == nullptr) {
+    // Only happens if there are actually 0 rows available.
     *data = nullptr;

Review comment:
       Isn't this redundant with your `if` condition above? Also, why does 
`length` need to be filled out explicitly below?
   (doesn't `def_rep_level_child_->GetDefLevels` do it?).
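
       If it is indeed redundant, the whole method could presumably collapse to
       a forward of the child call. A sketch of that simplification (an
       assumption, not a requirement; it relies on the child filling in both
       `data` and `length`):

```cpp
Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
  if (children_.empty()) {
    *data = nullptr;
    *length = 0;
    return Status::Invalid("StructReader had no children");
  }
  // Delegate to the child selected in the constructor; its def levels cover
  // this struct's level.
  return def_rep_level_child_->GetDefLevels(data, length);
}
```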



