[ 
https://issues.apache.org/jira/browse/FLINK-3253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15110687#comment-15110687
 ] 

godfrey he edited comment on FLINK-3253 at 1/22/16 1:33 AM:
------------------------------------------------------------

minimal example: reads data (like: 'name' 'timestamp' 'value') from Kafka, 
assigns event timestamps, and aggregates the values of the same 'name' every 
minute. It seems that the exception was thrown when the task manager was 
restarted. 

// Minimal reproduction: Kafka source -> parse -> event-time assignment ->
// key by (minute bucket, name) -> 1-minute tumbling window -> sum per key.
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Emit a watermark every 1000 ms.
env.getConfig().setAutoWatermarkInterval(1000);
// Checkpoint every 60 s into an HDFS-backed state backend.
env.enableCheckpointing(60000);
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

env.addSource(new FlinkKafkaConsumer082("topic", new SimpleStringSchema(), 
properties))
        // Parse "name timestamp value" records; each point carries its own
        // "sum" aggregator seeded with the parsed value.
        .flatMap(new FlatMapFunction<String, AggregatableDataPoint>() {
            @Override
            public void flatMap(String str, Collector<AggregatableDataPoint> 
out) {
                String[] words = str.split(" ");
                String name = words[0];
                long timestamp = Long.valueOf(words[1]);
                double value = Double.valueOf(words[2]);
                Aggregator aggregator = Aggregators.get("sum");
                aggregator.addValue(value);
                out.collect(new AggregatableDataPoint(name, timestamp, 
aggregator));
            }
        })
        // Element timestamp comes from the point itself; per-element watermarks
        // are disabled (Long.MIN_VALUE) and the periodic watermark trails the
        // wall clock by 10 s. NOTE(review): a wall-clock watermark under event
        // time assumes element timestamps track real time — confirm.
        .assignTimestamps(new TimestampExtractor<AggregatableDataPoint>() {
            @Override
            public long extractTimestamp(AggregatableDataPoint element, long 
currentTimestamp) {
                return element.getTimestamp();
            }
            @Override
            public long extractWatermark(AggregatableDataPoint element, long 
currentTimestamp) {
                return Long.MIN_VALUE;
            }
            @Override
            public long getCurrentWatermark() {
                return System.currentTimeMillis() - 10000; // delay 10s
            }
        })
        // Key = (timestamp bucketed to the minute, name).
        .keyBy(new KeySelector<AggregatableDataPoint, Tuple2<Long, String>>() {
            @Override
            public Tuple2<Long, String> getKey(AggregatableDataPoint dp) throws 
Exception {
                Long time = dp.getTimestamp() / 60000; // aggregate data in 1 
minute to 1 dp
                return new Tuple2<>(time, dp.getName());
            }
        })
        // 1-minute tumbling event-time window, incrementally pre-aggregated by
        // the reducer so the window keeps a single element per key.
        .timeWindow(Time.of(60000, TimeUnit.MILLISECONDS))
        .apply(new ReduceFunction<AggregatableDataPoint>() {
            @Override
            public AggregatableDataPoint reduce(AggregatableDataPoint 
lastElement, AggregatableDataPoint curElement) throws Exception {
                // Fold the incoming value into the retained element's aggregator.
                lastElement.aggregate(curElement.getValue());
                return lastElement; // always return first element
            }
        }, new WindowFunction<AggregatableDataPoint, AggregatableDataPoint, 
Tuple2<Long, String>, TimeWindow>() {
            @Override
            public void apply(Tuple2<Long, String> key, TimeWindow window,  
Iterable<AggregatableDataPoint> values, Collector<AggregatableDataPoint> out) 
throws Exception {
                out.collect(values.iterator().next()); // only one element
            }
        });
env.execute();


public class AggregatableDataPoint implements DataPoint {
    private String name;
    private long timestamp;
    private Aggregator aggregator; // sum, avg ...

    public AggregatableDataPoint(String name, long timestamp,
                                 Aggregator aggregator) {
        this.name = name;
        this.timestamp = timestamp;
        this.aggregator = aggregator;
    }

    public void aggregate(double value) {
        this.aggregator.addValue(value);
    }
}

/**
 * Read-only view of a measurement: a name, an event timestamp (presumably
 * epoch millis, given the /60000 minute bucketing downstream — TODO confirm
 * units against the producer), and a numeric value. Extends Serializable
 * because instances are held in checkpointed window state.
 */
public interface DataPoint extends Serializable {
    String getName();
    long getTimestamp();
    double getValue();
}

/**
 * Incremental numeric accumulator (e.g. sum). Serializable because it travels
 * inside AggregatableDataPoint through checkpointed window state.
 */
public interface Aggregator extends Serializable {
    // Folds one value into the running aggregate.
    void addValue(double value);
    // Current aggregate result.
    double getValue();
}

/** Static factory for {@link Aggregator} implementations, looked up by name. */
public class Aggregators {

    // Utility class: prevent accidental instantiation.
    private Aggregators() {
    }

    /**
     * Returns a fresh aggregator for {@code name}.
     *
     * @param name aggregator kind; currently only {@code "sum"} is supported
     * @return a new, zero-valued aggregator
     * @throws IllegalArgumentException if no aggregator matches {@code name}
     */
    public static Aggregator get(final String name) {
        switch (name) {
            case "sum":
                return new Sum();
            default:
                throw new IllegalArgumentException("No such aggregator: " + 
name);
        }
    }

    /** Running-total aggregator. */
    public static class Sum implements Aggregator {
        private double sum = 0;

        @Override
        public void addValue(double value) {
            sum += value;
        }

        @Override
        public double getValue() {
            return sum;
        }
    }
}

thank you


was (Author: godfreyhe):
minimal example: reads data (like: 'name' 'timestamp' 'value') from Kafka, 
assigns event timestamps, and aggregates the values of the same 'name' every 
minute.

// Minimal reproduction: Kafka source -> parse -> event-time assignment ->
// key by (minute bucket, name) -> 1-minute tumbling window -> sum per key.
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Emit a watermark every 1000 ms.
env.getConfig().setAutoWatermarkInterval(1000);
// Checkpoint every 60 s into an HDFS-backed state backend.
env.enableCheckpointing(60000);
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

env.addSource(new FlinkKafkaConsumer082("topic", new SimpleStringSchema(), 
properties))
        // Parse "name timestamp value" records; each point carries its own
        // "sum" aggregator seeded with the parsed value.
        .flatMap(new FlatMapFunction<String, AggregatableDataPoint>() {
            @Override
            public void flatMap(String str, Collector<AggregatableDataPoint> 
out) {
                String[] words = str.split(" ");
                String name = words[0];
                long timestamp = Long.valueOf(words[1]);
                double value = Double.valueOf(words[2]);
                Aggregator aggregator = Aggregators.get("sum");
                aggregator.addValue(value);
                out.collect(new AggregatableDataPoint(name, timestamp, 
aggregator));
            }
        })
        // Element timestamp comes from the point itself; per-element watermarks
        // are disabled (Long.MIN_VALUE) and the periodic watermark trails the
        // wall clock by 10 s. NOTE(review): a wall-clock watermark under event
        // time assumes element timestamps track real time — confirm.
        .assignTimestamps(new TimestampExtractor<AggregatableDataPoint>() {
            @Override
            public long extractTimestamp(AggregatableDataPoint element, long 
currentTimestamp) {
                return element.getTimestamp();
            }
            @Override
            public long extractWatermark(AggregatableDataPoint element, long 
currentTimestamp) {
                return Long.MIN_VALUE;
            }
            @Override
            public long getCurrentWatermark() {
                return System.currentTimeMillis() - 10000; // delay 10s
            }
        })
        // Key = (timestamp bucketed to the minute, name).
        .keyBy(new KeySelector<AggregatableDataPoint, Tuple2<Long, String>>() {
            @Override
            public Tuple2<Long, String> getKey(AggregatableDataPoint dp) throws 
Exception {
                Long time = dp.getTimestamp() / 60000; // aggregate data in 1 
minute to 1 dp
                return new Tuple2<>(time, dp.getName());
            }
        })
        // 1-minute tumbling event-time window, incrementally pre-aggregated by
        // the reducer so the window keeps a single element per key.
        .timeWindow(Time.of(60000, TimeUnit.MILLISECONDS))
        .apply(new ReduceFunction<AggregatableDataPoint>() {
            @Override
            public AggregatableDataPoint reduce(AggregatableDataPoint 
lastElement, AggregatableDataPoint curElement) throws Exception {
                // Fold the incoming value into the retained element's aggregator.
                lastElement.aggregate(curElement.getValue());
                return lastElement; // always return first element
            }
        }, new WindowFunction<AggregatableDataPoint, AggregatableDataPoint, 
Tuple2<Long, String>, TimeWindow>() {
            @Override
            public void apply(Tuple2<Long, String> key, TimeWindow window,  
Iterable<AggregatableDataPoint> values, Collector<AggregatableDataPoint> out) 
throws Exception {
                out.collect(values.iterator().next()); // only one element
            }
        });
env.execute();


public class AggregatableDataPoint implements DataPoint {
    private String name;
    private long timestamp;
    private Aggregator aggregator; // sum, avg ...

    public AggregatableDataPoint(String name, long timestamp,
                                 Aggregator aggregator) {
        this.name = name;
        this.timestamp = timestamp;
        this.aggregator = aggregator;
    }

    public void aggregate(double value) {
        this.aggregator.addValue(value);
    }
}

/**
 * Read-only view of a measurement: a name, an event timestamp (presumably
 * epoch millis, given the /60000 minute bucketing downstream — TODO confirm
 * units against the producer), and a numeric value. Extends Serializable
 * because instances are held in checkpointed window state.
 */
public interface DataPoint extends Serializable {
    String getName();
    long getTimestamp();
    double getValue();
}

/**
 * Incremental numeric accumulator (e.g. sum). Serializable because it travels
 * inside AggregatableDataPoint through checkpointed window state.
 */
public interface Aggregator extends Serializable {
    // Folds one value into the running aggregate.
    void addValue(double value);
    // Current aggregate result.
    double getValue();
}

/** Static factory for {@link Aggregator} implementations, looked up by name. */
public class Aggregators {

    // Utility class: prevent accidental instantiation.
    private Aggregators() {
    }

    /**
     * Returns a fresh aggregator for {@code name}.
     *
     * @param name aggregator kind; currently only {@code "sum"} is supported
     * @return a new, zero-valued aggregator
     * @throws IllegalArgumentException if no aggregator matches {@code name}
     */
    public static Aggregator get(final String name) {
        switch (name) {
            case "sum":
                return new Sum();
            default:
                throw new IllegalArgumentException("No such aggregator: " + 
name);
        }
    }

    /** Running-total aggregator. */
    public static class Sum implements Aggregator {
        private double sum = 0;

        @Override
        public void addValue(double value) {
            sum += value;
        }

        @Override
        public double getValue() {
            return sum;
        }
    }
}

thank you

> deserializeObject exception on WindowOperator
> ---------------------------------------------
>
>                 Key: FLINK-3253
>                 URL: https://issues.apache.org/jira/browse/FLINK-3253
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 0.10.0
>         Environment: flink on yarn, hadoop 2.6
> state backend: hdfs
> jobmanager: high-availability
>            Reporter: godfrey he
>
> It seems that the data in the WindowOperator can be serialized successfully; 
> however, deserialization failed when the taskmanager was restarted by the jobmanager. 
> Root exception
> java.lang.Exception: Could not restore checkpointed state to operators and 
> functions
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>       at java.lang.Thread.run(Thread.java:744)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>       at 
> java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2508)
>       at 
> java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2543)
>       at 
> java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:2615)
>       at java.io.DataInputStream.readInt(DataInputStream.java:387)
>       at 
> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2820)
>       at java.io.ObjectInputStream.readInt(ObjectInputStream.java:971)
>       at java.util.HashMap.readObject(HashMap.java:1158)
>       at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
>       at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
>       at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to