[
https://issues.apache.org/jira/browse/FLINK-3253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15110687#comment-15110687
]
godfrey he edited comment on FLINK-3253 at 1/22/16 1:33 AM:
------------------------------------------------------------
minimal example: read data(like: 'name' 'timestamp' 'value') from kafka, and
assign event timestamp, aggregate the values of the same 'name' each one minute.
It seems that the exception was thrown when the task manager was restarted.
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.enableCheckpointing(60000);
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
env.addSource(new FlinkKafkaConsumer082("topic", new SimpleStringSchema(),
properties))
.flatMap(new FlatMapFunction<String, AggregatableDataPoint>() {
@Override
public void flatMap(String str, Collector<AggregatableDataPoint>
out) {
String[] words = str.split(" ");
String name = words[0];
long timestamp = Long.valueOf(words[1]);
double value = Double.valueOf(words[2]);
Aggregator aggregator = Aggregators.get("sum");
aggregator.addValue(value);
out.collect(new AggregatableDataPoint(name, timestamp,
aggregator));
}
})
.assignTimestamps(new TimestampExtractor<AggregatableDataPoint>() {
@Override
public long extractTimestamp(AggregatableDataPoint element, long
currentTimestamp) {
return element.getTimestamp();
}
@Override
public long extractWatermark(AggregatableDataPoint element, long
currentTimestamp) {
return Long.MIN_VALUE;
}
@Override
public long getCurrentWatermark() {
return System.currentTimeMillis() - 10000; // delay 10s
}
})
.keyBy(new KeySelector<AggregatableDataPoint, Tuple2<Long, String>>() {
@Override
public Tuple2<Long, String> getKey(AggregatableDataPoint dp) throws
Exception {
Long time = dp.getTimestamp() / 60000; // aggregate data in 1
minute to 1 dp
return new Tuple2<>(time, dp.getName());
}
})
.timeWindow(Time.of(60000, TimeUnit.MILLISECONDS))
.apply(new ReduceFunction<AggregatableDataPoint>() {
@Override
public AggregatableDataPoint reduce(AggregatableDataPoint
lastElement, AggregatableDataPoint curElement) throws Exception {
lastElement.aggregate(curElement.getValue());
return lastElement; // always return first element
}
}, new WindowFunction<AggregatableDataPoint, AggregatableDataPoint,
Tuple2<Long, String>, TimeWindow>() {
@Override
public void apply(Tuple2<Long, String> key, TimeWindow window,
Iterable<AggregatableDataPoint> values, Collector<AggregatableDataPoint> out)
throws Exception {
out.collect(values.iterator().next()); // only one element
}
});
env.execute();
public class AggregatableDataPoint implements DataPoint {
private String name;
private long timestamp;
private Aggregator aggregator; // sum, avg ...
public AggregatableDataPoint(String name, long timestamp,
Aggregator aggregator) {
this.name = name;
this.timestamp = timestamp;
this.aggregator = aggregator;
}
public void aggregate(double value) {
this.aggregator.addValue(value);
}
}
public interface DataPoint extends Serializable {
String getName();
long getTimestamp();
double getValue();
}
public interface Aggregator extends Serializable {
void addValue(double value);
double getValue();
}
public class Aggregators {
public static Aggregator get(final String name) {
switch (name) {
case "sum":
return new Sum();
default:
throw new IllegalArgumentException("No such aggregator: " +
name);
}
}
public static class Sum implements Aggregator {
private double sum = 0;
@Override
public void addValue(double value) {
sum += value;
}
@Override
public double getValue() {
return sum;
}
}
}
thank you
was (Author: godfreyhe):
minimal example: read data(like: 'name' 'timestamp' 'value') from kafka, and
assign event timestamp, aggregate the values of the same 'name' each one minute.
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.enableCheckpointing(60000);
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
env.addSource(new FlinkKafkaConsumer082("topic", new SimpleStringSchema(),
properties))
.flatMap(new FlatMapFunction<String, AggregatableDataPoint>() {
@Override
public void flatMap(String str, Collector<AggregatableDataPoint>
out) {
String[] words = str.split(" ");
String name = words[0];
long timestamp = Long.valueOf(words[1]);
double value = Double.valueOf(words[2]);
Aggregator aggregator = Aggregators.get("sum");
aggregator.addValue(value);
out.collect(new AggregatableDataPoint(name, timestamp,
aggregator));
}
})
.assignTimestamps(new TimestampExtractor<AggregatableDataPoint>() {
@Override
public long extractTimestamp(AggregatableDataPoint element, long
currentTimestamp) {
return element.getTimestamp();
}
@Override
public long extractWatermark(AggregatableDataPoint element, long
currentTimestamp) {
return Long.MIN_VALUE;
}
@Override
public long getCurrentWatermark() {
return System.currentTimeMillis() - 10000; // delay 10s
}
})
.keyBy(new KeySelector<AggregatableDataPoint, Tuple2<Long, String>>() {
@Override
public Tuple2<Long, String> getKey(AggregatableDataPoint dp) throws
Exception {
Long time = dp.getTimestamp() / 60000; // aggregate data in 1
minute to 1 dp
return new Tuple2<>(time, dp.getName());
}
})
.timeWindow(Time.of(60000, TimeUnit.MILLISECONDS))
.apply(new ReduceFunction<AggregatableDataPoint>() {
@Override
public AggregatableDataPoint reduce(AggregatableDataPoint
lastElement, AggregatableDataPoint curElement) throws Exception {
lastElement.aggregate(curElement.getValue());
return lastElement; // always return first element
}
}, new WindowFunction<AggregatableDataPoint, AggregatableDataPoint,
Tuple2<Long, String>, TimeWindow>() {
@Override
public void apply(Tuple2<Long, String> key, TimeWindow window,
Iterable<AggregatableDataPoint> values, Collector<AggregatableDataPoint> out)
throws Exception {
out.collect(values.iterator().next()); // only one element
}
});
env.execute();
public class AggregatableDataPoint implements DataPoint {
private String name;
private long timestamp;
private Aggregator aggregator; // sum, avg ...
public AggregatableDataPoint(String name, long timestamp,
Aggregator aggregator) {
this.name = name;
this.timestamp = timestamp;
this.aggregator = aggregator;
}
public void aggregate(double value) {
this.aggregator.addValue(value);
}
}
public interface DataPoint extends Serializable {
String getName();
long getTimestamp();
double getValue();
}
public interface Aggregator extends Serializable {
void addValue(double value);
double getValue();
}
public class Aggregators {
public static Aggregator get(final String name) {
switch (name) {
case "sum":
return new Sum();
default:
throw new IllegalArgumentException("No such aggregator: " +
name);
}
}
public static class Sum implements Aggregator {
private double sum = 0;
@Override
public void addValue(double value) {
sum += value;
}
@Override
public double getValue() {
return sum;
}
}
}
thank you
> deserializeObject exception on WindowOperator
> ---------------------------------------------
>
> Key: FLINK-3253
> URL: https://issues.apache.org/jira/browse/FLINK-3253
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 0.10.0
> Environment: flink on yarn, hadoop 2.6
> state backend: hdfs
> jobmanager: high-availability
> Reporter: godfrey he
>
> It seems that the data in the WindowOperator can be serialized successfully;
> however, deserialization failed when the taskmanager was restarted by the jobmanager.
> Root exception
> java.lang.Exception: Could not restore checkpointed state to operators and
> functions
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
> at
> java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2508)
> at
> java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2543)
> at
> java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:2615)
> at java.io.DataInputStream.readInt(DataInputStream.java:387)
> at
> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2820)
> at java.io.ObjectInputStream.readInt(ObjectInputStream.java:971)
> at java.util.HashMap.readObject(HashMap.java:1158)
> at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)