This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git

commit 41acc3b90bbf43e6879f2e3d9cdded0cac980524
Author: David Anderson <da...@alpinegizmo.com>
AuthorDate: Thu Sep 19 20:08:58 2019 +0200

    [FLINK-14160] Add --backpressure option to the ClickEventCount job in the 
operations playground
    
    This closes #4.
---
 .../java/flink-playground-clickcountjob/pom.xml    |  2 +-
 .../ops/clickcount/ClickEventCount.java            | 25 ++++++++++--
 .../ops/clickcount/functions/BackpressureMap.java  | 46 ++++++++++++++++++++++
 operations-playground/docker-compose.yaml          |  4 +-
 4 files changed, 71 insertions(+), 6 deletions(-)

diff --git 
a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml 
b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
index 3d17fcd..893c11e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -22,7 +22,7 @@ under the License.
 
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-playground-clickcountjob</artifactId>
-       <version>1-FLINK-1.9_2.11</version>
+       <version>2-FLINK-1.9_2.11</version>
 
        <name>flink-playground-clickcountjob</name>
        <packaging>jar</packaging>
diff --git 
a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
 
b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 0316bc6..f3d628c 100644
--- 
a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ 
b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -18,6 +18,7 @@
 package org.apache.flink.playgrounds.ops.clickcount;
 
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
 import 
org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
 import 
org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
@@ -25,6 +26,7 @@ import 
org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializa
 import 
org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
 import 
org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit;
  * <p>The Job can be configured via the command line:</p>
  * * "--checkpointing": enables checkpointing
  * * "--event-time": set the StreamTimeCharacteristic to EventTime
+ * * "--backpressure": insert an operator that causes periodic backpressure
  * * "--input-topic": the name of the Kafka Topic to consume {@link 
ClickEvent}s from
  * * "--output-topic": the name of the Kafka Topic to produce {@link 
ClickEventStatistics} to
  * * "--bootstrap.servers": comma-separated list of Kafka brokers
@@ -56,6 +59,7 @@ public class ClickEventCount {
 
        public static final String CHECKPOINTING_OPTION = "checkpointing";
        public static final String EVENT_TIME_OPTION = "event-time";
+       public static final String BACKPRESSURE_OPTION = "backpressure";
 
        public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS);
 
@@ -66,6 +70,8 @@ public class ClickEventCount {
 
                configureEnvironment(params, env);
 
+               boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION);
+
                String inputTopic = params.get("input-topic", "input");
                String outputTopic = params.get("output-topic", "output");
                String brokers = params.get("bootstrap.servers", 
"localhost:9092");
@@ -73,19 +79,32 @@ public class ClickEventCount {
                kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokers);
                kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"click-event-count");
 
-               env.addSource(new FlinkKafkaConsumer<>(inputTopic, new 
ClickEventDeserializationSchema(), kafkaProps))
+               DataStream<ClickEvent> clicks =
+                               env.addSource(new 
FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), 
kafkaProps))
                        .name("ClickEvent Source")
                        .assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, 
TimeUnit.MILLISECONDS)) {
                                @Override
                                public long extractTimestamp(final ClickEvent 
element) {
                                        return element.getTimestamp().getTime();
                                }
-                       })
+                       });
+
+               if (inflictBackpressure) {
+                       // Force a network shuffle so that the backpressure 
will affect the buffer pools
+                       clicks = clicks
+                               .keyBy(ClickEvent::getPage)
+                               .map(new BackpressureMap())
+                               .name("Backpressure");
+               }
+
+               DataStream<ClickEventStatistics> statistics = clicks
                        .keyBy(ClickEvent::getPage)
                        .timeWindow(WINDOW_SIZE)
                        .aggregate(new CountingAggregator(),
                                new ClickEventStatisticsCollector())
-                       .name("ClickEvent Counter")
+                       .name("ClickEvent Counter");
+
+               statistics
                        .addSink(new FlinkKafkaProducer<>(
                                outputTopic,
                                new 
ClickEventStatisticsSerializationSchema(outputTopic),
diff --git 
a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java
 
b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java
new file mode 100644
index 0000000..ee68573
--- /dev/null
+++ 
b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.functions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+
+import java.time.LocalTime;
+
+/**
+ * This MapFunction causes severe backpressure during even-numbered minutes.
+ * E.g., from 10:12:00 to 10:12:59 it will only process 10 events/sec,
+ * but from 10:13:00 to 10:13:59 events will pass through unimpeded.
+ */
+public class BackpressureMap implements MapFunction<ClickEvent, ClickEvent> {
+
+       private boolean causeBackpressure() {
+               return ((LocalTime.now().getMinute() % 2) == 0);
+       }
+
+       @Override
+       public ClickEvent map(ClickEvent event) throws Exception {
+               if (causeBackpressure()) {
+                       Thread.sleep(100);
+               }
+
+               return event;
+       }
+
+}
diff --git a/operations-playground/docker-compose.yaml 
b/operations-playground/docker-compose.yaml
index 9ed71c5..7907092 100644
--- a/operations-playground/docker-compose.yaml
+++ b/operations-playground/docker-compose.yaml
@@ -20,7 +20,7 @@ version: "2.1"
 services:
   client:
     build: ../docker/ops-playground-image
-    image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
+    image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11
     command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers 
kafka:9092 --checkpointing --event-time"
     depends_on:
       - jobmanager
@@ -30,7 +30,7 @@ services:
     environment:
       - JOB_MANAGER_RPC_ADDRESS=jobmanager
   clickevent-generator:
-    image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
+    image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11
     command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* 
org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator 
--bootstrap.servers kafka:9092 --topic input"
     depends_on:
       - kafka

Reply via email to