[
https://issues.apache.org/jira/browse/BEAM-3296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685607#comment-16685607
]
Robert Burke commented on BEAM-3296:
------------------------------------
My guess is that this is probably due to CombinerLifting? I can't recall if we
simulated that in the direct runner, though. Otherwise I'm not sure.
I don't even know if empty bundles are semantically valid in the Beam model, so
empty -> 1 value through a combiner seems odd to me from a data processing
standpoint.
The problem I see is that when grouping by key, if there are no values... then
there were no keys... which means there's nothing grouped, which means nothing
would trigger execution per element. This means it can't be done with a plain
function/binary combiner, and they *all* need to be StructuralDoFns so we can
retain bundle level state.
So the only fix there would be to track if anything was processed in the
bundle, if not, emit it in the FinishBundle step (with appropriate state reset
in StartBundle).
Except these are Combiners, so we need them to work properly through Combiner
Lifting. At present, we don't use StartBundle and FinishBundle methods on
combiners. So we'd need to have the various StructuralCombineFns set the
"default" values in their CreateAccumulator() methods. Then I think it might
work properly for the per-value case. On a CombinePerKey though, there doesn't
seem to be a way to achieve that, since no key = no invocation, so this would
be a half solution at best. Obviously we wouldn't want to try and generate
random keys.
It would be a lot of boilerplate code, and StructuralDoFns are currently
slower than the existing implementation due to how methods on structs are
handled.
If you're just doing a Combine (not CombinePerKey), you can fake the behavior
you want by beam.Create-ing the appropriate zero/default value and flattening
that PCollection with the actual data PCollection.
We might be able to provide something that does the same for per-key combines
as well, using the UniversalTypes, if we can be provided a stream of the keys.
I'd need to give it some more thought.
> Combine semantics for empty bundle for min, max is wrong
> --------------------------------------------------------
>
> Key: BEAM-3296
> URL: https://issues.apache.org/jira/browse/BEAM-3296
> Project: Beam
> Issue Type: Bug
> Components: sdk-go
> Reporter: Henning Rohde
> Priority: Minor
>
> The combine semantics for Min for empty bundles is to emit MAX_INT, if using
> global windows. Our implementation currently deviates from java, because we
> use binary combiners (and thus emit the default element for the type (0).
> Ditto for the other binary combiners in stats.