This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 58751b7798562a86ba9bf107737013ac4cde9e01
Author: Markos Sfikas <mar...@data-artisans.com>
AuthorDate: Thu Nov 21 15:12:04 2019 +0100

    This is a blog post on how to query Pulsar streams using Apache Flink.
---
 ...1-25-query-pulsar-streams-using-apache-flink.md | 220 +++++++++++++++++++++
 ...-sql-blog-post-visual-primitive-avro-schema.png | Bin 0 -> 543150 bytes
 img/blog/flink-pulsar-sql-blog-post-visual.png     | Bin 0 -> 68506 bytes
 3 files changed, 220 insertions(+)

diff --git a/_posts/2019-11-25-query-pulsar-streams-using-apache-flink.md 
b/_posts/2019-11-25-query-pulsar-streams-using-apache-flink.md
new file mode 100644
index 0000000..e41325c
--- /dev/null
+++ b/_posts/2019-11-25-query-pulsar-streams-using-apache-flink.md
@@ -0,0 +1,220 @@
+---
+layout: post
+title: "How to query Pulsar Streams using Apache Flink"
+date:  2019-11-25 12:00:00
+categories: news
+authors:
+- sijie:
+  name: "Sijie Guo"
+  twitter: "sijieg"
+
+- marksfik:
+  name: "Markos Sfikas"
+  twitter: "MarkSfik" 
+
+excerpt: This blog post discusses the new developments and integrations 
between Apache Flink and Apache Pulsar and showcases how you can leverage 
Pulsar’s built-in schema to query Pulsar streams in real time using Apache 
Flink. 
+---
+
+In a previous [story](https://flink.apache.org/2019/05/03/pulsar-flink.html) 
on the  Flink blog, we explained the different ways that [Apache 
Flink](https://flink.apache.org/) and [Apache 
Pulsar](https://pulsar.apache.org/) can integrate to provide elastic data 
processing at large scale. This blog post discusses the new developments and 
integrations between the two frameworks and showcases how you can leverage 
Pulsar’s built-in schema to query Pulsar streams in real time using Apache 
Flink. 
+
+
+# A short intro to Apache Pulsar
+
+Apache Pulsar is a flexible pub/sub messaging system, backed by durable log 
storage. Some of the framework’s highlights include multi-tenancy, a unified 
message model, structured event streams and a cloud-native architecture that 
make it a perfect fit for a wide set of use cases, ranging from billing, 
payments and trading services all the way to the unification of the different 
messaging architectures in an organization. If you are interested in finding 
out more about Pulsar, you can vis [...]
+
+
+# Existing Pulsar & Flink integration (Apache Flink 1.6+)
+
+The existing integration between Pulsar and Flink exploits Pulsar as a message 
queue in a Flink application. Flink developers can utilize Pulsar as a 
streaming source and streaming sink for their Flink applications by selecting a 
specific Pulsar source and connecting to their desired Pulsar cluster and topic:
+
+```java
+// create and configure Pulsar consumer
+PulsarSourceBuilder<String> builder = PulsarSourceBuilder
+  .builder(new SimpleStringSchema()) 
+  .serviceUrl(serviceUrl)
+  .topic(inputTopic)
+  .subscriptionName(subscription);
+SourceFunction<String> src = builder.build();
+// ingest DataStream with Pulsar consumer
+DataStream<String> words = env.addSource(src);
+```
+
+Pulsar streams can then get connected to the Flink processing logic…
+
+```java
+// perform computation on DataStream (here a simple WordCount)
+DataStream<WordWithCount> wc = words
+  .flatMap((FlatMapFunction<String, WordWithCount>) (word, collector) -> {
+    collector.collect(new WordWithCount(word, 1));
+  })
+ 
+  .returns(WordWithCount.class)
+  .keyBy("word")
+  .timeWindow(Time.seconds(5))
+  .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
+    new WordWithCount(c1.word, c1.count + c2.count));
+```
+
+...and then get emitted back to Pulsar (now used as a sink), sending the computation results downstream, back to a Pulsar topic: 
+
+
+```java
+// emit result via Pulsar producer 
+wc.addSink(new FlinkPulsarProducer<>(
+  serviceUrl,
+  outputTopic,
+  new AuthenticationDisabled(),
+  wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
+  wordWithCount -> wordWithCount.word)
+);
+```
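+
+For reference, the snippets above assume a simple `WordWithCount` POJO that is not shown in this post; a minimal sketch (field names inferred from the snippets) could look like the following:
+
+```java
+// minimal sketch of the WordWithCount POJO assumed by the WordCount snippets above
+public class WordWithCount {
+  public String word;
+  public long count;
+
+  // Flink POJOs need a public no-argument constructor
+  public WordWithCount() {}
+
+  public WordWithCount(String word, long count) {
+    this.word = word;
+    this.count = count;
+  }
+
+  @Override
+  public String toString() {
+    return word + " : " + count;
+  }
+}
+```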
+
+Although this is a great first integration step, the existing design does not leverage the full power of Pulsar. Among the shortcomings of the integration with Flink 1.6.0, Pulsar is neither utilized as durable storage nor is its schema integrated with Flink, so developers have to describe an application's schema manually.
+
+
+# Pulsar’s integration with Flink 1.9: Using Pulsar as a Flink catalog
+
+The latest integration between [Flink 
1.9.0](https://flink.apache.org/downloads.html#apache-flink-191) and Pulsar 
addresses most of the previously mentioned shortcomings. The [contribution of 
Alibaba’s Blink to the Flink 
repository](https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html)
 adds many enhancements and new features to the processing framework that make 
the integration with Pulsar significantly more powerful and impactful. Flink 
1.9.0 brings Pulsar schema  [...]
+
+
+# Leveraging the Flink <> Pulsar Schema Integration
+
+Before delving into the integration details and how you can use Pulsar schema with Flink, let us describe how schema in Pulsar works. In Apache Pulsar, the schema lives alongside the data and serves as its representation on the broker side of the framework, which makes a schema registry in an external system obsolete. Additionally, the data schema in Pulsar is associated with each topic, so both producers and consumers send data with predefined schema information, while the broker [...]
+ 
+Below you can find an example of Pulsar’s schema on both the producer and the consumer side. On the producer side, you specify which schema you want to use and Pulsar then lets you send a POJO directly, without the need to perform any serialization/deserialization yourself. Similarly, on the consumer end, you can also specify the data schema and, upon receiving the data, Pulsar will automatically validate the schema information, fetch the schema of the given version and then deserialize the data back to a POJ [...]
+
+```java
+// Create producer with Struct schema and send messages
+Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).topic(topic).create();
+producer.newMessage()
+  .value(User.builder()
+    .userName("pulsar-user")
+    .userId(1L)
+    .build())
+  .send();
+```
+
+```java
+// Create consumer with Struct schema and receive messages
+Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class))
+  .topic(topic)
+  .subscriptionName(subscription)
+  .subscribe();
+consumer.receive();
+```
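+
+The two snippets above assume a `User` class with a builder, which is not shown in this post. A minimal sketch (fields and builder inferred from the snippets, hand-written here rather than generated with a tool such as Lombok) could look like this:
+
+```java
+// minimal sketch of the User POJO assumed above;
+// Schema.AVRO derives the Avro schema from the class fields
+public class User {
+  private String userName;
+  private long userId;
+
+  public User() {}  // no-argument constructor used during deserialization
+
+  public String getUserName() { return userName; }
+  public void setUserName(String userName) { this.userName = userName; }
+  public long getUserId() { return userId; }
+  public void setUserId(long userId) { this.userId = userId; }
+
+  public static Builder builder() { return new Builder(); }
+
+  public static class Builder {
+    private final User user = new User();
+    public Builder userName(String userName) { user.userName = userName; return this; }
+    public Builder userId(long userId) { user.userId = userId; return this; }
+    public User build() { return user; }
+  }
+}
+```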
+
+Let’s assume we have an application that specifies a schema to the producer and/or consumer. When connecting to the broker, the producer (or consumer) transfers this schema information so that the broker can perform schema registration, validation and schema compatibility checks before returning a schema version or rejecting the schema, as illustrated in the diagram below: 
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/flink-pulsar-sql-blog-post-visual.png" 
width="600px" alt="Pulsar Schema"/>
+</center>
+<br>
+
+Not only is Pulsar able to handle and store the schema information, it can also handle schema evolution where necessary. Pulsar effectively manages schema evolution in the broker, keeping track of all the different versions of your schema and performing the necessary compatibility checks. 
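+
+As a rough illustration of what such an evolution might look like, consider the hypothetical `User` class sketched earlier: adding a new, optional field produces a new schema version on the broker, which is accepted or rejected depending on the configured compatibility strategy.
+
+```java
+// hypothetical evolution of the User POJO sketched earlier:
+// a new optional field yields a new schema version that the broker checks for compatibility
+public class User {
+  private String userName;
+  private long userId;
+  private String email;  // new field; absent in messages written with the old schema
+
+  // getters, setters and builder omitted for brevity
+}
+```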
+ 
+Moreover, when messages are published on the producer side, Pulsar will tag 
each message with the schema version as part of each message’s metadata. On the 
consumer side, when the message is received and the metadata is deserialized, 
Pulsar will check the schema version associated with this message and will 
fetch the corresponding schema information from the broker. As a result, when 
Pulsar integrates with a Flink application it uses the pre-existing schema 
information and maps individua [...]
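+
+Building on the consumer created earlier, a small sketch of inspecting that version tag on the consumer side could look like this:
+
+```java
+// receive a message and inspect the schema version the broker attached on publish
+Message<User> msg = consumer.receive();
+byte[] schemaVersion = msg.getSchemaVersion();  // version tag from the message metadata
+User user = msg.getValue();                     // deserialized using the matching schema
+```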
+ 
+When Flink users do not interact with schema directly or use a primitive schema (for example, using a topic to store a string or a long number), Pulsar converts the message payload into a Flink row with a single column called ‘value’; for structured schema types, like JSON and AVRO, Pulsar extracts the individual fields from the schema information and maps the fields to Flink’s type system. Finally, all metadata information associated with each message [...]
+
+
+<center>
+<img src="{{ site.baseurl 
}}/img/blog/flink-pulsar-sql-blog-post-visual-primitive-avro-schema.png" 
width="600px" alt="Primitive and AVRO Schema"/>
+</center>
+<br>
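+
+For example, a topic carrying such a primitive schema could be populated as sketched below (topic name chosen for illustration); on the Flink side, its payload then surfaces as the single `value` column described above:
+
+```java
+// minimal sketch: publish plain strings to a topic using Pulsar's primitive STRING schema
+Producer<String> stringProducer = client.newProducer(Schema.STRING)
+  .topic("plain-string-topic")
+  .create();
+stringProducer.send("hello pulsar");
+```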
+
+Once all the schema information is mapped to Flink’s type system, you can 
start building a Pulsar source, sink or catalog in Flink based on the specified 
schema information as illustrated below:
+
+# Flink & Pulsar: Read data from Pulsar
+
+* Create a Pulsar source for streaming queries
+
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val props = new Properties()
+props.setProperty("service.url", "pulsar://...")
+props.setProperty("admin.url", "http://...";)
+props.setProperty("partitionDiscoveryIntervalMillis", "5000")
+props.setProperty("startingOffsets", "earliest")
+props.setProperty("topic", "test-source-topic")
+val source = new FlinkPulsarSource(props)
+// you don't need to provide type information to addSource since FlinkPulsarSource is ResultTypeQueryable
+val dataStream = env.addSource(source)(null)
+
+// chain operations on dataStream of Row and sink the output
+// end method chaining
+
+env.execute()
+```
+
+* Register topics in Pulsar as streaming tables
+
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = StreamTableEnvironment.create(env)
+
+val props = new Properties()
+props.setProperty("service.url", serviceUrl)
+props.setProperty("admin.url", adminUrl)
+props.setProperty("partitionDiscoveryIntervalMillis", "5000")
+props.setProperty("startingOffsets", "earliest")
+props.setProperty("topic", "test-source-topic")
+
+tEnv
+  .connect(new Pulsar().properties(props))
+  .inAppendMode()
+  .registerTableSource("source-table")
+
+val sql = "SELECT * FROM `source-table`"
+val result = tEnv.sqlQuery(sql)
+env.execute()
+```
+
+# Flink & Pulsar: Write data to Pulsar
+
+* Create a Pulsar sink for streaming queries
+
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val stream = .....
+
+val props = new Properties()
+props.setProperty("service.url", serviceUrl)
+props.setProperty("admin.url", adminUrl)
+props.setProperty("flushOnCheckpoint", "true")
+props.setProperty("failOnWrite", "true")
+props.setProperty("topic", "test-sink-topic")
+
+stream.addSink(new FlinkPulsarSink(props, DummyTopicKeyExtractor))
+env.execute()
+```
+
+* Write a streaming table to Pulsar
+
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = StreamTableEnvironment.create(env)
+
+val props = new Properties()
+props.setProperty("service.url", serviceUrl)
+props.setProperty("admin.url", adminUrl)
+props.setProperty("flushOnCheckpoint", "true")
+props.setProperty("failOnWrite", "true")
+props.setProperty("topic", "test-sink-topic")
+
+tEnv
+  .connect(new Pulsar().properties(props))
+  .inAppendMode()
+  .registerTableSink("sink-table")
+
+val sql = "INSERT INTO sink-table ....."
+tEnv.sqlUpdate(sql)
+env.execute()
+```
+
+In every instance, Flink developers only need to specify how Flink connects to a Pulsar cluster, without worrying about any schema registry or serialization/deserialization actions, and can register the Pulsar cluster as a source, sink or streaming table in Flink. Once all three elements are put together, Pulsar can then be registered as a catalog in Flink, something that drastically simplifies how you process and query data like, for example, writing a program to query [...]
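+
+As a rough sketch of what this enables (assuming a Pulsar catalog named "pulsar" has already been registered with the table environment; the exact registration mechanism is connector-specific and not shown here), querying a topic could then look like this:
+
+```java
+// hypothetical sketch: query a Pulsar topic through a previously registered catalog
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+tEnv.useCatalog("pulsar");           // assumed catalog name
+tEnv.useDatabase("public/default");  // assumes Pulsar namespaces are exposed as databases
+
+// topics show up as tables whose columns follow the topic schema (column names assumed)
+Table result = tEnv.sqlQuery("SELECT word, `count` FROM `test-source-topic`");
+```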
+
+
+# Next Steps & Future Integration
+
+The goal of the integration between Pulsar and Flink is to simplify how developers use the two frameworks to build a unified data processing stack. As we progress from the classical Lambda architecture, where an online speed layer is combined with an offline batch layer to run data computations, Flink and Pulsar present a great combination in providing a truly unified data processing stack. We see Flink as a unified computation engine, handling both online (streaming) and offline [...]
+ 
+There is still a lot of ongoing work and effort from both communities to make the integration even better, such as a new source API ([FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)) that will allow the [contribution of the Pulsar connectors to the Flink community](http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discussion-Flink-Pulsar-Connector-td22019.html) as well as a new subscription type called `Key_Shared` subscrip [...]
+ 
+You can find a more detailed overview of the integration work between the two communities in this [video recording](https://youtu.be/3sBXXfgl5vs) from Flink Forward Europe 2019 or subscribe to the [Flink dev mailing list](https://flink.apache.org/community.html#mailing-lists) for the latest contribution and integration efforts between Flink and Pulsar. 
diff --git 
a/img/blog/flink-pulsar-sql-blog-post-visual-primitive-avro-schema.png 
b/img/blog/flink-pulsar-sql-blog-post-visual-primitive-avro-schema.png
new file mode 100644
index 0000000..3e3aa5e
Binary files /dev/null and 
b/img/blog/flink-pulsar-sql-blog-post-visual-primitive-avro-schema.png differ
diff --git a/img/blog/flink-pulsar-sql-blog-post-visual.png 
b/img/blog/flink-pulsar-sql-blog-post-visual.png
new file mode 100644
index 0000000..ed9c21f
Binary files /dev/null and b/img/blog/flink-pulsar-sql-blog-post-visual.png 
differ
