[ https://issues.apache.org/jira/browse/FLINK-3253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15110687#comment-15110687 ]
godfrey he commented on FLINK-3253:
-----------------------------------
Minimal example: read data (e.g. 'name' 'timestamp' 'value') from Kafka,
assign event timestamps, and aggregate the values of the same 'name' over
each one-minute window.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.enableCheckpointing(60000);
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

env.addSource(new FlinkKafkaConsumer082<String>("topic", new SimpleStringSchema(), properties))
    .flatMap(new FlatMapFunction<String, AggregatableDataPoint>() {
        @Override
        public void flatMap(String str, Collector<AggregatableDataPoint> out) {
            String[] words = str.split(" ");
            String name = words[0];
            long timestamp = Long.valueOf(words[1]);
            double value = Double.valueOf(words[2]);
            Aggregator aggregator = Aggregators.get("sum");
            aggregator.addValue(value);
            out.collect(new AggregatableDataPoint(name, timestamp, aggregator));
        }
    })
    .assignTimestamps(new TimestampExtractor<AggregatableDataPoint>() {
        @Override
        public long extractTimestamp(AggregatableDataPoint element, long currentTimestamp) {
            return element.getTimestamp();
        }

        @Override
        public long extractWatermark(AggregatableDataPoint element, long currentTimestamp) {
            return Long.MIN_VALUE;
        }

        @Override
        public long getCurrentWatermark() {
            return System.currentTimeMillis() - 10000; // delay watermarks by 10s
        }
    })
    .keyBy(new KeySelector<AggregatableDataPoint, Tuple2<Long, String>>() {
        @Override
        public Tuple2<Long, String> getKey(AggregatableDataPoint dp) throws Exception {
            Long time = dp.getTimestamp() / 60000; // aggregate data within 1 minute into 1 dp
            return new Tuple2<>(time, dp.getName());
        }
    })
    .timeWindow(Time.of(60000, TimeUnit.MILLISECONDS))
    .apply(new ReduceFunction<AggregatableDataPoint>() {
        @Override
        public AggregatableDataPoint reduce(AggregatableDataPoint lastElement,
                AggregatableDataPoint curElement) throws Exception {
            lastElement.aggregate(curElement.getValue());
            return lastElement; // always return the first element
        }
    }, new WindowFunction<AggregatableDataPoint, AggregatableDataPoint,
            Tuple2<Long, String>, TimeWindow>() {
        @Override
        public void apply(Tuple2<Long, String> key, TimeWindow window,
                Iterable<AggregatableDataPoint> values,
                Collector<AggregatableDataPoint> out) throws Exception {
            out.collect(values.iterator().next()); // only one element after the reduce
        }
    });

env.execute();
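As a standalone illustration (hypothetical class name, not part of the job above), the keyBy above derives a one-minute bucket from the event timestamp by integer division, so all events in the same minute share a key:

```java
public class MinuteBucket {
    // Integer division truncates, so every timestamp inside the same
    // 60000 ms interval collapses to the same bucket id.
    public static long bucketOf(long timestampMillis) {
        return timestampMillis / 60000L;
    }

    public static void main(String[] args) {
        System.out.println(bucketOf(0L));     // minute 0
        System.out.println(bucketOf(59999L)); // still minute 0
        System.out.println(bucketOf(60000L)); // minute 1
    }
}
```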
public class AggregatableDataPoint implements DataPoint {

    private String name;
    private long timestamp;
    private Aggregator aggregator; // sum, avg ...

    public AggregatableDataPoint(String name, long timestamp, Aggregator aggregator) {
        this.name = name;
        this.timestamp = timestamp;
        this.aggregator = aggregator;
    }

    public void aggregate(double value) {
        this.aggregator.addValue(value);
    }

    // accessors required by the DataPoint interface
    @Override public String getName() { return name; }
    @Override public long getTimestamp() { return timestamp; }
    @Override public double getValue() { return aggregator.getValue(); }
}
public interface DataPoint extends Serializable {
String getName();
long getTimestamp();
double getValue();
}
public interface Aggregator extends Serializable {
void addValue(double value);
double getValue();
}
public class Aggregators {

    public static Aggregator get(final String name) {
        switch (name) {
            case "sum":
                return new Sum();
            default:
                throw new IllegalArgumentException("No such aggregator: " + name);
        }
    }

    public static class Sum implements Aggregator {
        private double sum = 0;

        @Override
        public void addValue(double value) {
            sum += value;
        }

        @Override
        public double getValue() {
            return sum;
        }
    }
}
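The stack trace below fails inside HashMap.readObject while WindowOperator restores its state via plain Java serialization (InstantiationUtil.deserializeObject). A quick standalone sanity check (hypothetical class names, simplified stand-in for the Sum aggregator above, not the reported job itself) is to round-trip a map of aggregators through ObjectOutputStream/ObjectInputStream:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class SerializationRoundTrip {

    // Simplified stand-in for the Sum aggregator above.
    public static class Sum implements Serializable {
        private static final long serialVersionUID = 1L;
        private double sum;
        public void addValue(double v) { sum += v; }
        public double getValue() { return sum; }
    }

    // Serialize then deserialize with plain Java serialization, the same
    // mechanism the restore path in the stack trace relies on.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, Sum> state = new HashMap<>();
        Sum s = new Sum();
        s.addValue(1.5);
        s.addValue(2.5);
        state.put("name", s);

        HashMap<String, Sum> restored = roundTrip(state);
        System.out.println(restored.get("name").getValue()); // 4.0
    }
}
```

If a round trip like this succeeds in isolation, the corruption seen in the trace is more likely introduced between the write and the read of the checkpoint than by the element types themselves.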
Thank you.
> deserializeObject exception on WindowOperator
> ---------------------------------------------
>
> Key: FLINK-3253
> URL: https://issues.apache.org/jira/browse/FLINK-3253
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 0.10.0
> Environment: flink on yarn, hadoop 2.6
> state backend: hdfs
> jobmanager: high-availability
> Reporter: godfrey he
>
> It seems that the data in the WindowOperator is serialized successfully;
> however, deserialization fails when the TaskManager is restarted by the JobManager.
> Root exception
> java.lang.Exception: Could not restore checkpointed state to operators and functions
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:744)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>     at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2508)
>     at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2543)
>     at java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:2615)
>     at java.io.DataInputStream.readInt(DataInputStream.java:387)
>     at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2820)
>     at java.io.ObjectInputStream.readInt(ObjectInputStream.java:971)
>     at java.util.HashMap.readObject(HashMap.java:1158)
>     at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)