Thanks Robert and Kenn for your comments.

"Custom window merging" may be an improper term.

Let me add a bit of context: while working on the Nexmark port (https://github.com/apache/beam/pull/3114), and in particular on Query 9, a use case arose. Even though it could have been implemented differently (I guess the original idea was to test extending WindowFn), Query 9 used a UDF that extends WindowFn and assigns elements to windows based on element values. These windows could then be merged based on a field in addition to a timestamp.

The fact that it relies on the values of elements makes it a different case from "standard" windows.

Consider a case in which a user creates a UDF such as the one recently added to WindowTest (https://github.com/apache/beam/pull/3286); see the test example below:

private static class CustomWindow extends IntervalWindow {
  private boolean isBig;
  ...
}

private static class CustomWindowFn<T> extends WindowFn<T, CustomWindow> {
  ...
  @Override
  public Collection<CustomWindow> assignWindows(AssignContext c) throws Exception {
    ...
    String element = c.element();
    // put big elements in windows of 30s and small ones in windows of 5s
    if ("big".equals(element)) {
      return Collections.singletonList(
          new CustomWindow(
              c.timestamp(), c.timestamp().plus(Duration.standardSeconds(30)), true));
    } else {
      return Collections.singletonList(
          new CustomWindow(
              c.timestamp(), c.timestamp().plus(Duration.standardSeconds(5)), false));
    }
  }

  @Override
  public void mergeWindows(MergeContext c) throws Exception {
    List<CustomWindow> toBeMerged = new ArrayList<>();
    CustomWindow bigWindow = null;
    for (CustomWindow customWindow : c.windows()) {
      if (customWindow.isBig) {
        bigWindow = customWindow;
        toBeMerged.add(customWindow);
      } else if (bigWindow != null
          && customWindow.start().isAfter(bigWindow.start())
          && customWindow.end().isBefore(bigWindow.end())) {
        toBeMerged.add(customWindow);
      }
    }
    // in case bigWindow has not been seen yet
    if (bigWindow != null) {
      // merge small windows into big windows
      c.merge(toBeMerged, bigWindow);
    }
  }
}
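
To make the behaviour of the mergeWindows logic above concrete without pulling in Beam, here is a minimal plain-Java sketch. Everything in it is hypothetical and only for illustration: the Win class stands in for CustomWindow (with start and end as plain seconds instead of Instants), and the merge method returns the resulting windows instead of calling c.merge. It follows the same iteration as the test code, so it also only absorbs small windows that are seen after the big one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CustomMergeSketch {

  /** Hypothetical stand-in for CustomWindow: [start, end) in seconds plus the isBig flag. */
  static final class Win {
    final long start;
    final long end;
    final boolean isBig;

    Win(long start, long end, boolean isBig) {
      this.start = start;
      this.end = end;
      this.isBig = isBig;
    }

    @Override
    public String toString() {
      return (isBig ? "big" : "small") + "[" + start + "," + end + ")";
    }
  }

  /**
   * Mirrors the mergeWindows logic: small windows strictly inside the big window
   * are absorbed by it; every other window is returned unchanged.
   */
  static List<Win> merge(List<Win> windows) {
    List<Win> toBeMerged = new ArrayList<>();
    Win bigWindow = null;
    for (Win w : windows) {
      if (w.isBig) {
        bigWindow = w;
        toBeMerged.add(w);
      } else if (bigWindow != null && w.start > bigWindow.start && w.end < bigWindow.end) {
        toBeMerged.add(w);
      }
    }
    List<Win> result = new ArrayList<>(windows);
    if (bigWindow != null) {
      // Win does not override equals, so removal is identity-based.
      result.removeAll(toBeMerged);
      result.add(bigWindow);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Win> input = Arrays.asList(
        new Win(0, 30, true),    // "big" element at t=0
        new Win(10, 15, false),  // strictly inside the big window: merged into it
        new Win(28, 33, false)); // overlaps the big window but is not contained: kept
    System.out.println(merge(input));
  }
}
```

The point of the example is the last input window: it overlaps the big window, yet it is left alone, because the decision depends on the isBig field and on containment, not on overlap.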

Indeed, as Robert said, runners might have only partial support for this use case. For example, Spark currently relies on IntervalWindow to implement the merge (https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java#L97), so the UDF windowFn.merge will not be called. Flink, in contrast, relies on the actual windowFn.merge (https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java#L140).
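
To illustrate why this difference matters, here is a small sketch of merging based purely on interval overlap, ignoring any extra field on the window. This is not the Spark runner's code; the class and method names are hypothetical and windows are reduced to {start, end} pairs in seconds. It only shows what a merge that looks at intervals alone would do with the kind of windows produced by the UDF above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OverlapMergeSketch {

  /** Merges intervals {start, end} that overlap, using nothing but their bounds. */
  static List<long[]> mergeOverlapping(List<long[]> intervals) {
    List<long[]> sorted = new ArrayList<>(intervals);
    sorted.sort((a, b) -> Long.compare(a[0], b[0]));
    List<long[]> merged = new ArrayList<>();
    for (long[] w : sorted) {
      long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
      if (last != null && w[0] < last[1]) {
        // Overlaps the current span: extend it.
        last[1] = Math.max(last[1], w[1]);
      } else {
        // Disjoint (or merely adjacent): start a new span.
        merged.add(new long[] {w[0], w[1]});
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    List<long[]> input = Arrays.asList(
        new long[] {0, 30},   // the "big" window
        new long[] {10, 15},  // small window inside it
        new long[] {28, 33}); // small window overlapping its end
    for (long[] w : mergeOverlapping(input)) {
      System.out.println(Arrays.toString(w));
    }
  }
}
```

With these three windows, the overlap-only merge collapses everything into a single span from 0 to 33, whereas the UDF merge keeps the 28-33 window separate. A pipeline using the custom WindowFn would therefore see different groupings depending on which merge the runner applies.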

That point is discussed in this ticket https://issues.apache.org/jira/browse/BEAM-1772 for Flink and this ticket https://issues.apache.org/jira/browse/BEAM-2499 for Spark.

Amit, Aljoscha, if you want to add details, feel free to comment.

I don't know if it is an addition to the model but I agree, it might deserve some discussion.

Best!

Etienne


On 27/07/2017 at 08:39, Robert Bradshaw wrote:
On Wed, Jul 26, 2017 at 9:43 PM, Kenneth Knowles <[email protected]>
wrote:

This is a bit of an improvised change to the Beam model, if these are
really treated *that* specially. (notably, they are a subset of the
WindowFns that we ship with our SDKs, so it really is a careful selection)

It does make sense to have some special WindowFns with distinguished
semantics, since a lot of use cases are covered without the generality of
the Beam model. Having a compact proto repr for these makes sense, but the
particular choice of functions I had viewed as a hack while the Fn API was
maturing to support the real model.

Compactness of the proto representation isn't important, but it is good to
often have a language-independent representation (and definition). Many
common UserFns should likely have a language-independent urn and format,
especially once we start looking at multi-language pipelines.

If this is a new special category in the model (capability matrix?) there
should be a lot more input and deliberation as to what they are. TBH I
think we will probably choose just the WindowFns that you did, but the
standardization deserves attention and documentation.

These are already called out individually in the compatibility matrix,
which probably makes sense as it allows a runner to declare "partial"
support for windowing.

On Wed, Jul 26, 2017 at 9:34 PM, Robert Bradshaw <
[email protected]> wrote:

I think there may be a distinction between hard-coding support for the
"standard" WindowFns (e.g.
https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/standard_window_fns.proto)
and accepting WindowFns as a UDF. Different runners have offered different
levels of support for this in the past (for example, the Fn API doesn't
support WindowFns other than these standard ones unless they're implemented
in Java--and even then it'd probably be better for these to be executed in
the SDK context).

On Wed, Jul 26, 2017 at 9:16 PM, Kenneth Knowles <[email protected]>
wrote:

Hi Etienne,

Every WindowFn is a UDF, so there is really no such thing as "custom"
window merging. Is this the same as saying that a runner supports only
merging for Sessions? Or just supports WindowFn that merges based on
overlap?

Kenn

On Mon, Jul 24, 2017 at 10:15 AM, Etienne Chauchot <
[email protected]>
wrote:

Hi all,

There are now two new ValidatesRunner tests:
WindowTest.testMergingCustomWindows and
WindowTest.testMergingCustomWindowsKeyedCollection. The aim of these tests
is to verify that runners can handle a custom WindowFn (an extension of
WindowFn that, for example, could rely on element values in addition to
timestamps).

As new runners are coming, I wanted to let you know that there is also a
new category tag UsesCustomWindowMerging that you can use to skip these
tests while running ValidatesRunner tests on runners that do not support
custom window merging yet.

Besides, there is also an ongoing related PR
(https://github.com/apache/beam/pull/3592) to enhance the test utility
methods of WindowFnTestUtils.

Etienne





