sjwiesman commented on a change in pull request #17802:
URL: https://github.com/apache/flink/pull/17802#discussion_r750399233
##########
File path:
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
##########
@@ -19,53 +19,69 @@
package org.apache.flink.streaming.scala.examples.async
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
-import
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.async.ResultFuture
+import org.apache.flink.streaming.examples.async.util.SimpleSource
-import scala.concurrent.{ExecutionContext, Future}
+import java.util.concurrent.TimeUnit
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.{Failure, Success}
object AsyncIOExample {
- def main(args: Array[String]) {
- val timeout = 10000L
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
+ /** An example of a [[RichAsyncFunction]] using an async client to query an external service. */
+ class SampleAsyncFunction extends RichAsyncFunction[Int, String] {
+ private var client: AsyncClient = _
- val input = env.addSource(new SimpleSource())
+ override def open(parameters: Configuration): Unit = {
+ client = new AsyncClient
+ }
- val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
- (input, collector: ResultFuture[Int]) =>
- Future {
- collector.complete(Seq(input))
- } (ExecutionContext.global)
+ override def asyncInvoke(input: Int, resultFuture: ResultFuture[String]): Unit = {
+ client.query(input).onComplete {
+ case Success(value) => resultFuture.complete(Seq(value))
+ case Failure(exception) => resultFuture.completeExceptionally(exception)
+ }
}
+ }
- asyncMapped.print()
+ def main(args: Array[String]): Unit = {
+ val params = ParameterTool.fromArgs(args)
- env.execute("Async I/O job")
- }
-}
+ var mode: String = null
+ var timeout = 0L
-class SimpleSource extends ParallelSourceFunction[Int] {
- var running = true
- var counter = 0
+ try {
+ mode = params.get("waitMode", "ordered")
+ timeout = params.getLong("timeout", 10000L)
+ } catch {
+ case e: Exception =>
+ println("To customize example, use: AsyncIOExample [--waitMode <ordered or unordered>]")
+ throw e
+ }
- override def run(ctx: SourceContext[Int]): Unit = {
- while (running) {
- ctx.getCheckpointLock.synchronized {
- ctx.collect(counter)
- }
- counter += 1
+ // obtain execution environment
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
- Thread.sleep(10L)
+ // create input stream of a single integer
+ val inputStream = env.addSource(new SimpleSource).map(_.toInt)
+
+ val function = new SampleAsyncFunction
+
+ // add async operator to streaming job
+ val result = mode.toUpperCase match {
+ case "ORDERED" =>
+ AsyncDataStream.orderedWait(inputStream, function, timeout, TimeUnit.MILLISECONDS, 20)
+ case "UNORDERED" =>
+ AsyncDataStream.unorderedWait(inputStream, function, timeout, TimeUnit.MILLISECONDS, 20)
+ case _ => throw new IllegalStateException("Unknown mode: " + mode)
}
- }
- override def cancel(): Unit = {
- running = false
+ result.print()
+
+ // execute the program
+ env.execute("Async IO Example")
Review comment:
Great idea!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]