luke de feo created BEAM-12034:
----------------------------------
Summary: Beam SQL UNION of 2 unbounded streams always performs a CoGroupByKey
Key: BEAM-12034
URL: https://issues.apache.org/jira/browse/BEAM-12034
Project: Beam
Issue Type: Bug
Components: sdk-java-core
Affects Versions: 2.28.0
Reporter: luke de feo
I would like to be able to write the following SQL:
{code:sql}
SELECT
ordersevt1.TimeStamp,
ordersevt1.OrderId,
ordersevt1.RestaurantId,
ordersevt1.Tenant,
"orderplaced" AS Topic
FROM ordersevt1
UNION ALL
SELECT
ordersevt2.TimeStamp,
ordersevt2.OrderId,
cast(ordersevt2.Restaurant.Id as INT64) AS RestaurantId,
ordersevt2.Tenant,
"ordercreated" AS Topic
FROM ordersevt2{code}
The use case: I have 2 PCollections that hold similar data in slightly
different structures, and I want a step that transforms each into a common
structure.
These transformations are completely independent, so I would expect a series of
ParDo steps, one per source PCollection, followed by something like this:
{code:java}
PCollection<Row> unioned =
    PCollectionList.of(inputs)
        .apply("flatten", Flatten.pCollections());
{code}
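To make the expected plan concrete, here is a plain-Java sketch with no Beam dependency. Each input is mapped independently to a common shape (the role of the per-input ParDo steps), and the mapped results are then concatenated (the role of Flatten) with no grouping at all. The record types, their field names, and the assumption that Restaurant.Id arrives as a string are hypothetical stand-ins for the real schemas of ordersevt1 and ordersevt2.

```java
import java.util.ArrayList;
import java.util.List;

final class UnionAllSketch {
  // Hypothetical stand-in for an ordersevt1 row: the restaurant id is a top-level field.
  record OrderPlaced(long timeStamp, String orderId, long restaurantId, String tenant) {}

  // Hypothetical stand-ins for an ordersevt2 row: the restaurant id is nested
  // and assumed to be a string, hence the cast in the SQL.
  record Restaurant(String id) {}
  record OrderCreated(long timeStamp, String orderId, Restaurant restaurant, String tenant) {}

  // Common output shape, matching the SELECT list of the SQL above.
  record Order(long timeStamp, String orderId, long restaurantId, String tenant, String topic) {}

  // Per-input transform for the first source (analogous to its own ParDo).
  static Order fromPlaced(OrderPlaced e) {
    return new Order(e.timeStamp(), e.orderId(), e.restaurantId(), e.tenant(), "orderplaced");
  }

  // Per-input transform for the second source; mirrors CAST(... AS INT64).
  static Order fromCreated(OrderCreated e) {
    return new Order(
        e.timeStamp(), e.orderId(), Long.parseLong(e.restaurant().id()), e.tenant(), "ordercreated");
  }

  // UNION ALL as a Flatten: map each input independently, then concatenate.
  static List<Order> unionAll(List<OrderPlaced> placed, List<OrderCreated> created) {
    List<Order> out = new ArrayList<>();
    for (OrderPlaced e : placed) {
      out.add(fromPlaced(e));
    }
    for (OrderCreated e : created) {
      out.add(fromCreated(e));
    }
    return out;
  }
}
```

Because neither mapping looks at the other input, nothing in this plan requires a shuffle.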
In reality, when I run this code I get a CoGroupByKey followed by a
GroupByKey.
The relevant code seems to be in
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSetOperatorRelBase:
{code:java}
// TODO: We may want to preaggregate the counts first using Group instead of
// calling CoGroup and measuring the iterable size. If on average there are
// duplicates in the input, this will be faster.
final String lhsTag = "lhs";
final String rhsTag = "rhs";
PCollection<Row> joined =
    PCollectionTuple.of(lhsTag, leftRows, rhsTag, rightRows)
        .apply("CoGroup", CoGroup.join(By.fieldNames("*")));
return joined
    .apply(
        "FilterResults",
        ParDo.of(
            new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(
                lhsTag, rhsTag, opType, all)))
    .setRowSchema(joined.getSchema().getField("key").getType().getRowSchema());{code}
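The CoGroup.join(By.fieldNames("*")) call groups both inputs by every field, so for each distinct row the filtering step can measure how many times it occurred on the left and on the right (the "iterable size" in the TODO). The plain-Java sketch below shows how those two counts determine the output under standard SQL multiset semantics. It is an illustration only: SetOpSketch, Op, outputCount and apply are hypothetical names, and it assumes, rather than shows, that SetOperatorFilteringDoFn follows these semantics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SetOpSketch {
  enum Op { UNION, INTERSECT, EXCEPT }

  // Number of copies of a row to emit, given l copies on the left and r on the right.
  // Standard SQL multiset semantics; assumed, not taken from SetOperatorFilteringDoFn.
  static long outputCount(Op op, boolean all, long l, long r) {
    switch (op) {
      case UNION:
        return all ? l + r : (l + r > 0 ? 1 : 0);
      case INTERSECT:
        return all ? Math.min(l, r) : (l > 0 && r > 0 ? 1 : 0);
      case EXCEPT:
        return all ? Math.max(l - r, 0) : (l > 0 && r == 0 ? 1 : 0);
      default:
        throw new IllegalArgumentException("unknown op: " + op);
    }
  }

  // Group both sides by the whole row (the role CoGroup.join(By.fieldNames("*"))
  // plays in the Beam plan), then emit each distinct row outputCount(...) times.
  static <T> List<T> apply(Op op, boolean all, List<T> lhs, List<T> rhs) {
    // counts[0] = occurrences on the left, counts[1] = occurrences on the right.
    Map<T, long[]> counts = new LinkedHashMap<>();
    for (T row : lhs) counts.computeIfAbsent(row, k -> new long[2])[0]++;
    for (T row : rhs) counts.computeIfAbsent(row, k -> new long[2])[1]++;
    List<T> out = new ArrayList<>();
    for (Map.Entry<T, long[]> e : counts.entrySet()) {
      long n = outputCount(op, all, e.getValue()[0], e.getValue()[1]);
      for (long i = 0; i < n; i++) {
        out.add(e.getKey());
      }
    }
    return out;
  }
}
```

For example, with left rows a, a, b and right rows a, c, UNION ALL emits 5 rows while EXCEPT ALL emits a, b.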
Is there a reason we always do a CoGroup, even for UNION ALL? All the extra
shuffling seems wasteful.
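The intuition behind the question can be checked directly: for UNION ALL, emitting each distinct row lhsCount + rhsCount times yields the same multiset of rows as simply concatenating the two inputs, so the grouping step only changes the order of the output. The plain-Java sketch below compares the two approaches; UnionAllNoShuffle and its methods are hypothetical names used for illustration, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UnionAllNoShuffle {
  // UNION ALL computed the grouped way: count each distinct row across both
  // sides (the shuffle), then emit it lhsCount + rhsCount times.
  static <T> List<T> viaGrouping(List<T> lhs, List<T> rhs) {
    Map<T, Long> counts = new HashMap<>();
    for (T row : lhs) counts.merge(row, 1L, Long::sum);
    for (T row : rhs) counts.merge(row, 1L, Long::sum);
    List<T> out = new ArrayList<>();
    counts.forEach((row, n) -> {
      for (long i = 0; i < n; i++) {
        out.add(row);
      }
    });
    return out;
  }

  // UNION ALL as a Flatten: plain concatenation, no grouping needed.
  static <T> List<T> viaConcat(List<T> lhs, List<T> rhs) {
    List<T> out = new ArrayList<>(lhs);
    out.addAll(rhs);
    return out;
  }

  // Order-insensitive comparison: same elements with the same multiplicities.
  static <T> boolean sameMultiset(List<T> a, List<T> b) {
    Map<T, Long> ca = new HashMap<>();
    Map<T, Long> cb = new HashMap<>();
    for (T x : a) ca.merge(x, 1L, Long::sum);
    for (T x : b) cb.merge(x, 1L, Long::sum);
    return ca.equals(cb);
  }
}
```

The other set operations compare counts across the two sides, so they do need rows grouped together; UNION ALL never compares them.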
--
This message was sent by Atlassian Jira
(v8.3.4#803005)