[
https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377276#comment-17377276
]
Liebing Yu commented on FLINK-16556:
------------------------------------
I've implemented checkpointing for CarSource. If you think it looks alright,
could you assign this issue to me? I will open a pull request.
{code:java}
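// Imports required in the enclosing TopSpeedWindowing file.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.Random;

private static class CarSource
        implements SourceFunction<Tuple4<Integer, Integer, Double, Long>>,
                CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    // Current speed and accumulated distance per car, indexed by car ID.
    private Integer[] speeds;
    private Double[] distances;

    // Operator state handles; assigned in initializeState().
    private transient ListState<Integer> speedsState;
    private transient ListState<Double> distancesState;

    private final Random rand = new Random();
    private volatile boolean isRunning = true;

    private CarSource(int numOfCars) {
        speeds = new Integer[numOfCars];
        distances = new Double[numOfCars];
        Arrays.fill(speeds, 50);
        Arrays.fill(distances, 0d);
    }

    public static CarSource create(int cars) {
        return new CarSource(cars);
    }

    @Override
    public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx)
            throws Exception {
        while (isRunning) {
            Thread.sleep(100);
            // Hold the checkpoint lock so that snapshotState() never sees
            // half-updated arrays and emitted records match the snapshot.
            synchronized (ctx.getCheckpointLock()) {
                for (int carId = 0; carId < speeds.length; carId++) {
                    if (rand.nextBoolean()) {
                        speeds[carId] = Math.min(100, speeds[carId] + 5);
                    } else {
                        speeds[carId] = Math.max(0, speeds[carId] - 5);
                    }
                    distances[carId] += speeds[carId] / 3.6d;
                    Tuple4<Integer, Integer, Double, Long> record =
                            new Tuple4<>(
                                    carId,
                                    speeds[carId],
                                    distances[carId],
                                    System.currentTimeMillis());
                    ctx.collect(record);
                }
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot with the current per-car values.
        speedsState.clear();
        distancesState.clear();
        for (int i = 0; i < speeds.length; i++) {
            speedsState.add(speeds[i]);
            distancesState.add(distances[i]);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context)
            throws Exception {
        ListStateDescriptor<Integer> speedsStateDescriptor =
                new ListStateDescriptor<>(
                        "speeds",
                        TypeInformation.of(new TypeHint<Integer>() {}));
        ListStateDescriptor<Double> distancesStateDescriptor =
                new ListStateDescriptor<>(
                        "distances",
                        TypeInformation.of(new TypeHint<Double>() {}));
        speedsState =
                context.getOperatorStateStore().getListState(speedsStateDescriptor);
        distancesState =
                context.getOperatorStateStore().getListState(distancesStateDescriptor);

        if (context.isRestored()) {
            // Copy the restored values back into the arrays. The bounds checks
            // guard against a restored list longer than the number of cars.
            int i = 0;
            for (Integer speed : speedsState.get()) {
                if (i >= speeds.length) {
                    break;
                }
                speeds[i++] = speed;
            }
            i = 0;
            for (Double distance : distancesState.get()) {
                if (i >= distances.length) {
                    break;
                }
                distances[i++] = distance;
            }
        }
    }
}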
{code}
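For illustration, the snapshot/restore round trip can be sketched without any Flink dependency. This is only a simplified stand-in, not the proposed patch: a plain `List` takes the place of `ListState`, the class name `CarStateRoundTrip` and its helper methods are hypothetical, and the bounds check on restore is an assumption about how a longer-than-expected state should be handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the ListState round trip; not Flink API.
class CarStateRoundTrip {

    // Copies the per-car array into a list, mirroring what snapshotState()
    // does when it adds each element to the ListState.
    static <T> List<T> snapshot(T[] values) {
        return new ArrayList<>(Arrays.asList(values));
    }

    // Copies checkpointed values back into the array, mirroring the restore
    // branch of initializeState(). Extra elements are ignored (assumption).
    static <T> void restore(Iterable<T> state, T[] target) {
        int i = 0;
        for (T value : state) {
            if (i >= target.length) {
                break;
            }
            target[i++] = value;
        }
    }

    public static void main(String[] args) {
        Double[] distances = {120.5, 340.0};
        List<Double> checkpoint = snapshot(distances);

        // A freshly constructed source starts every car at distance 0.
        Double[] fresh = new Double[2];
        Arrays.fill(fresh, 0d);

        restore(checkpoint, fresh);
        System.out.println(Arrays.toString(fresh));
    }
}
```

After `restore`, the fresh array holds the checkpointed distances again instead of zeros, which is the property the source needs after a recovery.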
> TopSpeedWindowing should implement checkpointing for its source
> ---------------------------------------------------------------
>
> Key: FLINK-16556
> URL: https://issues.apache.org/jira/browse/FLINK-16556
> Project: Flink
> Issue Type: Bug
> Components: Examples
> Affects Versions: 1.10.0
> Reporter: Nico Kruber
> Priority: Minor
> Labels: auto-deprioritized-major, starter
>
> {{org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.CarSource}}
> does not implement checkpointing of its state, namely the current speeds and
> distances per car. The main problem with this is that the window trigger only
> fires if the new distance has increased by at least 50. After a restore, the
> distance is reset to 0, so the job may not produce output for a while.
>
> Either the distance calculation could use {{Math.abs}}, or the source needs
> proper checkpointing, optionally allowing the number of cars to
> increase/decrease.
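
The effect described in the issue can be sketched in plain Java. This is a simplified model rather than the example's actual trigger code: the class `DeltaAfterRestore`, the strict `>` comparison, and the sample distances are all assumptions made for illustration. Only the threshold of 50 comes from the issue description.

```java
// Hypothetical model of a distance-delta trigger; not Flink API.
class DeltaAfterRestore {

    // Threshold from the issue description.
    static final double TRIGGER_METERS = 50;

    // Signed delta: fires only when the distance grew past the threshold.
    static boolean firesSigned(double lastFired, double current) {
        return current - lastFired > TRIGGER_METERS;
    }

    // Math.abs variant: also fires when the distance jumps backwards.
    static boolean firesAbs(double lastFired, double current) {
        return Math.abs(current - lastFired) > TRIGGER_METERS;
    }

    public static void main(String[] args) {
        // Assumed values: the trigger last fired at 1000, then the job
        // restored and the non-checkpointed source restarted near 0.
        double lastFired = 1000.0;
        double afterRestore = 13.9;

        System.out.println(firesSigned(lastFired, afterRestore)); // false
        System.out.println(firesAbs(lastFired, afterRestore));    // true
    }
}
```

With the signed delta, the trigger stays silent until the restarted distance climbs back above the last fired value plus the threshold. Checkpointing the source avoids the backwards jump altogether, while `Math.abs` only makes the trigger tolerate it.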