Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r187677168 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.util.Collector; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import java.time.Duration; +import java.time.Instant; +import java.util.Random; + +/** + * Streaming application that creates an {@link Email} pojo with random ids and increasing + * timestamps and passes it to a stateful {@link org.apache.flink.api.common.functions.FlatMapFunction}, + * where it is exposed as queryable state. + */ +public class QsStateProducer { + + public static final String QUERYABLE_STATE_NAME = "state"; + public static final String STATE_NAME = "state"; + + public static void main(final String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + ParameterTool tool = ParameterTool.fromArgs(args); + String tmpPath = tool.getRequired("tmp-dir"); + String stateBackendType = tool.getRequired("state-backend"); + + StateBackend stateBackend; + switch (stateBackendType) { + case "rocksdb": + stateBackend = new RocksDBStateBackend(tmpPath); + break; + case "fs": + stateBackend = new FsStateBackend(tmpPath); + break; + case "memory": + stateBackend = new MemoryStateBackend(); + break; + default: + throw new RuntimeException("Unsupported state backend " + stateBackendType); + } + + env.setStateBackend(stateBackend); + env.enableCheckpointing(1000L); + env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); + + env.addSource(new EmailSource()) + .keyBy(new KeySelector<Email, String>() { + + private static final long serialVersionUID = -1480525724620425363L; + + @Override + public String getKey(Email value) throws Exception { + return ""; + } + }) + .flatMap(new MyFlatMap()); + + env.execute(); + } + + private static class EmailSource extends RichSourceFunction<Email> { + + private static final long serialVersionUID = -7286937645300388040L; + + private Random random; --- End diff -- The `random` should be `transient`
---