This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git
commit 41acc3b90bbf43e6879f2e3d9cdded0cac980524 Author: David Anderson <da...@alpinegizmo.com> AuthorDate: Thu Sep 19 20:08:58 2019 +0200 [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground This closes #4. --- .../java/flink-playground-clickcountjob/pom.xml | 2 +- .../ops/clickcount/ClickEventCount.java | 25 ++++++++++-- .../ops/clickcount/functions/BackpressureMap.java | 46 ++++++++++++++++++++++ operations-playground/docker-compose.yaml | 4 +- 4 files changed, 71 insertions(+), 6 deletions(-) diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml index 3d17fcd..893c11e 100644 --- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml @@ -22,7 +22,7 @@ under the License. <groupId>org.apache.flink</groupId> <artifactId>flink-playground-clickcountjob</artifactId> - <version>1-FLINK-1.9_2.11</version> + <version>2-FLINK-1.9_2.11</version> <name>flink-playground-clickcountjob</name> <packaging>jar</packaging> diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java index 0316bc6..f3d628c 100644 --- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java @@ -18,6 +18,7 @@ package org.apache.flink.playgrounds.ops.clickcount; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap; 
import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector; import org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator; import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent; @@ -25,6 +26,7 @@ import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializa import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics; import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; @@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit; * <p>The Job can be configured via the command line:</p> * * "--checkpointing": enables checkpointing * * "--event-time": set the StreamTimeCharacteristic to EventTime + * * "--backpressure": insert an operator that causes periodic backpressure * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to * * "--bootstrap.servers": comma-separated list of Kafka brokers @@ -56,6 +59,7 @@ public class ClickEventCount { public static final String CHECKPOINTING_OPTION = "checkpointing"; public static final String EVENT_TIME_OPTION = "event-time"; + public static final String BACKPRESSURE_OPTION = "backpressure"; public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS); @@ -66,6 +70,8 @@ public class ClickEventCount { configureEnvironment(params, env); + boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION); + String inputTopic = params.get("input-topic", "input"); String outputTopic = params.get("output-topic", 
"output"); String brokers = params.get("bootstrap.servers", "localhost:9092"); @@ -73,19 +79,32 @@ public class ClickEventCount { kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count"); - env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps)) + DataStream<ClickEvent> clicks = + env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps)) .name("ClickEvent Source") .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) { @Override public long extractTimestamp(final ClickEvent element) { return element.getTimestamp().getTime(); } - }) + }); + + if (inflictBackpressure) { + // Force a network shuffle so that the backpressure will affect the buffer pools + clicks = clicks + .keyBy(ClickEvent::getPage) + .map(new BackpressureMap()) + .name("Backpressure"); + } + + DataStream<ClickEventStatistics> statistics = clicks .keyBy(ClickEvent::getPage) .timeWindow(WINDOW_SIZE) .aggregate(new CountingAggregator(), new ClickEventStatisticsCollector()) - .name("ClickEvent Counter") + .name("ClickEvent Counter"); + + statistics .addSink(new FlinkKafkaProducer<>( outputTopic, new ClickEventStatisticsSerializationSchema(outputTopic), diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java new file mode 100644 index 0000000..ee68573 --- /dev/null +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.playgrounds.ops.clickcount.functions; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent; + +import java.time.LocalTime; + +/** + * This MapFunction causes severe backpressure during even-numbered minutes. + * E.g., from 10:12:00 to 10:12:59 each parallel instance will process at most 10 events/sec, + * but from 10:13:00 to 10:13:59 events will pass through unimpeded. 
+ */ +public class BackpressureMap implements MapFunction<ClickEvent, ClickEvent> { + + private boolean causeBackpressure() { + return ((LocalTime.now().getMinute() % 2) == 0); + } + + @Override + public ClickEvent map(ClickEvent event) throws Exception { + if (causeBackpressure()) { + Thread.sleep(100); + } + + return event; + } + +} diff --git a/operations-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml index 9ed71c5..7907092 100644 --- a/operations-playground/docker-compose.yaml +++ b/operations-playground/docker-compose.yaml @@ -20,7 +20,7 @@ version: "2.1" services: client: build: ../docker/ops-playground-image - image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11 + image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11 command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time" depends_on: - jobmanager @@ -30,7 +30,7 @@ services: environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager clickevent-generator: - image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11 + image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11 command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input" depends_on: - kafka