[ https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472275#comment-16472275 ]
ASF GitHub Bot commented on FLINK-8982: --------------------------------------- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r187674099 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.util.Collector; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import java.time.Duration; +import java.time.Instant; +import java.util.Random; + +/** + * Streaming application that creates an {@link Email} pojo with random ids and increasing + * timestamps and passes it to a stateful {@link org.apache.flink.api.common.functions.FlatMapFunction}, + * where it is exposed as queryable state. + */ +public class QsStateProducer { + + public static final String QUERYABLE_STATE_NAME = "state"; + public static final String STATE_NAME = "state"; + + public static void main(final String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + ParameterTool tool = ParameterTool.fromArgs(args); + String tmpPath = tool.getRequired("tmp-dir"); + String stateBackendType = tool.getRequired("state-backend"); + + StateBackend stateBackend; + switch (stateBackendType) { + case "rocksdb": + stateBackend = new RocksDBStateBackend(tmpPath); + break; + case "fs": + stateBackend = new FsStateBackend(tmpPath); + break; + case "memory": + stateBackend = new MemoryStateBackend(); + break; + default: + throw new RuntimeException("Unsupported state backend " + stateBackendType); + } + + env.setStateBackend(stateBackend); + env.enableCheckpointing(1000L); + env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); + + env.addSource(new EmailSource()) + .keyBy(new KeySelector<Email, String>() { + + private static final long serialVersionUID = -1480525724620425363L; + + @Override + public String getKey(Email value) throws Exception { + return ""; + } + }) + .flatMap(new MyFlatMap()); + + env.execute(); + } + + private static class EmailSource extends RichSourceFunction<Email> { + + private static final long serialVersionUID = -7286937645300388040L; + + private Random random; + private volatile boolean isRunning = true; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.random = new Random(); + } + + @Override + public void run(SourceContext<Email> ctx) throws Exception { + // Sleep for 10 seconds on start to allow time to copy jobid + for (int i = 0; i < 100 && isRunning; i++) { + Thread.sleep(100L); + } + + int types = LabelSurrogate.Type.values().length; + + while (isRunning) { + int r = random.nextInt(100); + + final EmailId emailId = new EmailId(Integer.toString(random.nextInt())); + final Instant timestamp = Instant.now().minus(Duration.ofDays(1L)); + final String foo = String.format("foo #%d", r); + final LabelSurrogate label = new LabelSurrogate(LabelSurrogate.Type.values()[r % types], "bar"); + --- End diff -- Have to take the `checkpointLock` before emitting: ``` synchronized (ctx.getCheckpointLock()) { ctx.collect(new Email(emailId, timestamp, foo, label)); } ``` > End-to-end test: Queryable state > -------------------------------- > > Key: FLINK-8982 > URL: https://issues.apache.org/jira/browse/FLINK-8982 > Project: Flink > Issue Type: Sub-task > Components: Queryable State, Tests > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Florian Schmidt > Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > We should add an end-to-end test which verifies that {{Queryable State}} is > working. > [~florianschmidt] and [~kkl0u] could you please provide more details for the > description. -- This message was sent by Atlassian JIRA (v7.6.3#76005)