[
https://issues.apache.org/jira/browse/FLINK-8992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16456202#comment-16456202
]
ASF GitHub Bot commented on FLINK-8992:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5925#discussion_r184647536
--- Diff:
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import
org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
+import
org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
+import
org.apache.flink.streaming.tests.artificialstate.eventpayload.ArtificialValueStateBuilder;
+import
org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * A general purpose test for Flink's DataStream API operators and
primitives.
+ *
+ * <p>It currently covers the following aspects that are frequently
present in Flink DataStream jobs:
+ * <ul>
+ * <li>A generic Kryo input type.</li>
+ * <li>A state type for which we register a {@link
KryoSerializer}.</li>
+ * <li>Operators with {@link ValueState}.</li>
+ * </ul>
+ *
+ * <p>The job can be configured for different state backends,
including memory, file, and RocksDB
+ * state backends. It also allows specifying the processing guarantee
semantics, which will also be verified
+ * by the job itself according to the specified semantic.
+ *
+ * <p>Program parameters:
+ * <ul>
+ * <li>test.semantics (String, default - 'exactly-once'): This
configures the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
+ * <li>environment.checkpoint_interval (long, default - 1000): the
checkpoint interval.</li>
+ * <li>environment.parallelism (int, default - 1): parallelism to use
for the job.</li>
+ * <li>environment.max_parallelism (int, default - 128): max
parallelism to use for the job</li>
+ * <li>environment.restart_strategy.delay (long, default - 0): delay
between restart attempts, in milliseconds.</li>
+ * <li>state_backend (String, default - 'file'): Supported values are
'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li>
+ * <li>state_backend.checkpoint_directory (String): The checkpoint
directory.</li>
+ * <li>state_backend.rocks.incremental (boolean, default - false):
Activate or deactivate incremental snapshots if RocksDBStateBackend is
selected.</li>
+ * <li>state_backend.file.async (boolean, default - true): Activate or
deactivate asynchronous snapshots if FileStateBackend is selected.</li>
+ * <li>sequence_generator_source.keyspace (int, default - 1000):
Number of different keys for events emitted by the sequence generator.</li>
+ * <li>sequence_generator_source.payload_size (int, default - 20):
Length of message payloads emitted by the sequence generator.</li>
+ * <li>sequence_generator_source.sleep_time (long, default - 0):
Milliseconds to sleep after emitting events in the sequence generator. Set to 0
to disable sleeping.</li>
+ * <li>sequence_generator_source.sleep_after_elements (long, default -
0): Number of elements to emit before sleeping in the sequence generator. Set
to 0 to disable sleeping.</li>
+ * <li>sequence_generator_source.event_time.max_out_of_order (long,
default - 500): Max event time out-of-orderness for events emitted by the
sequence generator.</li>
+ * <li>sequence_generator_source.event_time.clock_progress (long,
default - 100): The amount of event time to progress per event generated by the
sequence generator.</li>
+ * </ul>
+ */
+public class DataStreamAllroundTestProgram {
+
+ private static final ConfigOption<String> TEST_SEMANTICS = ConfigOptions
+ .key("test.semantics")
+ .defaultValue("exactly-once")
+ .withDescription("This configures the semantics to test. Can be
'exactly-once' or 'at-least-once'");
+
+ private static final ConfigOption<Long> ENVIRONMENT_CHECKPOINT_INTERVAL
= ConfigOptions
+ .key("environment.checkpoint_interval")
+ .defaultValue(1000L);
+
+ private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM =
ConfigOptions
+ .key("environment.parallelism")
+ .defaultValue(1);
+
+ private static final ConfigOption<Integer> ENVIRONMENT_MAX_PARALLELISM
= ConfigOptions
+ .key("environment.max_parallelism")
+ .defaultValue(128);
+
+ private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY =
ConfigOptions
+ .key("environment.restart_strategy.delay")
+ .defaultValue(0);
+
+ private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+ .key("state_backend")
+ .defaultValue("file")
+ .withDescription("Supported values are 'file' for
FsStateBackend and 'rocks' for RocksDBStateBackend.");
+
+ private static final ConfigOption<String> STATE_BACKEND_CHECKPOINT_DIR
= ConfigOptions
+ .key("state_backend.checkpoint_directory")
+ .noDefaultValue()
+ .withDescription("The checkpoint directory.");
+
+ private static final ConfigOption<Boolean>
STATE_BACKEND_ROCKS_INCREMENTAL = ConfigOptions
+ .key("state_backend.rocks.incremental")
+ .defaultValue(false)
+ .withDescription("Activate or deactivate incremental snapshots
if RocksDBStateBackend is selected.");
+
+ private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC =
ConfigOptions
+ .key("state_backend.file.async")
+ .defaultValue(true)
+ .withDescription("Activate or deactivate asynchronous snapshots
if FileStateBackend is selected.");
+
+ private static final ConfigOption<Integer>
SEQUENCE_GENERATOR_SRC_KEYSPACE = ConfigOptions
+ .key("sequence_generator_source.keyspace")
+ .defaultValue(1000);
+
+ private static final ConfigOption<Integer>
SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE = ConfigOptions
+ .key("sequence_generator_source.payload_size")
+ .defaultValue(20);
+
+ private static final ConfigOption<Long>
SEQUENCE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
+ .key("sequence_generator_source.sleep_time")
+ .defaultValue(0L);
+
+ private static final ConfigOption<Long>
SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
+ .key("sequence_generator_source.sleep_after_elements")
+ .defaultValue(0L);
+
+ private static final ConfigOption<Long>
SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
+ .key("sequence_generator_source.event_time.max_out_of_order")
+ .defaultValue(500L);
+
+ private static final ConfigOption<Long>
SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
+ .key("sequence_generator_source.event_time.clock_progress")
+ .defaultValue(100L);
+
+ //
-----------------------------------------------------------------------------------------------------------------
+
+ public static void main(String[] args) throws Exception {
+ final ParameterTool pt = ParameterTool.fromArgs(args);
+
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ setupEnvironment(env, pt);
+
+ env.addSource(createEventSource(pt))
+
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+ .keyBy(Event::getKey)
+ .map(createArtificialKeyedStateMapper(
+ // map function simply forwards the inputs
+ (MapFunction<Event, Event>) in -> in,
+ // state is updated per event as a wrapped
ComplexPayload state object
+ (Event first, ComplexPayload second) -> new
ComplexPayload(first), //
+ Arrays.asList(
+ new
KryoSerializer<>(ComplexPayload.class, env.getConfig()))
+ )
+ )
+ .name("ArtificalKeyedStateMapper")
+ .returns(Event.class)
+ .keyBy(Event::getKey)
+ .flatMap(createSemanticsCheckMapper(pt))
+ .name("SemanticsCheckMapper")
+ .addSink(new PrintSinkFunction<>());
+
+ env.execute("General purpose test job");
+ }
+
+ public static void setupEnvironment(StreamExecutionEnvironment env,
ParameterTool pt) throws Exception {
+
+ // set checkpointing semantics
+ String semantics = pt.get(TEST_SEMANTICS.key(),
TEST_SEMANTICS.defaultValue());
+ long checkpointInterval =
pt.getLong(ENVIRONMENT_CHECKPOINT_INTERVAL.key(),
ENVIRONMENT_CHECKPOINT_INTERVAL.defaultValue());
+ CheckpointingMode checkpointingMode =
semantics.equalsIgnoreCase("exactly-once")
--- End diff --
Maybe we could already catch any random string that does not represent a
checkpointing semantic and throw an `IllegalArgumentException`?
> Implement source and operator that validate exactly-once
> --------------------------------------------------------
>
> Key: FLINK-8992
> URL: https://issues.apache.org/jira/browse/FLINK-8992
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.5.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
>
> We can build this with sources that emit sequences per key and a stateful
> (keyed) operator that validates for the update of each key that the new value
> is the old value + 1. This can help to easily detect if events/state were
> lost or duplicates.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)