package org.apache.beam.sdk.nexmark;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * Stand-alone driver that runs a small {@link GroupByKey} pipeline over a fixed, in-memory set of
 * timestamped {@code KV<Integer, Integer>} elements.
 *
 * <p>The pipeline:
 *
 * <ol>
 *   <li>creates nine timestamped key/value elements,
 *   <li>assigns them to the global window with a repeating element-count trigger (at least two
 *       elements per pane), discarding fired panes and allowing no lateness,
 *   <li>groups the values by key,
 *   <li>renders each grouped {@code KV} with {@code toString()}, and
 *   <li>writes the resulting lines through {@link TextIO} using the prefix
 *       {@code java-test-results}, the suffix {@code .txt}, and a single shard.
 * </ol>
 */
public class GroupByKeyTest {

  /** Factor used to convert the epoch-second literals below into epoch milliseconds. */
  private static final long MILLIS_PER_SECOND = 1000L;

  /**
   * Builds the pipeline from the given command-line arguments, runs it, and blocks until the run
   * has finished.
   *
   * @param args command-line arguments parsed into {@link PipelineOptions}
   */
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    PCollection<KV<Integer, Integer>> events =
        pipeline.apply(
            Create.timestamped(
                eventAt(1, 1, 1596216396L),
                eventAt(1, 2, 1596216397L),
                eventAt(1, 3, 1596216398L),
                eventAt(2, 4, 1596216399L),
                eventAt(3, 5, 1596216400L),
                eventAt(2, 6, 1596216402L),
                eventAt(3, 7, 1596216403L),
                eventAt(4, 8, 1596216405L),
                eventAt(4, 9, 1596216406L)));

    events
        .apply(
            // Global window with a data-driven trigger: panes are emitted based on element
            // count rather than on event time, and already-fired elements are not repeated in
            // later panes.
            Window.<KV<Integer, Integer>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.ZERO))
        .apply(GroupByKey.create())
        .apply(
            "toString",
            ParDo.of(
                new DoFn<KV<Integer, Iterable<Integer>>, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    // One output line per grouped pane, using KV's own string form.
                    c.output(c.element().toString());
                  }
                }))
        .apply(
            "writeToFile",
            TextIO.write().to("java-test-results")
                .withSuffix(".txt")
                .withNumShards(1));

    // run() may return before the job has completed on runners that execute asynchronously;
    // wait explicitly so that main does not return while the pipeline is still running.
    pipeline.run().waitUntilFinish();
  }

  /**
   * Creates one timestamped input element.
   *
   * @param key the element's key
   * @param value the element's value
   * @param epochSeconds the element timestamp, in seconds since the Unix epoch
   * @return the key/value pair stamped with {@code epochSeconds} converted to milliseconds
   */
  private static TimestampedValue<KV<Integer, Integer>> eventAt(
      int key, int value, long epochSeconds) {
    return TimestampedValue.of(
        KV.of(key, value), new Instant(epochSeconds * MILLIS_PER_SECOND));
  }
}
