Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/316#discussion_r66520504
  
    --- Diff: 
library/src/main/java/com/datatorrent/lib/window/WindowOption.java ---
    @@ -0,0 +1,143 @@
    +package com.datatorrent.lib.window;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +/**
    + *
    + */
    +public abstract class WindowOption
    +{
    +
    +  private TriggerOption[] triggerOptions;
    +
    +  private AccumulationMode accumulationMode = AccumulationMode.ACCUMULATE;
    +
    +  private List<Object[]> lateness = new LinkedList<>();
    +
    +  public static enum AccumulationMode
    +  {
    +    DISCARD,
    +    ACCUMULATE,
    +    ACCUMULATE_DELTA
    +  }
    +
    +  public static class GlobalWindow extends WindowOption
    +  {
    +
    +  }
    +
    +  public static class FixedWindow extends WindowOption
    +  {
    +    FixedWindow(long quantity, Quantification.Unit unit)
    +    {
    +      size.add(new Object[]{quantity, unit});
    +    }
    +
    +    FixedWindow(List<Object[]> size)
    +    {
    +      this.size = size;
    +    }
    +
    +    private List<Object[]> size = new LinkedList<>();
    +
    +    public SlidingWindow slideBy(long slidingQuantity, Quantification.Unit 
slidingUnit)
    +    {
    +      SlidingWindow sw = new SlidingWindow(this.size);
    +      sw.delta.add(new Object[]{slidingQuantity, slidingUnit});
    +      return sw;
    +    }
    +
    +    public FixedWindow and(long quantity, Quantification.Unit unit)
    +    {
    +      size.add(new Object[]{quantity, unit});
    +      return this;
    +    }
    +
    +    public List<Object[]> getSize()
    +    {
    +      return size;
    +    }
    +  }
    +
    +  public static class SlidingWindow extends FixedWindow
    +  {
    +
    +    SlidingWindow(List<Object[]> size)
    +    {
    +      super(size);
    +    }
    +
    +    List<Object[]> delta = new LinkedList<>();
    +
    +    @Override
    +    public SlidingWindow and(long quantity, Quantification.Unit unit)
    +    {
    +      delta.add(new Object[]{quantity, unit});
    +      return this;
    +    }
    +  }
    +
    +  public static class SessionWindow extends WindowOption
    --- End diff --
    
    These concepts are detailed here:
    https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
    http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf
    
    It's part of the windowing concept in the Apache Beam API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to