je-ik commented on code in PR #33521:
URL: https://github.com/apache/beam/pull/33521#discussion_r1905432295
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java:
##########
@@ -146,32 +146,36 @@ public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupedByKey;
Partitioner partitioner = getPartitioner(context);
- // As this is batch, we can ignore triggering and allowed lateness parameters.
- if (windowingStrategy.getWindowFn().equals(new GlobalWindows())
- && windowingStrategy.getTimestampCombiner().equals(TimestampCombiner.END_OF_WINDOW)) {
- // we can drop the windows and recover them later
- groupedByKey =
- GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(
- inRDD, keyCoder, coder.getValueCoder(), partitioner);
Review Comment:
Actually, it seems these optimizations are not mutually exclusive. We
might want to apply both.
##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java:
##########
@@ -146,32 +146,36 @@ public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupedByKey;
Partitioner partitioner = getPartitioner(context);
- // As this is batch, we can ignore triggering and allowed lateness parameters.
- if (windowingStrategy.getWindowFn().equals(new GlobalWindows())
- && windowingStrategy.getTimestampCombiner().equals(TimestampCombiner.END_OF_WINDOW)) {
- // we can drop the windows and recover them later
- groupedByKey =
- GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(
- inRDD, keyCoder, coder.getValueCoder(), partitioner);
- } else if (GroupNonMergingWindowsFunctions.isEligibleForGroupByWindow(windowingStrategy)) {
+ if (GroupNonMergingWindowsFunctions.isEligibleForGroupByWindow(windowingStrategy)) {
// we can have a memory sensitive translation for non-merging windows
groupedByKey =
Review Comment:
I know this is not part of this issue, but there is an unresolved problem
with this: under certain circumstances the GBK result needs to be
Reiterable (e.g. CoGroupByKey). Could we find a way to determine
automatically whether this expansion is correct? Off the top of my head: if
a Pipeline does not use CoGBK at all (or, better, if we can check whether
this transform is part of a CoGBK expansion), then this might be OK.