package org.apache.beam.sdk.nexmark;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class TriggerAfterCount {

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    PCollection<Integer> events =
        pipeline.apply(
            Create.timestamped(
                TimestampedValue.of(1, new Instant(((long) 1596216396) * 1000)),
                TimestampedValue.of(2, new Instant(((long) 1596216397) * 1000)),
                TimestampedValue.of(3, new Instant(((long) 1596216398) * 1000)),
                TimestampedValue.of(4, new Instant(((long) 1596216399) * 1000)),
                TimestampedValue.of(5, new Instant(((long) 1596216400) * 1000)),
                TimestampedValue.of(6, new Instant(((long) 1596216402) * 1000)),
                TimestampedValue.of(7, new Instant(((long) 1596216403) * 1000)),
                TimestampedValue.of(8, new Instant(((long) 1596216405) * 1000))
            )
        );

    events
        .apply(
            Window.<Integer>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.ZERO))
        .apply(Count.globally())
        .apply(
            "toString",
            ParDo.of(
                new DoFn<Long, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(c.element().toString());
                  }
                }))
        .apply(
            "writeToFile",
            TextIO.write().to("java-test-results")
                .withSuffix(".txt")
                .withNumShards(1));
    pipeline.run();
  }
}
