This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 25e0140ef7eb0c6ecd0b94f575d60a5c43552cb5 Author: sjwiesman <[email protected]> AuthorDate: Mon Nov 15 14:55:00 2021 -0600 [FLINK-24635][examples] Fix deprecations in async example --- .../streaming/examples/async/AsyncClient.java | 43 ++++ .../streaming/examples/async/AsyncIOExample.java | 277 +++------------------ .../examples/async/util/SimpleSource.java | 77 ++++++ .../scala/examples/async/AsyncClient.scala | 40 +++ .../scala/examples/async/AsyncIOExample.scala | 82 +++--- 5 files changed, 245 insertions(+), 274 deletions(-) diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncClient.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncClient.java new file mode 100644 index 0000000..3ff8d3a --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncClient.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.async; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; + +/** A simple asynchronous client that simulates interacting with an unreliable external service. */ +public class AsyncClient { + + public CompletableFuture<String> query(int key) { + return CompletableFuture.supplyAsync( + () -> { + long sleep = (long) (ThreadLocalRandom.current().nextFloat() * 100); + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + throw new RuntimeException("AsyncClient was interrupted", e); + } + + if (ThreadLocalRandom.current().nextFloat() < 0.001f) { + throw new RuntimeException("wahahahaha..."); + } else { + return "key" + (key % 10); + } + }); + } +} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java index 50c220f..fd7335e 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java @@ -17,295 +17,90 @@ package org.apache.flink.streaming.examples.async; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.util.Collector; -import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.streaming.examples.async.util.SimpleSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; import java.util.Collections; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** Example to illustrates how to use {@link AsyncFunction}. */ public class AsyncIOExample { - private static final Logger LOG = LoggerFactory.getLogger(AsyncIOExample.class); - - private static final String EXACTLY_ONCE_MODE = "exactly_once"; - private static final String ORDERED = "ordered"; - - /** A checkpointed source. */ - private static class SimpleSource implements SourceFunction<Integer>, CheckpointedFunction { - private static final long serialVersionUID = 1L; - - private volatile boolean isRunning = true; - private int counter = 0; - private int start = 0; - - private ListState<Integer> state; - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - state = - context.getOperatorStateStore() - .getListState( - new ListStateDescriptor<>("state", IntSerializer.INSTANCE)); - - // restore any state that we might already have to our fields, initialize state - // is also called in case of restore. - for (Integer i : state.get()) { - start = i; - } - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - state.clear(); - state.add(start); - } - - public SimpleSource(int maxNum) { - this.counter = maxNum; - } - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - while ((start < counter || counter == -1) && isRunning) { - synchronized (ctx.getCheckpointLock()) { - ctx.collect(start); - ++start; - - // loop back to 0 - if (start == Integer.MAX_VALUE) { - start = 0; - } - } - Thread.sleep(10L); - } - } - - @Override - public void cancel() { - isRunning = false; - } - } - - /** - * An example of {@link AsyncFunction} using a thread pool and executing working threads to - * simulate multiple async operations. - * - * <p>For the real use case in production environment, the thread pool may stay in the async - * client. - */ + /** An example of {@link AsyncFunction} using an async client to query an external service. */ private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> { - private static final long serialVersionUID = 2098635244857937717L; - - private transient ExecutorService executorService; - - /** - * The result of multiplying sleepFactor with a random float is used to pause the working - * thread in the thread pool, simulating a time consuming async operation. - */ - private final long sleepFactor; - - /** - * The ratio to generate an exception to simulate an async error. For example, the error may - * be a TimeoutException while visiting HBase. - */ - private final float failRatio; - - private final long shutdownWaitTS; - - SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) { - this.sleepFactor = sleepFactor; - this.failRatio = failRatio; - this.shutdownWaitTS = shutdownWaitTS; - } + private static final long serialVersionUID = 1L; - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - executorService = Executors.newFixedThreadPool(30); - } + private transient AsyncClient client; @Override - public void close() throws Exception { - super.close(); - ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService); + public void open(Configuration parameters) { + client = new AsyncClient(); } @Override public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) { - executorService.submit( - () -> { - // wait for while to simulate async operation here - long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor); - try { - Thread.sleep(sleep); - - if (ThreadLocalRandom.current().nextFloat() < failRatio) { - resultFuture.completeExceptionally(new Exception("wahahahaha...")); - } else { - resultFuture.complete( - Collections.singletonList("key-" + (input % 10))); - } - } catch (InterruptedException e) { - resultFuture.complete(new ArrayList<>(0)); - } - }); + client.query(input) + .whenComplete( + (response, error) -> { + if (response != null) { + resultFuture.complete(Collections.singletonList(response)); + } else { + resultFuture.completeExceptionally(error); + } + }); } } - private static void printUsage() { - System.out.println( - "To customize example, use: AsyncIOExample [--fsStatePath <path to fs state>] " - + "[--checkpointMode <exactly_once or at_least_once>] " - + "[--maxCount <max number of input from source, -1 for infinite input>] " - + "[--sleepFactor <interval to sleep for each stream element>] [--failRatio <possibility to throw exception>] " - + "[--waitMode <ordered or unordered>] [--waitOperatorParallelism <parallelism for async wait operator>] " - + "[--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>]" - + "[--timeout <Timeout for the asynchronous operations>]"); - } - public static void main(String[] args) throws Exception { - - // obtain execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // parse parameters final ParameterTool params = ParameterTool.fromArgs(args); - final String statePath; - final String cpMode; - final int maxCount; - final long sleepFactor; - final float failRatio; final String mode; - final int taskNum; - final long shutdownWaitTS; final long timeout; try { - // check the configuration for the job - statePath = params.get("fsStatePath", null); - cpMode = params.get("checkpointMode", "exactly_once"); - maxCount = params.getInt("maxCount", 100000); - sleepFactor = params.getLong("sleepFactor", 100); - failRatio = params.getFloat("failRatio", 0.001f); mode = params.get("waitMode", "ordered"); - taskNum = params.getInt("waitOperatorParallelism", 1); - shutdownWaitTS = params.getLong("shutdownWaitTS", 20000); timeout = params.getLong("timeout", 10000L); } catch (Exception e) { - printUsage(); - + System.out.println( + "To customize example, use: AsyncIOExample [--waitMode <ordered or unordered>]"); throw e; } - StringBuilder configStringBuilder = new StringBuilder(); - - final String lineSeparator = System.getProperty("line.separator"); - - configStringBuilder - .append("Job configuration") - .append(lineSeparator) - .append("FS state path=") - .append(statePath) - .append(lineSeparator) - .append("Checkpoint mode=") - .append(cpMode) - .append(lineSeparator) - .append("Max count of input from source=") - .append(maxCount) - .append(lineSeparator) - .append("Sleep factor=") - .append(sleepFactor) - .append(lineSeparator) - .append("Fail ratio=") - .append(failRatio) - .append(lineSeparator) - .append("Waiting mode=") - .append(mode) - .append(lineSeparator) - .append("Parallelism for async wait operator=") - .append(taskNum) - .append(lineSeparator) - .append("Shutdown wait timestamp=") - .append(shutdownWaitTS); - - LOG.info(configStringBuilder.toString()); - - if (statePath != null) { - // setup state and checkpoint mode - env.getCheckpointConfig().setCheckpointStorage(statePath); - } - - if (EXACTLY_ONCE_MODE.equals(cpMode)) { - env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE); - } else { - env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE); - } + // obtain execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // create input stream of a single integer - DataStream<Integer> inputStream = env.addSource(new SimpleSource(maxCount)); + DataStream<Integer> inputStream = env.addSource(new SimpleSource()); - // create async function, which will "wait" for a while to simulate the process of async i/o - AsyncFunction<Integer, String> function = - new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS); + AsyncFunction<Integer, String> function = new SampleAsyncFunction(); // add async operator to streaming job DataStream<String> result; - if (ORDERED.equals(mode)) { - result = - AsyncDataStream.orderedWait( - inputStream, function, timeout, TimeUnit.MILLISECONDS, 20) - .setParallelism(taskNum); - } else { - result = - AsyncDataStream.unorderedWait( - inputStream, function, timeout, TimeUnit.MILLISECONDS, 20) - .setParallelism(taskNum); + switch (mode.toUpperCase()) { + case "ORDERED": + result = + AsyncDataStream.orderedWait( + inputStream, function, timeout, TimeUnit.MILLISECONDS, 20); + break; + case "UNORDERED": + result = + AsyncDataStream.unorderedWait( + inputStream, function, timeout, TimeUnit.MILLISECONDS, 20); + break; + default: + throw new IllegalStateException("Unknown mode: " + mode); } - // add a reduce to get the sum of each keys. - result.flatMap( - new FlatMapFunction<String, Tuple2<String, Integer>>() { - private static final long serialVersionUID = -938116068682344455L; - - @Override - public void flatMap( - String value, Collector<Tuple2<String, Integer>> out) - throws Exception { - out.collect(new Tuple2<>(value, 1)); - } - }) - .keyBy(value -> value.f0) - .sum(1) - .print(); + result.print(); // execute the program - env.execute("Async IO Example"); + env.execute("Async IO Example: " + mode); } } diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/util/SimpleSource.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/util/SimpleSource.java new file mode 100644 index 0000000..760438c --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/util/SimpleSource.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.async.util; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +/** A checkpointed source. */ +public class SimpleSource implements SourceFunction<Integer>, CheckpointedFunction { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int start = 0; + + private ListState<Integer> state; + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + state = + context.getOperatorStateStore() + .getListState(new ListStateDescriptor<>("state", IntSerializer.INSTANCE)); + + // restore any state that we might already have to our fields, initialize state + // is also called in case of restore. + for (Integer i : state.get()) { + start = i; + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + state.clear(); + state.add(start); + } + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + while (isRunning) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(start); + ++start; + + // loop back to 0 + if (start == Integer.MAX_VALUE) { + start = 0; + } + } + Thread.sleep(10L); + } + } + + @Override + public void cancel() { + isRunning = false; + } +} diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncClient.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncClient.scala new file mode 100644 index 0000000..3270980 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncClient.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.async + +import java.util.concurrent.ThreadLocalRandom +import scala.concurrent.{ExecutionContext, Future} + +/** A simple asynchronous client that simulates interacting with an unreliable external service. */ +class AsyncClient { + + def query(key: Int)(implicit executor: ExecutionContext): Future[String] = Future { + val sleep = (ThreadLocalRandom.current.nextFloat * 100).toLong + try Thread.sleep(sleep) catch { + case e: InterruptedException => + throw new RuntimeException("AsyncClient was interrupted", e) + } + + if (ThreadLocalRandom.current.nextFloat < 0.001f) { + throw new RuntimeException("wahahahaha...") + } else { + "key" + (key % 10) + } + } +} diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala index 5808aaa..b711a35 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala @@ -19,53 +19,69 @@ package org.apache.flink.streaming.scala.examples.async -import java.util.concurrent.TimeUnit - -import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction} import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.api.scala.async.ResultFuture +import org.apache.flink.streaming.examples.async.util.SimpleSource -import scala.concurrent.{ExecutionContext, Future} +import java.util.concurrent.TimeUnit +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.{Failure, Success} object AsyncIOExample { - def main(args: Array[String]) { - val timeout = 10000L - - val env = StreamExecutionEnvironment.getExecutionEnvironment + /** An example of a [[RichAsyncFunction]] using an async client to query an external service. */ + class SampleAsyncFunction extends RichAsyncFunction[Int, String] { + private var client: AsyncClient = _ - val input = env.addSource(new SimpleSource()) + override def open(parameters: Configuration): Unit = { + client = new AsyncClient + } - val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) { - (input, collector: ResultFuture[Int]) => - Future { - collector.complete(Seq(input)) - } (ExecutionContext.global) + override def asyncInvoke(input: Int, resultFuture: ResultFuture[String]): Unit = { + client.query(input).onComplete { + case Success(value) => resultFuture.complete(Seq(value)) + case Failure(exception) => resultFuture.completeExceptionally(exception) + } } + } - asyncMapped.print() + def main(args: Array[String]): Unit = { + val params = ParameterTool.fromArgs(args) - env.execute("Async I/O job") - } -} + var mode: String = null + var timeout = 0L -class SimpleSource extends ParallelSourceFunction[Int] { - var running = true - var counter = 0 + try { + mode = params.get("waitMode", "ordered") + timeout = params.getLong("timeout", 10000L) + } catch { + case e: Exception => + println("To customize example, use: AsyncIOExample [--waitMode <ordered or unordered>]") + throw e + } - override def run(ctx: SourceContext[Int]): Unit = { - while (running) { - ctx.getCheckpointLock.synchronized { - ctx.collect(counter) - } - counter += 1 + // obtain execution environment + val env = StreamExecutionEnvironment.getExecutionEnvironment - Thread.sleep(10L) + // create input stream of a single integer + val inputStream = env.addSource(new SimpleSource).map(_.toInt) + + val function = new SampleAsyncFunction + + // add async operator to streaming job + val result = mode.toUpperCase match { + case "ORDERED" => + AsyncDataStream.orderedWait(inputStream, function, timeout, TimeUnit.MILLISECONDS, 20) + case "UNORDERED" => + AsyncDataStream.unorderedWait(inputStream, function, timeout, TimeUnit.MILLISECONDS, 20) + case _ => throw new IllegalStateException("Unknown mode: " + mode) } - } - override def cancel(): Unit = { - running = false + result.print() + + // execute the program + env.execute("Async IO Example: " + mode) } }
