[ 
https://issues.apache.org/jira/browse/FLINK-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377276#comment-17377276
 ] 

Liebing Yu commented on FLINK-16556:
------------------------------------

I've implemented checkpointing for CarSource. If you think it looks alright, 
could you assign this issue to me? I will open a pull request.
{code:java}
// Code placeholder
/**
 * Source that simulates a fixed number of cars and checkpoints their state.
 *
 * <p>Roughly every 100 ms each car's speed is randomly raised or lowered by 5 (clamped to
 * [0, 100]) and its distance is advanced by {@code speed / 3.6}. For every car a record
 * {@code (carId, speed, distance, System.currentTimeMillis())} is emitted.
 *
 * <p>The per-car speeds and distances are stored in operator list state so that a restored job
 * continues from the last checkpointed values instead of starting again at speed 50 and
 * distance 0.
 */
private static class CarSource
        implements SourceFunction<Tuple4<Integer, Integer, Double, Long>>,
                   CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    /** Current speed per car, indexed by car id. */
    private final Integer[] speeds;

    /** Distance travelled so far per car, indexed by car id. */
    private final Double[] distances;

    private transient ListState<Integer> speedsState;
    private transient ListState<Double> distancesState;

    private final Random rand = new Random();

    private volatile boolean isRunning = true;

    /**
     * Creates a source for {@code numOfCars} cars, each starting at speed 50 and distance 0.
     *
     * @param numOfCars number of simulated cars
     */
    private CarSource(int numOfCars) {
        speeds = new Integer[numOfCars];
        distances = new Double[numOfCars];
        Arrays.fill(speeds, 50);
        Arrays.fill(distances, 0d);
    }

    /**
     * Static factory for a {@link CarSource}.
     *
     * @param cars number of simulated cars
     * @return a new source instance
     */
    public static CarSource create(int cars) {
        return new CarSource(cars);
    }

    /**
     * Emits one record per car on every iteration until {@link #cancel()} is called.
     *
     * @param ctx context used to emit records and to obtain the checkpoint lock
     * @throws Exception if the thread is interrupted while sleeping
     */
    @Override
    public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx)
            throws Exception {

        final Object checkpointLock = ctx.getCheckpointLock();

        while (isRunning) {
            // Sleep outside the lock so that checkpoints are not delayed by the pause.
            Thread.sleep(100);

            // Per the SourceFunction contract, state updates and record emission must happen
            // under the checkpoint lock; otherwise a snapshot could capture speeds/distances
            // that are inconsistent with the records already emitted.
            synchronized (checkpointLock) {
                for (int carId = 0; carId < speeds.length; carId++) {
                    if (rand.nextBoolean()) {
                        speeds[carId] = Math.min(100, speeds[carId] + 5);
                    } else {
                        speeds[carId] = Math.max(0, speeds[carId] - 5);
                    }
                    distances[carId] += speeds[carId] / 3.6d;
                    Tuple4<Integer, Integer, Double, Long> record =
                            new Tuple4<>(
                                    carId,
                                    speeds[carId],
                                    distances[carId],
                                    System.currentTimeMillis());
                    ctx.collect(record);
                }
            }
        }
    }

    /** Requests the emit loop in {@code run} to stop after its current iteration. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Replaces the contents of both list states with the current per-car values.
     *
     * @param context snapshot context (unused)
     * @throws Exception if updating the state fails
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // update() replaces the previous contents in one call (instead of clear() + add()).
        speedsState.update(Arrays.asList(speeds));
        distancesState.update(Arrays.asList(distances));
    }

    /**
     * Registers the list states and, on restore, copies the checkpointed values back into the
     * per-car arrays.
     *
     * <p>Restored entries beyond the configured number of cars are ignored, and cars without a
     * restored entry keep their initial values.
     * NOTE(review): after rescaling, list entries may be redistributed across subtasks, so the
     * position-to-car mapping is only reliable when parallelism is unchanged - confirm.
     *
     * @param context initialization context giving access to the operator state store
     * @throws Exception if the state cannot be obtained or read
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Integer> speedsStateDescriptor =
                new ListStateDescriptor<>(
                        "speeds",
                        TypeInformation.of(new TypeHint<Integer>() {}));
        ListStateDescriptor<Double> distanceStateDescriptor =
                new ListStateDescriptor<>(
                        "distances",
                        TypeInformation.of(new TypeHint<Double>() {}));

        speedsState = context.getOperatorStateStore().getListState(speedsStateDescriptor);
        distancesState = context.getOperatorStateStore().getListState(distanceStateDescriptor);

        if (context.isRestored()) {
            int i = 0;
            for (Integer speed : speedsState.get()) {
                // Guard against restored state holding more entries than configured cars.
                if (i >= speeds.length) {
                    break;
                }
                speeds[i++] = speed;
            }
            i = 0;
            for (Double distance : distancesState.get()) {
                if (i >= distances.length) {
                    break;
                }
                distances[i++] = distance;
            }
        }
    }
}
{code}

> TopSpeedWindowing should implement checkpointing for its source
> ---------------------------------------------------------------
>
>                 Key: FLINK-16556
>                 URL: https://issues.apache.org/jira/browse/FLINK-16556
>             Project: Flink
>          Issue Type: Bug
>          Components: Examples
>    Affects Versions: 1.10.0
>            Reporter: Nico Kruber
>            Priority: Minor
>              Labels: auto-deprioritized-major, starter
>
> {{org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.CarSource}}
>  does not implement checkpointing of its state, namely the current speeds and 
> distances per car. The main problem with this is that the window trigger only 
> fires if the new distance has increased by at least 50 but after restore, it 
> will be reset to 0 and could thus not produce output for a while.
>  
> Either the distance calculation could use {{Math.abs}} or the source needs 
> proper checkpointing. Optionally with allowing the number of cars to 
> increase/decrease.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to