westonpace commented on a change in pull request #11704:
URL: https://github.com/apache/arrow/pull/11704#discussion_r749702773
##########
File path: cpp/src/arrow/type.h
##########
@@ -1657,6 +1657,11 @@ class ARROW_EXPORT FieldRef {
const std::string* name() const {
return IsName() ? &util::get<std::string>(impl_) : NULLPTR;
}
+ const std::vector<FieldRef>* nested_refs() const {
+ return util::holds_alternative<std::vector<FieldRef>>(impl_)
+ ? &util::get<std::vector<FieldRef>>(impl_)
+ : NULLPTR;
+ }
Review comment:
Why is the logic here different than the logic above in `IsNested`? I
would expect this would be `return IsNested() ? ...`
##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -231,30 +231,105 @@ void AddColumnIndices(const SchemaField& schema_field,
}
}
-// Compute the column projection out of an optional arrow::Schema
-std::vector<int> InferColumnProjection(const parquet::arrow::FileReader&
reader,
- const ScanOptions& options) {
+Status ResolveOneFieldRef(
+ const SchemaManifest& manifest, const FieldRef& field_ref,
+ const std::unordered_map<std::string, const SchemaField*>& field_lookup,
+ const std::unordered_set<std::string>& duplicate_fields,
+ std::vector<int>* columns_selection) {
+ if (const auto* name = field_ref.name()) {
+ auto it = field_lookup.find(*name);
+ if (it != field_lookup.end()) {
+ AddColumnIndices(*it->second, columns_selection);
+ } else if (duplicate_fields.find(*name) != duplicate_fields.end()) {
+ // We shouldn't generally get here because SetProjection will reject
such references
+ return Status::Invalid("Ambiguous reference to column '", *name,
+ "' which occurs more than once");
+ }
+ // "Virtual" column: field is not in file but is in the ScanOptions.
+ // Ignore it here, as projection will pad the batch with a null column.
+ return Status::OK();
+ }
+
+ const SchemaField* field = nullptr;
+ if (const auto* refs = field_ref.nested_refs()) {
+ // Only supports a sequence of names
+ for (const auto& ref : *refs) {
+ if (const auto* name = ref.name()) {
Review comment:
```suggestion
if (const std::string* name = ref.name()) {
```
Of course, by this point, I think I'm unlikely to forget the rule :laughing:
Feel free to ignore these.
##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -534,6 +543,8 @@ class FileFormatFixtureMixin : public ::testing::Test {
std::shared_ptr<ScanOptions> opts_;
};
+MATCHER(PointeesEquals, "") { return
std::get<0>(arg)->Equals(*std::get<1>(arg)); }
Review comment:
This seems general enough to go in a test util file?
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1343,6 +1431,69 @@ DatasetAndBatches MakeBasicDataset() {
return {dataset, batches};
}
+DatasetAndBatches MakeNestedDataset() {
+ const auto dataset_schema = ::arrow::schema({
+ field("a", int32()),
+ field("b", boolean()),
+ field("c", struct_({
+ field("d", int64()),
+ field("e", float64()),
+ })),
+ });
+
+ const auto physical_schema = ::arrow::schema({
+ field("a", int32()),
+ field("b", boolean()),
+ field("c", struct_({
+ field("e", int64()),
+ })),
+ });
+
+ RecordBatchVector record_batches{
+ RecordBatchFromJSON(physical_schema, R"([{"a": 1, "b": null, "c":
{"e": 0}},
+ {"a": 2, "b": true, "c":
{"e": 1}}])"),
+ RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true, "c":
{"e": 2}},
+ {"a": 3, "b": false, "c":
{"e": null}}])"),
+ RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true, "c":
{"e": 4}},
+ {"a": 4, "b": false, "c":
{"e": 5}}])"),
+ RecordBatchFromJSON(physical_schema, R"([{"a": 5, "b": null, "c":
{"e": 6}},
+ {"a": 6, "b": false, "c":
{"e": 7}},
+ {"a": 7, "b": false, "c":
{"e": null}}])"),
+ };
+
+ auto dataset = std::make_shared<FragmentDataset>(
+ dataset_schema,
+ FragmentVector{
+ std::make_shared<InMemoryFragment>(
+ physical_schema, RecordBatchVector{record_batches[0],
record_batches[1]},
+ literal(true)),
+ std::make_shared<InMemoryFragment>(
+ physical_schema, RecordBatchVector{record_batches[2],
record_batches[3]},
+ literal(true)),
+ });
+
+ std::vector<compute::ExecBatch> batches;
+
+ auto batch_it = record_batches.begin();
+ for (int fragment_index = 0; fragment_index < 2; ++fragment_index) {
+ for (int batch_index = 0; batch_index < 2; ++batch_index) {
+ const auto& batch = *batch_it++;
+
+ // the scanned ExecBatches will begin with physical columns
+ batches.emplace_back(*batch);
+
+ // scanned batches will be augmented with fragment and batch indices
+ batches.back().values.emplace_back(fragment_index);
+ batches.back().values.emplace_back(batch_index);
+
+ // ... and with the last-in-fragment flag
+ batches.back().values.emplace_back(batch_index == 1);
+ }
+ }
Review comment:
This logic feels like it belongs in a helper method somewhere. Maybe a
`DatasetAndBatchesFromJSON`
##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -231,30 +231,105 @@ void AddColumnIndices(const SchemaField& schema_field,
}
}
-// Compute the column projection out of an optional arrow::Schema
-std::vector<int> InferColumnProjection(const parquet::arrow::FileReader&
reader,
- const ScanOptions& options) {
+Status ResolveOneFieldRef(
+ const SchemaManifest& manifest, const FieldRef& field_ref,
+ const std::unordered_map<std::string, const SchemaField*>& field_lookup,
+ const std::unordered_set<std::string>& duplicate_fields,
+ std::vector<int>* columns_selection) {
+ if (const auto* name = field_ref.name()) {
Review comment:
```suggestion
if (const std::string* name = field_ref.name()) {
```
Same optional nit as above.
##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -631,12 +646,92 @@ class FileFormatScanMixin : public
FileFormatFixtureMixin<FormatHelper>,
for (auto maybe_batch : PhysicalBatches(fragment)) {
ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
row_count += batch->num_rows();
- AssertSchemaEqual(*batch->schema(), *expected_schema,
- /*check_metadata=*/false);
+ ASSERT_THAT(
+ batch->schema()->fields(),
+ ::testing::UnorderedPointwise(PointeesEquals(),
expected_schema->fields()))
+ << "EXPECTED:\n"
+ << expected_schema->ToString() << "\nACTUAL:\n"
+ << batch->schema()->ToString();
}
ASSERT_EQ(row_count, expected_rows());
}
+ void TestScanProjectedNested(bool fine_grained_selection = false) {
+ auto f32 = field("f32", float32());
+ auto f64 = field("f64", float64());
+ auto i32 = field("i32", int32());
+ auto i64 = field("i64", int64());
+ auto struct1 = field("struct1", struct_({f32, i32}));
+ auto struct2 = field("struct2", struct_({f64, i64, struct1}));
+ this->SetSchema({struct1, struct2, f32, f64, i32, i64});
+ this->ProjectNested({".struct1.f32", ".struct2.struct1",
".struct2.struct1.f32"});
+ this->SetFilter(equal(field_ref(FieldRef("struct2", "i64")), literal(0)));
+
+ std::shared_ptr<Schema> expected_schema;
+ if (fine_grained_selection) {
+ // Some formats, like Parquet, let you pluck only a part of a complex
type
+ expected_schema = schema({
+ field("struct1", struct_({f32})),
+ field("struct2", struct_({i64, struct1})),
+ });
+ } else {
+ expected_schema = schema({struct1, struct2});
+ }
+
+ {
+ auto reader = this->GetRecordBatchReader(opts_->dataset_schema);
+ auto source = this->GetFileSource(reader.get());
+ auto fragment = this->MakeFragment(*source);
+
+ int64_t row_count = 0;
+ for (auto maybe_batch : PhysicalBatches(fragment)) {
+ ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+ row_count += batch->num_rows();
+ ASSERT_THAT(
+ batch->schema()->fields(),
+ ::testing::UnorderedPointwise(PointeesEquals(),
expected_schema->fields()))
+ << "EXPECTED:\n"
+ << expected_schema->ToString() << "\nACTUAL:\n"
+ << batch->schema()->ToString();
+ }
+ ASSERT_EQ(row_count, expected_rows());
+ }
+ {
+ // File includes an extra child in struct2
Review comment:
It seems arbitrary that we can't handle this case but we're fine with a
missing child. Though maybe I am reading the test incorrectly.
##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -709,6 +808,35 @@ class FileFormatScanMixin : public
FileFormatFixtureMixin<FormatHelper>,
ASSERT_EQ(row_count, expected_rows());
}
}
+ void TestScanWithDuplicateColumn() {
Review comment:
Is there any particular reason to allow this?
##########
File path: cpp/src/arrow/compute/exec/test_util.cc
##########
@@ -190,6 +190,19 @@ BatchesWithSchema MakeBasicBatches() {
return out;
}
+BatchesWithSchema MakeNestedBatches() {
+ auto ty = struct_({field("i32", int32()), field("bool", boolean())});
+ BatchesWithSchema out;
+ out.batches = {
+ ExecBatchFromJSON(
+ {ty}, R"([[{"i32": null, "bool": true}], [{"i32": 4, "bool":
false}]])"),
+ ExecBatchFromJSON(
+ {ty},
+ R"([[{"i32": 5, "bool": null}], [{"i32": 6, "bool": false}],
[{"i32": 7, "bool": false}]])")};
Review comment:
Nit: Maybe include a top-level null.
##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -111,9 +111,26 @@ static inline Result<csv::ConvertOptions>
GetConvertOptions(
if (!scan_options) return convert_options;
- auto materialized = scan_options->MaterializedFields();
- std::unordered_set<std::string> materialized_fields(materialized.begin(),
- materialized.end());
+ auto field_refs = scan_options->MaterializedFields();
+ std::unordered_set<std::string> materialized_fields;
+ materialized_fields.reserve(field_refs.size());
+ // Preprocess field refs. We try to avoid FieldRef::GetFoo here since that's
+ // quadratic (and this is significant overhead with 1000+ columns)
+ for (const auto& ref : field_refs) {
+ if (const auto* name = ref.name()) {
+ // Common case
+ materialized_fields.emplace(*name);
+ continue;
+ }
+ // CSV doesn't really support nested types so do our best
Review comment:
What is "do our best" mean here? It seems like we are looking for a
column with the same name as the parent field and then trying to cast from a
primitive type to the nested type? Will that ever succeed?
##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -111,9 +111,26 @@ static inline Result<csv::ConvertOptions>
GetConvertOptions(
if (!scan_options) return convert_options;
- auto materialized = scan_options->MaterializedFields();
- std::unordered_set<std::string> materialized_fields(materialized.begin(),
- materialized.end());
+ auto field_refs = scan_options->MaterializedFields();
+ std::unordered_set<std::string> materialized_fields;
+ materialized_fields.reserve(field_refs.size());
+ // Preprocess field refs. We try to avoid FieldRef::GetFoo here since that's
+ // quadratic (and this is significant overhead with 1000+ columns)
+ for (const auto& ref : field_refs) {
+ if (const auto* name = ref.name()) {
Review comment:
Nit: Maybe it's just me but I have to look up `const auto*` every time
to remember what the rules are since we so rarely return raw pointers. Maybe
we could just be explicit.
```suggestion
if (const std::string* name = ref.name()) {
```
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1343,6 +1431,69 @@ DatasetAndBatches MakeBasicDataset() {
return {dataset, batches};
}
+DatasetAndBatches MakeNestedDataset() {
+ const auto dataset_schema = ::arrow::schema({
+ field("a", int32()),
+ field("b", boolean()),
+ field("c", struct_({
+ field("d", int64()),
+ field("e", float64()),
+ })),
+ });
+
+ const auto physical_schema = ::arrow::schema({
+ field("a", int32()),
+ field("b", boolean()),
+ field("c", struct_({
+ field("e", int64()),
+ })),
+ });
+
+ RecordBatchVector record_batches{
+ RecordBatchFromJSON(physical_schema, R"([{"a": 1, "b": null, "c":
{"e": 0}},
+ {"a": 2, "b": true, "c":
{"e": 1}}])"),
+ RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true, "c":
{"e": 2}},
+ {"a": 3, "b": false, "c":
{"e": null}}])"),
+ RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true, "c":
{"e": 4}},
+ {"a": 4, "b": false, "c":
{"e": 5}}])"),
+ RecordBatchFromJSON(physical_schema, R"([{"a": 5, "b": null, "c":
{"e": 6}},
+ {"a": 6, "b": false, "c":
{"e": 7}},
+ {"a": 7, "b": false, "c":
{"e": null}}])"),
Review comment:
Nit: Add some top-level nulls? Or cases where `c` is null?
##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -631,12 +646,92 @@ class FileFormatScanMixin : public
FileFormatFixtureMixin<FormatHelper>,
for (auto maybe_batch : PhysicalBatches(fragment)) {
ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
row_count += batch->num_rows();
- AssertSchemaEqual(*batch->schema(), *expected_schema,
- /*check_metadata=*/false);
+ ASSERT_THAT(
+ batch->schema()->fields(),
+ ::testing::UnorderedPointwise(PointeesEquals(),
expected_schema->fields()))
+ << "EXPECTED:\n"
+ << expected_schema->ToString() << "\nACTUAL:\n"
+ << batch->schema()->ToString();
}
ASSERT_EQ(row_count, expected_rows());
}
+ void TestScanProjectedNested(bool fine_grained_selection = false) {
+ auto f32 = field("f32", float32());
+ auto f64 = field("f64", float64());
+ auto i32 = field("i32", int32());
+ auto i64 = field("i64", int64());
+ auto struct1 = field("struct1", struct_({f32, i32}));
+ auto struct2 = field("struct2", struct_({f64, i64, struct1}));
+ this->SetSchema({struct1, struct2, f32, f64, i32, i64});
+ this->ProjectNested({".struct1.f32", ".struct2.struct1",
".struct2.struct1.f32"});
+ this->SetFilter(equal(field_ref(FieldRef("struct2", "i64")), literal(0)));
+
+ std::shared_ptr<Schema> expected_schema;
+ if (fine_grained_selection) {
+ // Some formats, like Parquet, let you pluck only a part of a complex
type
+ expected_schema = schema({
+ field("struct1", struct_({f32})),
+ field("struct2", struct_({i64, struct1})),
+ });
+ } else {
+ expected_schema = schema({struct1, struct2});
+ }
Review comment:
If I'm understanding this I'm not sure I like it. I would expect the
resulting schema to be the same regardless of whether the underlying format
supported partial projection or not. For formats that don't support partial
projection I would expect it would be simulated by a full read and then a cast.
##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -231,30 +231,105 @@ void AddColumnIndices(const SchemaField& schema_field,
}
}
-// Compute the column projection out of an optional arrow::Schema
-std::vector<int> InferColumnProjection(const parquet::arrow::FileReader&
reader,
- const ScanOptions& options) {
+Status ResolveOneFieldRef(
+ const SchemaManifest& manifest, const FieldRef& field_ref,
+ const std::unordered_map<std::string, const SchemaField*>& field_lookup,
+ const std::unordered_set<std::string>& duplicate_fields,
+ std::vector<int>* columns_selection) {
+ if (const auto* name = field_ref.name()) {
+ auto it = field_lookup.find(*name);
+ if (it != field_lookup.end()) {
+ AddColumnIndices(*it->second, columns_selection);
+ } else if (duplicate_fields.find(*name) != duplicate_fields.end()) {
+ // We shouldn't generally get here because SetProjection will reject
such references
+ return Status::Invalid("Ambiguous reference to column '", *name,
+ "' which occurs more than once");
+ }
+ // "Virtual" column: field is not in file but is in the ScanOptions.
+ // Ignore it here, as projection will pad the batch with a null column.
+ return Status::OK();
+ }
+
+ const SchemaField* field = nullptr;
+ if (const auto* refs = field_ref.nested_refs()) {
Review comment:
```suggestion
if (const std::vector<FieldRef>* refs = field_ref.nested_refs()) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]