[ 
https://issues.apache.org/jira/browse/FLINK-3253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15110687#comment-15110687
 ] 

godfrey he commented on FLINK-3253:
-----------------------------------

minimal example: read data (like: 'name' 'timestamp' 'value') from Kafka, 
assign an event timestamp, and aggregate the values of the same 'name' for each one-minute window.

// Minimal reproducing job: read "name timestamp value" records from Kafka,
// assign event-time timestamps, and sum values per name in 1-minute windows.
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// Event-time processing with periodic watermarks emitted every 1000 ms.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
// Checkpoint every 60 s into an HDFS-backed state backend; restoring this
// checkpointed window state is what fails in this issue.
env.enableCheckpointing(60000);
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

env.addSource(new FlinkKafkaConsumer082("topic", new SimpleStringSchema(), 
properties))
        // Parse each space-separated line into an AggregatableDataPoint that
        // carries a fresh "sum" Aggregator already seeded with its own value.
        .flatMap(new FlatMapFunction<String, AggregatableDataPoint>() {
            @Override
            public void flatMap(String str, Collector<AggregatableDataPoint> 
out) {
                String[] words = str.split(" ");
                String name = words[0];
                long timestamp = Long.valueOf(words[1]);
                double value = Double.valueOf(words[2]);
                Aggregator aggregator = Aggregators.get("sum");
                aggregator.addValue(value);
                out.collect(new AggregatableDataPoint(name, timestamp, 
aggregator));
            }
        })
        // Timestamps come from the element itself. Per-element watermarks are
        // disabled (Long.MIN_VALUE); the periodic watermark is the machine's
        // wall clock minus 10 s. NOTE(review): mixing the processing-time
        // clock into an event-time watermark means elements more than 10 s
        // behind the wall clock are treated as late.
        .assignTimestamps(new TimestampExtractor<AggregatableDataPoint>() {
            @Override
            public long extractTimestamp(AggregatableDataPoint element, long 
currentTimestamp) {
                return element.getTimestamp();
            }
            @Override
            public long extractWatermark(AggregatableDataPoint element, long 
currentTimestamp) {
                return Long.MIN_VALUE;
            }
            @Override
            public long getCurrentWatermark() {
                return System.currentTimeMillis() - 10000; // delay 10s
            }
        })
        // Key by (minute bucket, name). NOTE(review): the minute bucket in
        // the key is redundant with the 1-minute time window applied below.
        .keyBy(new KeySelector<AggregatableDataPoint, Tuple2<Long, String>>() {
            @Override
            public Tuple2<Long, String> getKey(AggregatableDataPoint dp) throws 
Exception {
                Long time = dp.getTimestamp() / 60000; // aggregate data in 1 
minute to 1 dp
                return new Tuple2<>(time, dp.getName());
            }
        })
        // 1-minute tumbling event-time windows.
        .timeWindow(Time.of(60000, TimeUnit.MILLISECONDS))
        // Incrementally fold each window down to its first element, whose
        // Aggregator accumulates the later elements' values; the window
        // function then emits that single pre-reduced element.
        .apply(new ReduceFunction<AggregatableDataPoint>() {
            @Override
            public AggregatableDataPoint reduce(AggregatableDataPoint 
lastElement, AggregatableDataPoint curElement) throws Exception {
                lastElement.aggregate(curElement.getValue());
                return lastElement; // always return first element
            }
        }, new WindowFunction<AggregatableDataPoint, AggregatableDataPoint, 
Tuple2<Long, String>, TimeWindow>() {
            @Override
            public void apply(Tuple2<Long, String> key, TimeWindow window,  
Iterable<AggregatableDataPoint> values, Collector<AggregatableDataPoint> out) 
throws Exception {
                out.collect(values.iterator().next()); // only one element
            }
        });
env.execute();


public class AggregatableDataPoint implements DataPoint {
    private String name;
    private long timestamp;
    private Aggregator aggregator; // sum, avg ...

    public AggregatableDataPoint(String name, long timestamp,
                                 Aggregator aggregator) {
        this.name = name;
        this.timestamp = timestamp;
        this.aggregator = aggregator;
    }

    public void aggregate(double value) {
        this.aggregator.addValue(value);
    }
}

/**
 * Read-only view of a single measurement. Extends {@link Serializable} —
 * presumably because instances end up in checkpointed window state (this
 * issue's stack trace shows them being Java-deserialized on restore).
 */
public interface DataPoint extends Serializable {
    String getName();     // grouping key
    long getTimestamp();  // event time in milliseconds
    double getValue();    // measurement (possibly aggregated) value
}

/**
 * Accumulates double values (e.g. sum, avg). Extends {@link Serializable} so
 * it can travel inside records held in checkpointed state.
 */
public interface Aggregator extends Serializable {
    void addValue(double value); // fold one value into the accumulation
    double getValue();           // current accumulated result
}

/**
 * Factory for {@link Aggregator} implementations, looked up by name.
 */
public class Aggregators {

    /** Non-instantiable utility class. */
    private Aggregators() {
    }

    /**
     * Returns a fresh aggregator for the given name.
     *
     * @param name aggregator identifier; currently only "sum" is supported
     * @return a new {@link Aggregator} instance
     * @throws IllegalArgumentException if the name is unknown
     */
    public static Aggregator get(final String name) {
        switch (name) {
            case "sum":
                return new Sum();
            default:
                throw new IllegalArgumentException("No such aggregator: " + 
name);
        }
    }

    /** Running sum of all values added so far. */
    public static class Sum implements Aggregator {
        private double sum = 0;

        @Override
        public void addValue(double value) {
            sum += value;
        }

        @Override
        public double getValue() {
            return sum;
        }
    }
}

thank you

> deserializeObject exception on WindowOperator
> ---------------------------------------------
>
>                 Key: FLINK-3253
>                 URL: https://issues.apache.org/jira/browse/FLINK-3253
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 0.10.0
>         Environment: flink on yarn, hadoop 2.6
> state backend: hdfs
> jobmanager: high-availability
>            Reporter: godfrey he
>
> It seems that the data in the WindowOperator can be serialized successfully; 
> however, deserialization failed when the taskmanager was restarted by the jobmanager. 
> Root exception
> java.lang.Exception: Could not restore checkpointed state to operators and 
> functions
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>       at java.lang.Thread.run(Thread.java:744)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>       at 
> java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2508)
>       at 
> java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2543)
>       at 
> java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:2615)
>       at java.io.DataInputStream.readInt(DataInputStream.java:387)
>       at 
> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2820)
>       at java.io.ObjectInputStream.readInt(ObjectInputStream.java:971)
>       at java.util.HashMap.readObject(HashMap.java:1158)
>       at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
>       at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
>       at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to