luke de feo created BEAM-12034:
----------------------------------

             Summary: Beam SQL UNION of two unbounded streams always performs CoGroupByKey
                 Key: BEAM-12034
                 URL: https://issues.apache.org/jira/browse/BEAM-12034
             Project: Beam
          Issue Type: Bug
          Components: sdk-java-core
    Affects Versions: 2.28.0
            Reporter: luke de feo


I would like to be able to write the following SQL

 
{code:java}
SELECT
 ordersevt1.TimeStamp,
 ordersevt1.OrderId,
 ordersevt1.RestaurantId,
 ordersevt1.Tenant,
 "orderplaced" AS Topic
FROM ordersevt1
UNION ALL
SELECT
 ordersevt2.TimeStamp,
 ordersevt2.OrderId,
 cast(ordersevt2.Restaurant.Id as INT64) AS RestaurantId,
 ordersevt2.Tenant,
 "ordercreated" AS Topic
FROM ordersevt2{code}
 

My use case is that I have two PCollections holding similar data in slightly 
different structures, and I want to write a step that transforms each into a 
common structure.

 

These transformations are completely independent, so I would expect a series of 
ParDo steps to transform each source PCollection, followed by something like this:

 

{code:java}
PCollectionList
    .of(inputs)
    .apply("flatten", Flatten.pCollections())
{code}
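For reference, here is roughly the pipeline I would expect the SQL above to compile 
to. This is only a sketch: the class, the DoFn names (NormalizeEvt1Fn, 
NormalizeEvt2Fn) and the field types in the common schema are my own placeholders, 
and I am assuming Restaurant.Id arrives as a string that needs parsing.

{code:java}
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;

public class UnionAllViaFlatten {

  // Placeholder common schema matching the SELECT list above.
  static final Schema COMMON_SCHEMA =
      Schema.builder()
          .addDateTimeField("TimeStamp")
          .addStringField("OrderId")
          .addInt64Field("RestaurantId")
          .addStringField("Tenant")
          .addStringField("Topic")
          .build();

  /** Maps an ordersevt1 row onto the common schema. */
  static class NormalizeEvt1Fn extends DoFn<Row, Row> {
    @ProcessElement
    public void processElement(@Element Row in, OutputReceiver<Row> out) {
      out.output(
          Row.withSchema(COMMON_SCHEMA)
              .addValues(
                  in.getDateTime("TimeStamp"),
                  in.getString("OrderId"),
                  in.getInt64("RestaurantId"),
                  in.getString("Tenant"),
                  "orderplaced")
              .build());
    }
  }

  /** Maps an ordersevt2 row onto the common schema, casting Restaurant.Id to INT64. */
  static class NormalizeEvt2Fn extends DoFn<Row, Row> {
    @ProcessElement
    public void processElement(@Element Row in, OutputReceiver<Row> out) {
      out.output(
          Row.withSchema(COMMON_SCHEMA)
              .addValues(
                  in.getDateTime("TimeStamp"),
                  in.getString("OrderId"),
                  Long.valueOf(in.getRow("Restaurant").getString("Id")),
                  in.getString("Tenant"),
                  "ordercreated")
              .build());
    }
  }

  public static PCollection<Row> unionAll(
      PCollection<Row> ordersEvt1, PCollection<Row> ordersEvt2) {
    // Each source is normalised independently with a stateless ParDo.
    PCollection<Row> lhs =
        ordersEvt1
            .apply("NormalizeEvt1", ParDo.of(new NormalizeEvt1Fn()))
            .setRowSchema(COMMON_SCHEMA);
    PCollection<Row> rhs =
        ordersEvt2
            .apply("NormalizeEvt2", ParDo.of(new NormalizeEvt2Fn()))
            .setRowSchema(COMMON_SCHEMA);

    // UNION ALL keeps duplicates, so flattening the two collections needs no shuffle.
    return PCollectionList.of(lhs)
        .and(rhs)
        .apply("UnionAll", Flatten.pCollections())
        .setRowSchema(COMMON_SCHEMA);
  }
}
{code}

The point is that each branch is just a ParDo and the final step is a Flatten, so 
no shuffle should be required for UNION ALL.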

 

In reality, when I run this query I get a CoGroupByKey followed by a GroupByKey.

 

The relevant code seems to be in

 

org.apache.beam.sdk.extensions.sql.impl.rel.BeamSetOperatorRelBase

 
{code:java}
// TODO: We may want to preaggregate the counts first using Group instead of calling CoGroup and
// measuring the
// iterable size. If on average there are duplicates in the input, this will be faster.
final String lhsTag = "lhs";
final String rhsTag = "rhs";
PCollection<Row> joined =
    PCollectionTuple.of(lhsTag, leftRows, rhsTag, rightRows)
        .apply("CoGroup", CoGroup.join(By.fieldNames("*")));
return joined
    .apply(
        "FilterResults",
        ParDo.of(
            new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(
                lhsTag, rhsTag, opType, all)))
    .setRowSchema(joined.getSchema().getField("key").getType().getRowSchema());
{code}
 

Is there a reason we are always doing a CoGroupByKey? All the extra shuffle 
seems wasteful.
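For UNION ALL specifically it seems like the CoGroup could be short-circuited, 
e.g. something along these lines. This is only a sketch against the class above: 
opType, OpType.UNION, all, leftRows and rightRows are assumed from the surrounding 
code and I have not checked the exact names.

{code:java}
// Hypothetical shortcut for the UNION ALL case only: duplicates are kept, so no
// counting or grouping is needed and a plain Flatten of the two inputs avoids the
// shuffle entirely. (Names of the enum and fields are assumed, not verified.)
if (opType == OpType.UNION && all) {
  return PCollectionList.of(leftRows)
      .and(rightRows)
      .apply("UnionAll", Flatten.pCollections())
      .setRowSchema(leftRows.getSchema());
}
{code}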


