[ https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377276#comment-17377276 ]
Liebing Yu commented on FLINK-16556:
------------------------------------

I've implemented checkpointing for CarSource. If you think it looks all right, could you assign this issue to me? I will open a pull request.

{code:java}
// Code placeholder
// Imports needed in the enclosing file:
// import java.util.Arrays;
// import java.util.Random;
// import org.apache.flink.api.common.state.ListState;
// import org.apache.flink.api.common.state.ListStateDescriptor;
// import org.apache.flink.api.common.typeinfo.TypeHint;
// import org.apache.flink.api.common.typeinfo.TypeInformation;
// import org.apache.flink.api.java.tuple.Tuple4;
// import org.apache.flink.runtime.state.FunctionInitializationContext;
// import org.apache.flink.runtime.state.FunctionSnapshotContext;
// import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
// import org.apache.flink.streaming.api.functions.source.SourceFunction;
private static class CarSource
        implements SourceFunction<Tuple4<Integer, Integer, Double, Long>>, CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    // Current speed and travelled distance per car, indexed by car id.
    private Integer[] speeds;
    private Double[] distances;

    // Operator state that backs the two arrays above across restores.
    private transient ListState<Integer> speedsState;
    private transient ListState<Double> distancesState;

    private Random rand = new Random();

    private volatile boolean isRunning = true;

    private CarSource(int numOfCars) {
        speeds = new Integer[numOfCars];
        distances = new Double[numOfCars];
        Arrays.fill(speeds, 50);
        Arrays.fill(distances, 0d);
    }

    public static CarSource create(int cars) {
        return new CarSource(cars);
    }

    @Override
    public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(100);
            // Update state and emit under the checkpoint lock so that
            // snapshotState() never observes a half-updated round.
            synchronized (ctx.getCheckpointLock()) {
                for (int carId = 0; carId < speeds.length; carId++) {
                    // Randomly accelerate or brake by 5, clamped to [0, 100].
                    if (rand.nextBoolean()) {
                        speeds[carId] = Math.min(100, speeds[carId] + 5);
                    } else {
                        speeds[carId] = Math.max(0, speeds[carId] - 5);
                    }
                    distances[carId] += speeds[carId] / 3.6d;
                    Tuple4<Integer, Integer, Double, Long> record =
                            new Tuple4<>(
                                    carId,
                                    speeds[carId],
                                    distances[carId],
                                    System.currentTimeMillis());
                    ctx.collect(record);
                }
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot with the current per-car values.
        speedsState.clear();
        distancesState.clear();
        for (int i = 0; i < speeds.length; i++) {
            speedsState.add(speeds[i]);
            distancesState.add(distances[i]);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Integer> speedsStateDescriptor =
                new ListStateDescriptor<>(
                        "speeds", TypeInformation.of(new TypeHint<Integer>() {}));
        ListStateDescriptor<Double> distanceStateDescriptor =
                new ListStateDescriptor<>(
                        "distances", TypeInformation.of(new TypeHint<Double>() {}));
        speedsState = context.getOperatorStateStore().getListState(speedsStateDescriptor);
        distancesState = context.getOperatorStateStore().getListState(distanceStateDescriptor);

        // On restore, copy the checkpointed values back into the arrays.
        if (context.isRestored()) {
            int i = 0;
            for (Integer speed : speedsState.get()) {
                speeds[i++] = speed;
            }
            i = 0;
            for (Double distance : distancesState.get()) {
                distances[i++] = distance;
            }
        }
    }
}
{code}

> TopSpeedWindowing should implement checkpointing for its source
> ---------------------------------------------------------------
>
>                 Key: FLINK-16556
>                 URL: https://issues.apache.org/jira/browse/FLINK-16556
>             Project: Flink
>          Issue Type: Bug
>          Components: Examples
>    Affects Versions: 1.10.0
>            Reporter: Nico Kruber
>            Priority: Minor
>              Labels: auto-deprioritized-major, starter
>
> {{org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.CarSource}}
> does not implement checkpointing of its state, namely the current speeds and
> distances per car. The main problem with this is that the window trigger only
> fires if the new distance has increased by at least 50, but after a restore the
> distance is reset to 0, so the job may produce no output for a while.
>
> Either the distance calculation could use {{Math.abs}} or the source needs
> proper checkpointing, optionally allowing the number of cars to
> increase or decrease.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
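
The stall described in the issue can be illustrated with a small plain-Java model. This is only a sketch and not Flink code: the class DeltaTriggerSketch, its method names, and the fixed step per element are hypothetical, and the threshold of 50 and the "at least" comparison are taken from the wording of the issue text rather than from the trigger implementation. It assumes the trigger still remembers the distance at which it last fired, while the source restarts from 0 because its state was not checkpointed.

```java
/** Plain-Java model of a distance-delta trigger after a restore; hypothetical, not Flink code. */
class DeltaTriggerSketch {

    /** Assumed threshold, taken from the "at least 50" in the issue description. */
    static final double THRESHOLD = 50.0;

    /** Signed delta: fires only if the distance grew by at least THRESHOLD. */
    static boolean firesSigned(double lastFired, double current) {
        return current - lastFired >= THRESHOLD;
    }

    /** Absolute delta: also fires when the distance jumped backwards, e.g. after a reset. */
    static boolean firesAbs(double lastFired, double current) {
        return Math.abs(current - lastFired) >= THRESHOLD;
    }

    /**
     * Counts how many elements the source emits after a restore before the
     * trigger fires again. The source restarts at distance 0 (no checkpointed
     * state) and adds a fixed step per element.
     */
    static int elementsUntilFire(double lastFired, double step, boolean useAbs) {
        if (step <= 0) {
            throw new IllegalArgumentException("step must be positive");
        }
        double distance = 0.0; // source state was lost on restore
        int emitted = 0;
        while (true) {
            distance += step;
            emitted++;
            boolean fires = useAbs
                    ? firesAbs(lastFired, distance)
                    : firesSigned(lastFired, distance);
            if (fires) {
                return emitted;
            }
        }
    }

    public static void main(String[] args) {
        double lastFired = 1000.0; // distance remembered by the trigger
        double step = 10.0;        // hypothetical distance gained per element
        System.out.println("signed delta:   " + elementsUntilFire(lastFired, step, false));
        System.out.println("absolute delta: " + elementsUntilFire(lastFired, step, true));
    }
}
```

With the signed delta, the source has to catch up to the old distance plus the threshold before any output appears. With the absolute delta, the first element after the restore already fires, which works around the stall but does not preserve the per-car speeds and distances the way checkpointing the source does.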