westonpace commented on a change in pull request #11351:
URL: https://github.com/apache/arrow/pull/11351#discussion_r724473946



##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -842,36 +842,79 @@ Status GetReader(const SchemaField& field, const 
std::shared_ptr<Field>& arrow_f
       *out = nullptr;
       return Status::OK();
     }
-    if (type_id == ::arrow::Type::LIST ||
-        type_id == ::arrow::Type::MAP) {  // Map can be reconstructed as list 
of structs.
-      if (type_id == ::arrow::Type::MAP &&
-          child_reader->field()->type()->num_fields() != 2) {
-        // This case applies if either key or value is filtered.
+
+    // These two types might not be equal if there column pruning occurred.
+    // further down the stack.
+    const std::shared_ptr<DataType> reader_child_type = 
child_reader->field()->type();
+    const DataType& schema_child_type = 
*(list_field->type()->field(0)->type());
+    if (type_id == ::arrow::Type::MAP) {
+      if (reader_child_type->num_fields() != 2 ||
+          !reader_child_type->field(0)->type()->Equals(
+              *schema_child_type.field(0)->type())) {
+        // This case applies if either key or value are completed filtered
+        // out so we can take the type as is or the key was partially
+        // so keeping it as a map no longer makes sence.
         list_field = 
list_field->WithType(::arrow::list(child_reader->field()));
+      } else if (!reader_child_type->field(1)->type()->Equals(
+                     *schema_child_type.field(1)->type())) {
+        list_field = list_field->WithType(std::make_shared<::arrow::MapType>(
+            reader_child_type->field(
+                0),  // field 0 is unchanged baed on previous if statement
+            reader_child_type->field(1)));
+      }
+      // Map types are list<struct<key, value>> so use ListReader
+      // for reconstruction.
+      out->reset(new ListReader<int32_t>(ctx, list_field, field.level_info,
+                                         std::move(child_reader)));
+    } else if (type_id == ::arrow::Type::LIST) {
+      if (!reader_child_type->Equals(schema_child_type)) {
+        list_field = list_field->WithType(::arrow::list(reader_child_type));
       }
+
       out->reset(new ListReader<int32_t>(ctx, list_field, field.level_info,
                                          std::move(child_reader)));
     } else if (type_id == ::arrow::Type::LARGE_LIST) {
+      if (!reader_child_type->Equals(schema_child_type)) {
+        list_field = 
list_field->WithType(::arrow::large_list(reader_child_type));
+      }
+
       out->reset(new ListReader<int64_t>(ctx, list_field, field.level_info,
                                          std::move(child_reader)));
-
     } else if (type_id == ::arrow::Type::FIXED_SIZE_LIST) {
+      if (!reader_child_type->Equals(schema_child_type)) {
+        auto& fixed_list_type =
+            checked_cast<const 
::arrow::FixedSizeListType&>(*list_field->type());
+        int32_t list_size = fixed_list_type.list_size();
+        list_field =
+            list_field->WithType(::arrow::fixed_size_list(reader_child_type, 
list_size));
+      }
+
       out->reset(new FixedSizeListReader(ctx, list_field, field.level_info,
                                          std::move(child_reader)));
     } else {
       return Status::UnknownError("Unknown list type: ", 
field.field->ToString());
     }
   } else if (type_id == ::arrow::Type::STRUCT) {
     std::vector<std::shared_ptr<Field>> child_fields;
+    int arrow_field_idx = 0;
     std::vector<std::unique_ptr<ColumnReaderImpl>> child_readers;
     for (const auto& child : field.children) {
       std::unique_ptr<ColumnReaderImpl> child_reader;
       RETURN_NOT_OK(GetReader(child, ctx, &child_reader));
       if (!child_reader) {
+        arrow_field_idx++;
         // If all children were pruned, then we do not try to read this field
         continue;
       }
-      child_fields.push_back(child.field);
+      std::shared_ptr<::arrow::Field> child_field = child.field;
+      const DataType& reader_child_type = *child_reader->field()->type();
+      const DataType& schema_child_type =
+          *arrow_field->type()->field(arrow_field_idx++)->type();
+      // These might not be equal if column pruning occurred.
+      if (!schema_child_type.Equals(reader_child_type)) {
+        child_field = child_field->WithType(child_reader->field()->type());
+      }
+      child_fields.push_back(child_field);

Review comment:
       I think this assumes that `arrow_field->type()` and `field.children()` 
have the same order which is probably a perfectly reasonable assumption to make 
but I figured I'd check just in case.  E.g. borrowing the test case  from 
arrow_reader_writer_test.cc:4294 would it be legal to do...
   
   ```
     cases.push_back(
         {map_type,
          /*indices=*/{1, 0},
          /*selected_type=*/
          ::arrow::list(field("col", struct_({REVERSE(map_type->key_field())}), 
/*nullable=*/false)),
          kWriteData, /*expected_data=*/R"([[{"key": {"b": 0, "a":1}}]])"});
   ```
   
   A selection string "x.y.z" should just be masking (and thus can't change the 
order) but I wasn't sure if there was some other code path that could lead here.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -842,36 +842,79 @@ Status GetReader(const SchemaField& field, const 
std::shared_ptr<Field>& arrow_f
       *out = nullptr;
       return Status::OK();
     }
-    if (type_id == ::arrow::Type::LIST ||
-        type_id == ::arrow::Type::MAP) {  // Map can be reconstructed as list 
of structs.
-      if (type_id == ::arrow::Type::MAP &&
-          child_reader->field()->type()->num_fields() != 2) {
-        // This case applies if either key or value is filtered.
+
+    // These two types might not be equal if there column pruning occurred.
+    // further down the stack.

Review comment:
       ```suggestion
       // These two types might not be equal if column pruning occurred.
       // further down the stack.
   ```

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -842,36 +842,79 @@ Status GetReader(const SchemaField& field, const 
std::shared_ptr<Field>& arrow_f
       *out = nullptr;
       return Status::OK();
     }
-    if (type_id == ::arrow::Type::LIST ||
-        type_id == ::arrow::Type::MAP) {  // Map can be reconstructed as list 
of structs.
-      if (type_id == ::arrow::Type::MAP &&
-          child_reader->field()->type()->num_fields() != 2) {
-        // This case applies if either key or value is filtered.
+
+    // These two types might not be equal if there column pruning occurred.
+    // further down the stack.
+    const std::shared_ptr<DataType> reader_child_type = 
child_reader->field()->type();
+    const DataType& schema_child_type = 
*(list_field->type()->field(0)->type());

Review comment:
       Is there any pathological case where `list_field->type()` won't have any 
child fields? (and thus `field->(0)` fails).  Maybe 
`DCHECK_GT(list_field->type->num_fields(), 0)`

##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -4191,5 +4191,148 @@ TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
 }
 #endif
 
+struct NestedFilterTestCase {
+  std::shared_ptr<::arrow::DataType> write_schema;
+  std::vector<int> indices_to_read;
+  std::shared_ptr<::arrow::DataType> expected_schema;
+  std::string write_data;
+  std::string read_data;
+};
+class TestNestedSchemaFilteredReader
+    : public ::testing::TestWithParam<NestedFilterTestCase> {};
+
+TEST_P(TestNestedSchemaFilteredReader, ReadWrite) {
+  std::shared_ptr<::arrow::io::BufferOutputStream> sink = CreateOutputStream();
+  auto write_props = WriterProperties::Builder().build();
+  std::shared_ptr<::arrow::Array> array =
+      ArrayFromJSON(GetParam().write_schema, GetParam().write_data);
+
+  ASSERT_OK_NO_THROW(
+      WriteTable(**Table::FromRecordBatches({::arrow::RecordBatch::Make(
+                     ::arrow::schema({::arrow::field("col", array->type())}),
+                     array->length(), {array})}),
+                 ::arrow::default_memory_pool(), sink, /*row_group_size=*/100,

Review comment:
       ```suggestion
                    ::arrow::default_memory_pool(), sink, /*chunk_size=*/100,
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to