GitHub user robo-todd added a comment to the discussion: Join problems with
custom TableProviders
Adding further information:
Here are the logical plans of the two different joins. My custom TableProviders
are based on the table provider example, and I'm still trying to understand why
the join with custom table providers returns a large number of tiny 1- or 2-row
record batches.
Plan from MemTable test (results in 1 batch with tens of rows, depending on the
data):
`Join(Join {
left: TableScan(TableScan { table_name: Partial { schema: "public", table:
"values" }, source: "...", projection: None,
projected_schema: DFSchema { inner: Schema { fields: [Field { name: "id",
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} },
Field { name:
"timestamp", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} },
Field { name: "value",
data_type: FixedSizeList(Field { name: "item", data_type: Float32, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }, 64), nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} },
field_qualifiers: [Some(Partial { schema: "public", table: "values" }),
Some(Partial { schema: "public", table: "values" }), Some(Partial { schema:
"public", table: "values" })],
functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence {
source_indices: [0], target_indices: [0, 1, 2], nullable: false, mode: Single
}] } }, filters: [], fetch: None, .. }),
right: Filter(Filter { predicate: IsNotNull(Column(Column { relation:
Some(Partial { schema: "public", table: "entities" }), name: "super_entity" })),
input: TableScan(TableScan { table_name: Partial { schema: "public", table:
"entities" }, source: "...", projection: Some([0, 2]),
projected_schema: DFSchema { inner: Schema { fields: [Field { name: "id",
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} },
Field { name:
"super_entity", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }], metadata: {} },
field_qualifiers: [Some(Partial { schema: "public", table: "entities" }),
Some(Partial { schema: "public", table: "entities" })],
functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence {
source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] }
}, filters: [],
fetch: None, .. }) }), on: [(Column(Column { relation: Some(Partial { schema:
"public", table: "values" }), name: "id" }),Column(Column { relation:
Some(Partial { schema: "public", table: "entities" }), name: "id" }))], filter:
None,
join_type: Inner, join_constraint: On, schema: DFSchema { inner: Schema {
fields: [Field { name: "id", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} },
Field { name: "timestamp", data_type: Timestamp(Nanosecond, None), nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "value", data_type: FixedSizeList(Field { name: "item",
data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }, 64), nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} },
Field { name: "id", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} },
Field { name: "super_entity", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }], metadata: {} },
field_qualifiers: [Some(Partial { schema: "public", table: "values" }),
Some(Partial { schema: "public", table: "values" }), Some(Partial { schema:
"public", table: "values" }), Some(Partial { schema: "public", table:
"entities" }), Some(Partial { schema: "public", table: "entities" })],
functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence {
source_indices: [0], target_indices: [0, 1, 2], nullable: false, mode: Multi },
FunctionalDependence { source_indices: [3], target_indices: [3, 4], nullable:
false, mode: Multi }] } }, null_equality: NullEqualsNothing })
Results in 1 batches:`
Plan from custom TableProvider test (results in 18 batches of 1-2 rows each):
`Join(Join {
left: TableScan(TableScan { table_name: Partial { schema: "testcase", table:
"values" }, source: "...", projection: None,
projected_schema: DFSchema { inner: Schema { fields: [Field { name: "id",
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} },
Field { name: "timestamp",
data_type: Timestamp(Microsecond, None), nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} },
Field { name: "value",
data_type: List(Field { name: "item", data_type: Float32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }], metadata: {} },
field_qualifiers: [Some(Partial { schema: "testcase", table: "values" }),
Some(Partial { schema: "testcase", table: "values" }), Some(Partial { schema:
"testcase", table: "values" })],
functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence {
source_indices: [0], target_indices: [0, 1, 2], nullable: false, mode: Single
}] } }, filters: [], fetch: None, .. }),
right: Filter(Filter { predicate: IsNotNull(Column(Column { relation:
Some(Partial { schema: "testcase", table: "entities" }), name: "super_entity"
})),
input: TableScan(TableScan { table_name: Partial { schema: "testcase", table:
"entities" }, source: "...", projection: Some([0, 2]),
projected_schema: DFSchema { inner: Schema { fields: [Field { name: "id",
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} },
Field { name:
"super_entity", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }], metadata: {} },
field_qualifiers: [Some(Partial { schema: "testcase", table: "entities" }),
Some(Partial { schema: "testcase", table: "entities" })],
functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence {
source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] }
}, filters: [],
fetch: None, .. }) }),
on: [(Column(Column { relation: Some(Partial { schema: "testcase", table:
"values" }), name: "id" }), Column(Column { relation: Some(Partial { schema:
"testcase", table: "entities" }), name: "id" }))], filter: None,
join_type: Inner, join_constraint: On, schema: DFSchema { inner: Schema {
fields: [Field { name: "id", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} },
Field { name: "timestamp", data_type: Timestamp(Microsecond, None),
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "value", data_type: List(Field { name: "item", data_type:
Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "id", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} },
Field { name: "super_entity", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }], metadata: {} },
field_qualifiers: [Some(Partial { schema: "testcase", table: "values" }),
Some(Partial { schema: "testcase", table: "values" }), Some(Partial { schema:
"testcase", table: "values" }), Some(Partial { schema: "testcase", table:
"entities" }), Some(Partial { schema: "testcase", table: "entities" })],
functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence {
source_indices: [0], target_indices: [0, 1, 2], nullable: false, mode: Multi },
FunctionalDependence { source_indices: [3], target_indices: [3, 4], nullable:
false, mode: Multi }] } }, null_equality: NullEqualsNothing })
Join in 18 batches:`
GitHub link:
https://github.com/apache/datafusion/discussions/16981#discussioncomment-13950082
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]