package com.intuit.strmprocess.once;

import static com.google.common.base.MoreObjects.firstNonNull;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.StreamSupport;

import com.intuit.strmprocess.util.WavefrontMeterRegistrySingleton;
import io.micrometer.core.instrument.Counter;
import io.micrometer.wavefront.WavefrontMeterRegistry;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
// import org.apache.beam.sdk.metrics.Counter;
// import org.apache.beam.sdk.metrics.Distribution;
// import org.apache.beam.sdk.metrics.Gauge;
// import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
// import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
// import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.grpc.v1p13p1.io.opencensus.stats.Aggregation;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// import org.apache.samza.serializers.SerializableSerdeFactory;

public class OnceDemoWordCount01 implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(OnceDemoWordCount01.class);

    /**
     * Maps each consumed Kafka record to a {@code (value, 1L)} pair so the
     * downstream {@code Sum.longsPerKey()} can count occurrences per value.
     */
    static class MapToCountFn extends SimpleFunction<KafkaRecord<String, String>,  KV<String, Long>> {

        // static final: SLF4J loggers are not Serializable, and the previous
        // lazy per-call null-check init was racy on a non-static field of a
        // serializable SimpleFunction. A static field is never serialized
        // with the function and needs no initialization guard.
        private static final Logger mapLogger = LoggerFactory.getLogger(OnceDemoWordCount01.class);

        @Override
        public KV<String, Long> apply(KafkaRecord<String, String> rec)
        {
            String val = rec.getKV().getValue();
            mapLogger.info("+++ read from topic={}, part={}, offset={}, val: {}",
                rec.getTopic(), rec.getPartition(), rec.getOffset(), val);
            return KV.of(val, 1L);
        }
    }

    /**
     * Multiplies each per-key count by 10 and emits it as a string.
     * Keys starting with {@code "m"} deliberately fail the bundle (a demo
     * poison pill) so retry / exactly-once behavior can be observed.
     */
    public static class UpdateCounterFn extends DoFn<KV<String, Long>, KV<String, String>> {

        // static final for the same serialization-safety reason as in
        // MapToCountFn; this also makes the former @Setup lifecycle hook
        // (whose only job was logger init) unnecessary.
        private static final Logger updateLogger = LoggerFactory.getLogger(OnceDemoWordCount01.class);

        public UpdateCounterFn() {
        }

        @ProcessElement
        public void processElement(ProcessContext c, BoundedWindow window)
        {
            String key = c.element().getKey();
            Long cnt = c.element().getValue();

            updateLogger.info("+++++++++++++ processing key={}, cnt={}", key, cnt);

            if (key.startsWith("m")) {
                // Intentional failure path: log with the throwable attached,
                // then propagate so the runner retries the bundle.
                RuntimeException re = new RuntimeException("detected key=" + key);
                updateLogger.error("!!!!!!!!!!!! detected key={}, throwing", key, re);
                throw re;
            } else {
                long newCnt = cnt * 10;
                c.output(KV.of(key, Long.toString(newCnt)));
            }
        }

    }


    // Raw CLI args, forwarded to PipelineOptionsFactory in runImpl().
    private final String[] args;
    private final String inputTopicName;
    private final String outputTopicName;
    // Kafka client settings; must contain at least "bootstrap.servers".
    private final Properties kafkaProps;

    /**
     * @param args            command-line arguments, parsed into Beam PipelineOptions
     * @param inputTopicName  Kafka topic to consume from
     * @param outputTopicName Kafka topic to produce results to
     * @param kafkaProps      consumer/producer properties (bootstrap.servers, group.id, ...)
     */
    public OnceDemoWordCount01(String[] args, String inputTopicName, String outputTopicName, Properties kafkaProps) {
        this.args = args;
        this.inputTopicName = inputTopicName;
        this.outputTopicName = outputTopicName;
        this.kafkaProps = kafkaProps;

        logger.info("creating OnceDemoWordCount01: args: {}, inputTopic={}, outputTopic={}, kafkaProps: {}",
            Arrays.asList(args), inputTopicName, outputTopicName, kafkaProps);
    }


    /** Local entry point: fixed topic names against a localhost broker. */
    public static void main(String[] args)
    {
        String inputTopic = "once-input";
        String outputTopic = "once-output";

        logger.info("OnceDemoWordCount01 runs locally, inputTopic={}, outputTopic={}",
            inputTopic, outputTopic);

        Properties appProps = new Properties();
        appProps.setProperty("bootstrap.servers", "localhost:9092");
        appProps.setProperty("group.id", "once-group-id");
        // appProps.setProperty("auto.offset.reset", "earliest");

        OnceDemoWordCount01 app = new OnceDemoWordCount01(args, inputTopic, outputTopic, appProps);

        app.run();

        logger.info("------------------------ OnceDemoWordCount01 completed ---------------------");

    }


    /** Runnable adapter: runs the pipeline, logging (not rethrowing) failures. */
    @Override
    public void run() {
        try {
            runImpl();
        } catch (Exception e) {
            logger.error("OnceDemoWordCount01 failed: ", e);
        }
    }

    /**
     * Builds and executes the pipeline:
     * Kafka read -&gt; map to (value, 1) -&gt; 30s fixed windows -&gt; sum per key
     * -&gt; UpdateCounterFn -&gt; Kafka write. Blocks until the pipeline finishes.
     *
     * @throws IOException declared for callers; pipeline execution failures
     *         are caught and logged, not rethrown
     */
    public void runImpl() throws IOException
    {
        logger.info("+++++++ OnceDemoWordCount01 started +++++++++");

        // KafkaIO wants Map<String, Object>; Properties only exposes String values.
        HashMap<String, Object> kafkaProperties = new HashMap<>();
        for (final String name : kafkaProps.stringPropertyNames()) {
            kafkaProperties.put(name, kafkaProps.getProperty(name));
        }

        PipelineOptions options = PipelineOptionsFactory
                .fromArgs(args).withValidation().create();
        logger.info("OnceDemoWordCount01: pipeline options: {}", options);


        // Create the Pipeline object with the options we defined above
        Pipeline p = Pipeline.create(options);

        p.apply("read from kafka",
                KafkaIO.<String, String>read()
                        .withBootstrapServers(kafkaProps.getProperty("bootstrap.servers"))
                        .withTopic(inputTopicName)
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .updateConsumerProperties(kafkaProperties)
                        // Commit consumer offsets only when a bundle is
                        // finalized, for effectively-once consumption.
                        .commitOffsetsInFinalize()
                )

        .apply("convertToPairs", MapElements.via(new MapToCountFn()))

        .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardSeconds(30)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO)  // drop data arriving after the watermark
                .accumulatingFiredPanes()
        )

        .apply("SumPerKey", Sum.longsPerKey())

        .apply("UpdateCounter", ParDo.of(new UpdateCounterFn()))

        .apply("write to kafka", KafkaIO.<String, String>write()
            .withBootstrapServers(kafkaProps.getProperty("bootstrap.servers"))
            .withTopic(outputTopicName)
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            .updateProducerProperties(kafkaProperties)
        );

        logger.info("+++++++ OnceDemoWordCount01 waitUntilFinish +++++++++");

        try {
            p.run().waitUntilFinish();
        } catch (Exception e) {
            logger.error("OnceDemoWordCount01 execution failed:", e);
        }
    }


}