[ https://issues.apache.org/jira/browse/FLINK-8992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16456202#comment-16456202 ]

ASF GitHub Bot commented on FLINK-8992:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5925#discussion_r184647536
  
    --- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java ---
    @@ -0,0 +1,304 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.tests;
    +
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueState;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.ConfigOption;
    +import org.apache.flink.configuration.ConfigOptions;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.streaming.api.CheckpointingMode;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
    +import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
    +import org.apache.flink.streaming.tests.artificialstate.eventpayload.ArtificialValueStateBuilder;
    +import org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.UUID;
    +
    +/**
    + * A general purpose test for Flink's DataStream API operators and primitives.
    + *
    + * <p>It currently covers the following aspects that are frequently present in Flink DataStream jobs:
    + * <ul>
    + *     <li>A generic Kryo input type.</li>
    + *     <li>A state type for which we register a {@link KryoSerializer}.</li>
    + *     <li>Operators with {@link ValueState}.</li>
    + * </ul>
    + *
    + * <p>The job can be configured for different state backends, including memory, file, and RocksDB
    + * state backends. It also allows specifying the processing guarantee semantics, which the job itself
    + * verifies according to the specified semantics.
    + *
    + * <p>Program parameters:
    + * <ul>
    + *     <li>test.semantics (String, default - 'exactly-once'): This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
    + *     <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
    + *     <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
    + *     <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job.</li>
    + *     <li>environment.restart_strategy.delay (long, default - 0): delay between restart attempts, in milliseconds.</li>
    + *     <li>state_backend (String, default - 'file'): Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li>
    + *     <li>state_backend.checkpoint_directory (String): The checkpoint directory.</li>
    + *     <li>state_backend.rocks.incremental (boolean, default - false): Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.</li>
    + *     <li>state_backend.file.async (boolean, default - true): Activate or deactivate asynchronous snapshots if FsStateBackend is selected.</li>
    + *     <li>sequence_generator_source.keyspace (int, default - 1000): Number of different keys for events emitted by the sequence generator.</li>
    + *     <li>sequence_generator_source.payload_size (int, default - 20): Length of message payloads emitted by the sequence generator.</li>
    + *     <li>sequence_generator_source.sleep_time (long, default - 0): Milliseconds to sleep after emitting events in the sequence generator. Set to 0 to disable sleeping.</li>
    + *     <li>sequence_generator_source.sleep_after_elements (long, default - 0): Number of elements to emit before sleeping in the sequence generator. Set to 0 to disable sleeping.</li>
    + *     <li>sequence_generator_source.event_time.max_out_of_order (long, default - 500): Max event time out-of-orderness for events emitted by the sequence generator.</li>
    + *     <li>sequence_generator_source.event_time.clock_progress (long, default - 100): The amount of event time to progress per event generated by the sequence generator.</li>
    + * </ul>
    + */
    +public class DataStreamAllroundTestProgram {
    +
    +   private static final ConfigOption<String> TEST_SEMANTICS = ConfigOptions
    +           .key("test.semantics")
    +           .defaultValue("exactly-once")
    +           .withDescription("This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'");
    +
    +   private static final ConfigOption<Long> ENVIRONMENT_CHECKPOINT_INTERVAL = ConfigOptions
    +           .key("environment.checkpoint_interval")
    +           .defaultValue(1000L);
    +
    +   private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions
    +           .key("environment.parallelism")
    +           .defaultValue(1);
    +
    +   private static final ConfigOption<Integer> ENVIRONMENT_MAX_PARALLELISM = ConfigOptions
    +           .key("environment.max_parallelism")
    +           .defaultValue(128);
    +
    +   private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = ConfigOptions
    +           .key("environment.restart_strategy.delay")
    +           .defaultValue(0);
    +
    +   private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
    +           .key("state_backend")
    +           .defaultValue("file")
    +           .withDescription("Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.");
    +
    +   private static final ConfigOption<String> STATE_BACKEND_CHECKPOINT_DIR = ConfigOptions
    +           .key("state_backend.checkpoint_directory")
    +           .noDefaultValue()
    +           .withDescription("The checkpoint directory.");
    +
    +   private static final ConfigOption<Boolean> STATE_BACKEND_ROCKS_INCREMENTAL = ConfigOptions
    +           .key("state_backend.rocks.incremental")
    +           .defaultValue(false)
    +           .withDescription("Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.");
    +
    +   private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC = ConfigOptions
    +           .key("state_backend.file.async")
    +           .defaultValue(true)
    +           .withDescription("Activate or deactivate asynchronous snapshots if FsStateBackend is selected.");
    +
    +   private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE = ConfigOptions
    +           .key("sequence_generator_source.keyspace")
    +           .defaultValue(1000);
    +
    +   private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE = ConfigOptions
    +           .key("sequence_generator_source.payload_size")
    +           .defaultValue(20);
    +
    +   private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
    +           .key("sequence_generator_source.sleep_time")
    +           .defaultValue(0L);
    +
    +   private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
    +           .key("sequence_generator_source.sleep_after_elements")
    +           .defaultValue(0L);
    +
    +   private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
    +           .key("sequence_generator_source.event_time.max_out_of_order")
    +           .defaultValue(500L);
    +
    +   private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
    +           .key("sequence_generator_source.event_time.clock_progress")
    +           .defaultValue(100L);
    +
    +   // -----------------------------------------------------------------------------------------------------------------
    +
    +   public static void main(String[] args) throws Exception {
    +           final ParameterTool pt = ParameterTool.fromArgs(args);
    +
    +           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +           setupEnvironment(env, pt);
    +
    +           env.addSource(createEventSource(pt))
    +                   .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
    +                   .keyBy(Event::getKey)
    +                   .map(createArtificialKeyedStateMapper(
    +                           // map function simply forwards the inputs
    +                           (MapFunction<Event, Event>) in -> in,
    +                           // state is updated per event as a wrapped ComplexPayload state object
    +                           (Event first, ComplexPayload second) -> new ComplexPayload(first), //
    +                           Arrays.asList(
    +                                   new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
    +                           )
    +                   )
    +                   .name("ArtificalKeyedStateMapper")
    +                   .returns(Event.class)
    +                   .keyBy(Event::getKey)
    +                   .flatMap(createSemanticsCheckMapper(pt))
    +                   .name("SemanticsCheckMapper")
    +                   .addSink(new PrintSinkFunction<>());
    +
    +           env.execute("General purpose test job");
    +   }
    +
    +   public static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
    +
    +           // set checkpointing semantics
    +           String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
    +           long checkpointInterval = pt.getLong(ENVIRONMENT_CHECKPOINT_INTERVAL.key(), ENVIRONMENT_CHECKPOINT_INTERVAL.defaultValue());
    +           CheckpointingMode checkpointingMode = semantics.equalsIgnoreCase("exactly-once")
    --- End diff --
    
    Maybe we could already catch any random string that does not represent a checkpointing semantic and throw an `IllegalArgumentException`?
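
    A minimal sketch of that fail-fast check (the class and method names below are illustrative, not from the PR): normalize the flag once and reject anything that is neither 'exactly-once' nor 'at-least-once', instead of silently treating unknown strings as at-least-once.

    ```java
    // Hypothetical helper sketching the suggested guard; names are not from the PR.
    public class SemanticsCheck {

        /** Normalizes the semantics flag, rejecting any unknown value. */
        public static String parseSemantics(String semantics) {
            if ("exactly-once".equalsIgnoreCase(semantics)
                    || "at-least-once".equalsIgnoreCase(semantics)) {
                return semantics.toLowerCase();
            }
            // any other string is a configuration error, so fail fast
            throw new IllegalArgumentException(
                    "Unknown test.semantics value: '" + semantics
                            + "' (expected 'exactly-once' or 'at-least-once')");
        }
    }
    ```

    Failing at job submission keeps a typo in `test.semantics` from silently weakening the guarantee that the test is supposed to verify.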


> Implement source and operator that validate exactly-once
> --------------------------------------------------------
>
>                 Key: FLINK-8992
>                 URL: https://issues.apache.org/jira/browse/FLINK-8992
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.5.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>
> We can build this with sources that emit sequences per key and a stateful
> (keyed) operator that validates, for each update of a key, that the new value
> is the old value + 1. This makes it easy to detect if events/state were
> lost or duplicated.
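
The per-key validation described above can be sketched without any Flink dependencies (in the real job the per-key state would live in a keyed `ValueState`; the class and method names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch: each key carries an increasing sequence, and the check
// flags any update whose value is not exactly previous + 1, which indicates
// a lost event (gap) or a duplicated event (repeat).
public class SequenceValidator {

    private final Map<String, Long> lastPerKey = new HashMap<>();

    /** Returns true if 'value' is the first value for 'key' or exactly the previous value + 1. */
    public boolean validate(String key, long value) {
        Long previous = lastPerKey.put(key, value);
        return previous == null || value == previous + 1;
    }
}
```

Under exactly-once semantics every update must pass this check; under at-least-once, duplicates (repeats of the previous value) are tolerated but gaps are not.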



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
