infoverload commented on a change in pull request #484:
URL: https://github.com/apache/flink-web/pull/484#discussion_r768590935



##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.

Review comment:
       ```suggestion
   That's also the main reason why Pravega has chosen to use Flink as the first 
integrated execution engine among the various distributed computing engines on 
the market. With the help of Flink, users can use flexible APIs for windowing, 
complex event processing (CEP), or table abstractions to process streaming data 
easily and enrich the data being stored. Since its inception in 2016, Pravega 
has established communication with Flink PMC members and developed the 
connector together. 
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"

Review comment:
       ```suggestion
   title: "Pravega Flink Connector 101"
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.10_2.12</artifactId>
+  <version>0.8.0</version>
+</dependency>
+```
+
+
+As in the above example, 
+
+`1.10` is the Flink major version which is put in the middle of the artifact 
name. Pravega Flink connector maintains compatibility for the *three* most 
recent major versions of Flink.
+
+`0.8.0` is the version that aligns with the Pravega version
+
+You can find the latest release with support matrix on the [GitHub Releases 
page](https://github.com/pravega/flink-connectors/releases).
+
+## API introduction
+
+### Configurations
+
+The connector has provided a common top-level object `PravegaConfig` for 
Pravega connection configurations. The config object automatically configures 
itself from _environment variables_, _system properties_ and _program 
arguments_.
+
+The basic controller URI and the default scope can be set as below.
+
+|Setting|Environment Variable /<br/>System Property /<br/>Program 
Argument|Default Value|
+|-------|-------------------------------------------------------------|-------------|
+|Controller 
URI|`PRAVEGA_CONTROLLER_URI`<br/>`pravega.controller.uri`<br/>`--controller`|`tcp://localhost:9090`|
+|Default Scope|`PRAVEGA_SCOPE`<br/>`pravega.scope`<br/>`--scope`|-|
+
+This is the recommended way to create an instance of `PravegaConfig`
+
+
+```java
+// From default environment
+PravegaConfig config = PravegaConfig.fromDefaults();
+
+// From program arguments
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// From user specification
+PravegaConfig config = PravegaConfig.fromDefaults()
+    .withControllerURI("tcp://...")
+    .withDefaultScope("SCOPE-NAME")
+    .withCredentials(credentials)
+    .withHostnameValidation(false);
+```
+
+### Serialization/Deserialization
+
+Pravega has defined 
[`io.pravega.client.stream.Serializer`](http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/Serializer.html)
 for the serialization/deserialization, while Flink has also defined standard 
interfaces for the purpose.
+
+- 
[`org.apache.flink.api.common.serialization.SerializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/SerializationSchema.html)
+- 
[`org.apache.flink.api.common.serialization.DeserializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html)
+
+For interoperability with other pravega client applications, we have built-in 
adapters `PravegaSerializationSchema` and `PravegaDeserializationSchema` to 
support processing Pravega stream data produced by a non-Flink application.
+
+e.g. Adapter for Pravega Java serializer

Review comment:
       ```suggestion
   Pravega has defined 
[`io.pravega.client.stream.Serializer`](http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/Serializer.html)
 for serialization/deserialization, while Flink has also defined standard 
interfaces for this purpose:
   - 
[`org.apache.flink.api.common.serialization.SerializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/SerializationSchema.html)
   - 
[`org.apache.flink.api.common.serialization.DeserializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html)
   
   For interoperability with other Pravega client applications, we have 
built-in adapters `PravegaSerializationSchema` and 
`PravegaDeserializationSchema` to support processing Pravega stream data 
produced by non-Flink applications.
   
   Here is the adapter for Pravega Java serializer:
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.10_2.12</artifactId>
+  <version>0.8.0</version>
+</dependency>
+```
+
+
+As in the above example, 
+
+`1.10` is the Flink major version which is put in the middle of the artifact 
name. Pravega Flink connector maintains compatibility for the *three* most 
recent major versions of Flink.
+
+`0.8.0` is the version that aligns with the Pravega version
+
+You can find the latest release with support matrix on the [GitHub Releases 
page](https://github.com/pravega/flink-connectors/releases).
+
+## API introduction
+
+### Configurations
+
+The connector has provided a common top-level object `PravegaConfig` for 
Pravega connection configurations. The config object automatically configures 
itself from _environment variables_, _system properties_ and _program 
arguments_.
+
+The basic controller URI and the default scope can be set as below.
+
+|Setting|Environment Variable /<br/>System Property /<br/>Program 
Argument|Default Value|
+|-------|-------------------------------------------------------------|-------------|
+|Controller 
URI|`PRAVEGA_CONTROLLER_URI`<br/>`pravega.controller.uri`<br/>`--controller`|`tcp://localhost:9090`|
+|Default Scope|`PRAVEGA_SCOPE`<br/>`pravega.scope`<br/>`--scope`|-|
+
+This is the recommended way to create an instance of `PravegaConfig`
+
+
+```java
+// From default environment
+PravegaConfig config = PravegaConfig.fromDefaults();
+
+// From program arguments
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// From user specification
+PravegaConfig config = PravegaConfig.fromDefaults()
+    .withControllerURI("tcp://...")
+    .withDefaultScope("SCOPE-NAME")
+    .withCredentials(credentials)
+    .withHostnameValidation(false);
+```
+
+### Serialization/Deserialization
+
+Pravega has defined 
[`io.pravega.client.stream.Serializer`](http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/Serializer.html)
 for the serialization/deserialization, while Flink has also defined standard 
interfaces for the purpose.
+
+- 
[`org.apache.flink.api.common.serialization.SerializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/SerializationSchema.html)
+- 
[`org.apache.flink.api.common.serialization.DeserializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html)
+
+For interoperability with other pravega client applications, we have built-in 
adapters `PravegaSerializationSchema` and `PravegaDeserializationSchema` to 
support processing Pravega stream data produced by a non-Flink application.
+
+e.g. Adapter for Pravega Java serializer
+
+```java
+import io.pravega.client.stream.impl.JavaSerializer;
+...
+DeserializationSchema<MyEvent> adapter = new PravegaDeserializationSchema<>(
+    MyEvent.class, new JavaSerializer<MyEvent>());
+```
+
+### `FlinkPravegaReader`
+
+`FlinkPravegaReader` is a Flink `SourceFunction` implementation which supports 
parallel reads from one or more Pravega stream. Internally it will initiate a 
Pravega reader group and creates Pravega `EventStreamReader` instances to read 
the data from the stream(s). It provides a builder-style API to construct, and 
can allow streamcuts to mark the start and end of the read. The basic usage is 
as below.
+
+```java
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Enable Flink checkpoint to make state fault tolerant
+env.enableCheckpointing(60000);
+
+// Define the Pravega configuration
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event deserializer
+DeserializationSchema<MyClass> deserializer = ...
+
+// Define the data stream
+FlinkPravegaReader<MyClass> pravegaSource = 
FlinkPravegaReader.<MyClass>builder()
+    .forStream(...)
+    .withPravegaConfig(config)
+    .withDeserializationSchema(deserializer)
+    .build();
+DataStream<MyClass> stream = env.addSource(pravegaSource)
+    .setParallelism(4)
+    .uid("pravega-source");
+```
+
+### `FlinkPravegaWriter`
+
+`FlinkPravegaWriter` is a Flink `SinkFunction` implementation which supports 
parallel writes to Pravega stream.
+
+It supports three writer modes that relate to guarantees about the persistence 
of events emitted by the sink to a Pravega Stream.
+
+1. **Best-effort** - Any write failures will be ignored hence there could be 
data loss.
+2. **At-least-once**(default) - All events are persisted in Pravega. Duplicate 
events are possible, due to retries or in case of failure and subsequent 
recovery.
+3. **Exactly-once** - All events are persisted in Pravega using a 
transactional approach integrated with the Flink checkpointing feature.

Review comment:
       ```suggestion
   `FlinkPravegaWriter` is a Flink `SinkFunction` implementation which supports 
parallel writes to Pravega streams.
   
   It supports three writer modes that relate to guarantees about the 
persistence of events emitted by the sink to a Pravega Stream:
   
   1. **Best-effort** - Any write failures will be ignored and there could be 
data loss.
   2. **At-least-once**(default) - All events are persisted in Pravega. 
Duplicate events are possible due to retries or in case of failure and 
subsequent recovery.
   3. **Exactly-once** - All events are persisted in Pravega using a 
transactional approach integrated with the Flink checkpointing feature.
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.

Review comment:
       ```suggestion
   [Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is 
a cloud-native storage system based on abstractions for both batch and 
streaming data consumption. Pravega streams (a new storage abstraction) are 
durable, consistent, and elastic, while natively supporting long-term data 
retention. In comparison, [Apache Flink](https://flink.apache.org/) is a 
popular real-time computing engine that provides unified batch and stream 
processing. Flink provides high-throughput, low-latency computation, as well as 
support for complex event processing and state management. Both Pravega and 
Flink share the same design philosophy and treat data streams as primitives. 
This makes them a great match when constructing storage+computing data 
pipelines which can unify batch and streaming use cases.
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.10_2.12</artifactId>
+  <version>0.8.0</version>
+</dependency>
+```
+
+
+As in the above example, 
+
+`1.10` is the Flink major version which is put in the middle of the artifact 
name. Pravega Flink connector maintains compatibility for the *three* most 
recent major versions of Flink.
+
+`0.8.0` is the version that aligns with the Pravega version
+
+You can find the latest release with support matrix on the [GitHub Releases 
page](https://github.com/pravega/flink-connectors/releases).

Review comment:
       ```suggestion
   `1.10` is the Flink major version which is put in the middle of the artifact 
name. The Pravega Flink connector maintains compatibility for the *three* most 
recent major versions of Flink.
   
   `0.8.0` is the version that aligns with the Pravega version.
   
   You can find the latest release with a support matrix on the [GitHub 
Releases page](https://github.com/pravega/flink-connectors/releases).
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.

Review comment:
       ```suggestion
   In 2017, the Pravega Flink connector module started to move out of the 
Pravega main repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, many features have been implemented, including: 
   
   - exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
   - seamless integration with Flink's checkpoints and savepoints
   - parallel Readers and Writers supporting high throughput and low latency 
processing
   - support for Batch, Streaming, and Table API to access Pravega Streams
   
   These key features make streaming pipeline applications easier to develop 
without worrying about performance and correctness which are the common pain 
points for many streaming use cases. 
   
   In this blog post, we will discuss how to use this connector to read and 
write Pravega streams with the Flink DataStream API.  
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:

Review comment:
       ```suggestion
   To use this connector in your application, add the dependency to your 
project:
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.10_2.12</artifactId>
+  <version>0.8.0</version>
+</dependency>
+```
+
+
+As in the above example, 
+
+`1.10` is the Flink major version which is put in the middle of the artifact 
name. Pravega Flink connector maintains compatibility for the *three* most 
recent major versions of Flink.
+
+`0.8.0` is the version that aligns with the Pravega version
+
+You can find the latest release with support matrix on the [GitHub Releases 
page](https://github.com/pravega/flink-connectors/releases).
+
+## API introduction
+
+### Configurations
+
+The connector has provided a common top-level object `PravegaConfig` for 
Pravega connection configurations. The config object automatically configures 
itself from _environment variables_, _system properties_ and _program 
arguments_.
+
+The basic controller URI and the default scope can be set as below.
+
+|Setting|Environment Variable /<br/>System Property /<br/>Program 
Argument|Default Value|
+|-------|-------------------------------------------------------------|-------------|
+|Controller 
URI|`PRAVEGA_CONTROLLER_URI`<br/>`pravega.controller.uri`<br/>`--controller`|`tcp://localhost:9090`|
+|Default Scope|`PRAVEGA_SCOPE`<br/>`pravega.scope`<br/>`--scope`|-|
+
+This is the recommended way to create an instance of `PravegaConfig`

Review comment:
       ```suggestion
   The connector provides a common top-level object `PravegaConfig` for Pravega 
connection configurations. The config object automatically configures itself 
from _environment variables_, _system properties_ and _program arguments_.
   
   The basic controller URI and the default scope can be set like this:
   
   |Setting|Environment Variable /<br/>System Property /<br/>Program 
Argument|Default Value|
   
|-------|-------------------------------------------------------------|-------------|
   |Controller 
URI|`PRAVEGA_CONTROLLER_URI`<br/>`pravega.controller.uri`<br/>`--controller`|`tcp://localhost:9090`|
   |Default Scope|`PRAVEGA_SCOPE`<br/>`pravega.scope`<br/>`--scope`|-|
   
   The recommended way to create an instance of `PravegaConfig` is the 
following:
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.10_2.12</artifactId>
+  <version>0.8.0</version>
+</dependency>
+```
+
+
+As in the above example, 
+
+`1.10` is the Flink major version which is put in the middle of the artifact 
name. Pravega Flink connector maintains compatibility for the *three* most 
recent major versions of Flink.
+
+`0.8.0` is the version that aligns with the Pravega version
+
+You can find the latest release with support matrix on the [GitHub Releases 
page](https://github.com/pravega/flink-connectors/releases).
+
+## API introduction
+
+### Configurations
+
+The connector has provided a common top-level object `PravegaConfig` for 
Pravega connection configurations. The config object automatically configures 
itself from _environment variables_, _system properties_ and _program 
arguments_.
+
+The basic controller URI and the default scope can be set as below.
+
+|Setting|Environment Variable /<br/>System Property /<br/>Program 
Argument|Default Value|
+|-------|-------------------------------------------------------------|-------------|
+|Controller 
URI|`PRAVEGA_CONTROLLER_URI`<br/>`pravega.controller.uri`<br/>`--controller`|`tcp://localhost:9090`|
+|Default Scope|`PRAVEGA_SCOPE`<br/>`pravega.scope`<br/>`--scope`|-|
+
+This is the recommended way to create an instance of `PravegaConfig`
+
+
+```java
+// From default environment
+PravegaConfig config = PravegaConfig.fromDefaults();
+
+// From program arguments
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// From user specification
+PravegaConfig config = PravegaConfig.fromDefaults()
+    .withControllerURI("tcp://...")
+    .withDefaultScope("SCOPE-NAME")
+    .withCredentials(credentials)
+    .withHostnameValidation(false);
+```
+
+### Serialization/Deserialization
+
+Pravega has defined 
[`io.pravega.client.stream.Serializer`](http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/Serializer.html)
 for the serialization/deserialization, while Flink has also defined standard 
interfaces for the purpose.
+
+- 
[`org.apache.flink.api.common.serialization.SerializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/SerializationSchema.html)
+- 
[`org.apache.flink.api.common.serialization.DeserializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html)
+
+For interoperability with other pravega client applications, we have built-in 
adapters `PravegaSerializationSchema` and `PravegaDeserializationSchema` to 
support processing Pravega stream data produced by a non-Flink application.
+
+e.g. Adapter for Pravega Java serializer
+
+```java
+import io.pravega.client.stream.impl.JavaSerializer;
+...
+DeserializationSchema<MyEvent> adapter = new PravegaDeserializationSchema<>(
+    MyEvent.class, new JavaSerializer<MyEvent>());
+```
+
+### `FlinkPravegaReader`
+
+`FlinkPravegaReader` is a Flink `SourceFunction` implementation which supports 
parallel reads from one or more Pravega stream. Internally it will initiate a 
Pravega reader group and creates Pravega `EventStreamReader` instances to read 
the data from the stream(s). It provides a builder-style API to construct, and 
can allow streamcuts to mark the start and end of the read. The basic usage is 
as below.
+
+```java
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Enable Flink checkpoint to make state fault tolerant
+env.enableCheckpointing(60000);
+
+// Define the Pravega configuration
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event deserializer
+DeserializationSchema<MyClass> deserializer = ...
+
+// Define the data stream
+FlinkPravegaReader<MyClass> pravegaSource = 
FlinkPravegaReader.<MyClass>builder()
+    .forStream(...)
+    .withPravegaConfig(config)
+    .withDeserializationSchema(deserializer)
+    .build();
+DataStream<MyClass> stream = env.addSource(pravegaSource)
+    .setParallelism(4)
+    .uid("pravega-source");
+```
+
+### `FlinkPravegaWriter`
+
+`FlinkPravegaWriter` is a Flink `SinkFunction` implementation which supports 
parallel writes to Pravega stream.
+
+It supports three writer modes that relate to guarantees about the persistence 
of events emitted by the sink to a Pravega Stream.
+
+1. **Best-effort** - Any write failures will be ignored hence there could be 
data loss.
+2. **At-least-once**(default) - All events are persisted in Pravega. Duplicate 
events are possible, due to retries or in case of failure and subsequent 
recovery.
+3. **Exactly-once** - All events are persisted in Pravega using a 
transactional approach integrated with the Flink checkpointing feature.
+
+Internally it will initiate several Pravega `EventStreamWriter` or 
`TransactionalEventStreamWriter` (depends on the writer mode) instances to 
write data to the stream. It provides a builder-style API to construct as well. 
The basic usage is as below.

Review comment:
       ```suggestion
   Internally, it will initiate several Pravega `EventStreamWriter` or 
`TransactionalEventStreamWriter` (depends on the writer mode) instances to 
write data to the stream. It provides a builder-style API to construct. 
   
   A basic usage looks like this:
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.10_2.12</artifactId>
+  <version>0.8.0</version>
+</dependency>
+```
+
+
+As in the above example, 
+
+`1.10` is the Flink major version which is put in the middle of the artifact 
name. Pravega Flink connector maintains compatibility for the *three* most 
recent major versions of Flink.
+
+`0.8.0` is the version that aligns with the Pravega version
+
+You can find the latest release with support matrix on the [GitHub Releases 
page](https://github.com/pravega/flink-connectors/releases).
+
+## API introduction
+
+### Configurations
+
+The connector has provided a common top-level object `PravegaConfig` for 
Pravega connection configurations. The config object automatically configures 
itself from _environment variables_, _system properties_ and _program 
arguments_.
+
+The basic controller URI and the default scope can be set as below.
+
+|Setting|Environment Variable /<br/>System Property /<br/>Program 
Argument|Default Value|
+|-------|-------------------------------------------------------------|-------------|
+|Controller 
URI|`PRAVEGA_CONTROLLER_URI`<br/>`pravega.controller.uri`<br/>`--controller`|`tcp://localhost:9090`|
+|Default Scope|`PRAVEGA_SCOPE`<br/>`pravega.scope`<br/>`--scope`|-|
+
+This is the recommended way to create an instance of `PravegaConfig`
+
+
+```java
+// From default environment
+PravegaConfig config = PravegaConfig.fromDefaults();
+
+// From program arguments
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// From user specification
+PravegaConfig config = PravegaConfig.fromDefaults()
+    .withControllerURI("tcp://...")
+    .withDefaultScope("SCOPE-NAME")
+    .withCredentials(credentials)
+    .withHostnameValidation(false);
+```
+
+### Serialization/Deserialization
+
+Pravega has defined 
[`io.pravega.client.stream.Serializer`](http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/Serializer.html)
 for the serialization/deserialization, while Flink has also defined standard 
interfaces for the purpose.
+
+- 
[`org.apache.flink.api.common.serialization.SerializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/SerializationSchema.html)
+- 
[`org.apache.flink.api.common.serialization.DeserializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html)
+
+For interoperability with other pravega client applications, we have built-in 
adapters `PravegaSerializationSchema` and `PravegaDeserializationSchema` to 
support processing Pravega stream data produced by a non-Flink application.
+
+e.g. Adapter for Pravega Java serializer
+
+```java
+import io.pravega.client.stream.impl.JavaSerializer;
+...
+DeserializationSchema<MyEvent> adapter = new PravegaDeserializationSchema<>(
+    MyEvent.class, new JavaSerializer<MyEvent>());
+```
+
+### `FlinkPravegaReader`
+
+`FlinkPravegaReader` is a Flink `SourceFunction` implementation which supports 
parallel reads from one or more Pravega stream. Internally it will initiate a 
Pravega reader group and creates Pravega `EventStreamReader` instances to read 
the data from the stream(s). It provides a builder-style API to construct, and 
can allow streamcuts to mark the start and end of the read. The basic usage is 
as below.

Review comment:
       ```suggestion
   `FlinkPravegaReader` is a Flink `SourceFunction` implementation which 
supports parallel reads from one or more Pravega streams. Internally, it 
initiates a Pravega reader group and creates Pravega `EventStreamReader` 
instances to read the data from the stream(s). It provides a builder-style API 
to construct, and can allow streamcuts to mark the start and end of the read. 
   
   You can use it like this:
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.10_2.12</artifactId>
+  <version>0.8.0</version>
+</dependency>
+```
+
+
+As in the above example, 
+
+`1.10` is the Flink major version which is put in the middle of the artifact 
name. Pravega Flink connector maintains compatibility for the *three* most 
recent major versions of Flink.
+
+`0.8.0` is the version that aligns with the Pravega version
+
+You can find the latest release with support matrix on the [GitHub Releases 
page](https://github.com/pravega/flink-connectors/releases).
+
+## API introduction
+
+### Configurations
+
+The connector has provided a common top-level object `PravegaConfig` for 
Pravega connection configurations. The config object automatically configures 
itself from _environment variables_, _system properties_ and _program 
arguments_.
+
+The basic controller URI and the default scope can be set as below.
+
+|Setting|Environment Variable /<br/>System Property /<br/>Program 
Argument|Default Value|
+|-------|-------------------------------------------------------------|-------------|
+|Controller 
URI|`PRAVEGA_CONTROLLER_URI`<br/>`pravega.controller.uri`<br/>`--controller`|`tcp://localhost:9090`|
+|Default Scope|`PRAVEGA_SCOPE`<br/>`pravega.scope`<br/>`--scope`|-|
+
+This is the recommended way to create an instance of `PravegaConfig`
+
+
+```java
+// From default environment
+PravegaConfig config = PravegaConfig.fromDefaults();
+
+// From program arguments
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// From user specification
+PravegaConfig config = PravegaConfig.fromDefaults()
+    .withControllerURI("tcp://...")
+    .withDefaultScope("SCOPE-NAME")
+    .withCredentials(credentials)
+    .withHostnameValidation(false);
+```
+
+### Serialization/Deserialization
+
+Pravega has defined 
[`io.pravega.client.stream.Serializer`](http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/Serializer.html)
 for the serialization/deserialization, while Flink has also defined standard 
interfaces for the purpose.
+
+- 
[`org.apache.flink.api.common.serialization.SerializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/SerializationSchema.html)
+- 
[`org.apache.flink.api.common.serialization.DeserializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html)
+
+For interoperability with other pravega client applications, we have built-in 
adapters `PravegaSerializationSchema` and `PravegaDeserializationSchema` to 
support processing Pravega stream data produced by a non-Flink application.
+
+e.g. Adapter for Pravega Java serializer
+
+```java
+import io.pravega.client.stream.impl.JavaSerializer;
+...
+DeserializationSchema<MyEvent> adapter = new PravegaDeserializationSchema<>(
+    MyEvent.class, new JavaSerializer<MyEvent>());
+```
+
+### `FlinkPravegaReader`
+
+`FlinkPravegaReader` is a Flink `SourceFunction` implementation which supports 
parallel reads from one or more Pravega stream. Internally it will initiate a 
Pravega reader group and creates Pravega `EventStreamReader` instances to read 
the data from the stream(s). It provides a builder-style API to construct, and 
can allow streamcuts to mark the start and end of the read. The basic usage is 
as below.
+
+```java
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Enable Flink checkpoint to make state fault tolerant
+env.enableCheckpointing(60000);
+
+// Define the Pravega configuration
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event deserializer
+DeserializationSchema<MyClass> deserializer = ...
+
+// Define the data stream
+FlinkPravegaReader<MyClass> pravegaSource = 
FlinkPravegaReader.<MyClass>builder()
+    .forStream(...)
+    .withPravegaConfig(config)
+    .withDeserializationSchema(deserializer)
+    .build();
+DataStream<MyClass> stream = env.addSource(pravegaSource)
+    .setParallelism(4)
+    .uid("pravega-source");
+```
+
+### `FlinkPravegaWriter`
+
+`FlinkPravegaWriter` is a Flink `SinkFunction` implementation which supports 
parallel writes to Pravega stream.
+
+It supports three writer modes that relate to guarantees about the persistence 
of events emitted by the sink to a Pravega Stream.
+
+1. **Best-effort** - Any write failures will be ignored hence there could be 
data loss.
+2. **At-least-once**(default) - All events are persisted in Pravega. Duplicate 
events are possible, due to retries or in case of failure and subsequent 
recovery.
+3. **Exactly-once** - All events are persisted in Pravega using a 
transactional approach integrated with the Flink checkpointing feature.
+
+Internally it will initiate several Pravega `EventStreamWriter` or 
`TransactionalEventStreamWriter` (depends on the writer mode) instances to 
write data to the stream. It provides a builder-style API to construct as well. 
The basic usage is as below.
+
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Define the Pravega configuration
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event serializer
+SerializationSchema<MyClass> serializer = ...
+
+// Define the event router for selecting the Routing Key
+PravegaEventRouter<MyClass> router = ...
+
+// Define the sink function
+FlinkPravegaWriter<MyClass> pravegaSink = FlinkPravegaWriter.<MyClass>builder()
+   .forStream(...)
+   .withPravegaConfig(config)
+   .withSerializationSchema(serializer)
+   .withEventRouter(router)
+   .withWriterMode(EXACTLY_ONCE)
+   .build();
+
+DataStream<MyClass> stream = ...
+stream.addSink(pravegaSink)
+    .setParallelism(4)
+    .uid("pravega-sink");
+```
+
+You can also see some more samples 
[here](https://github.com/pravega/pravega-samples) which can help to learn more 
about the usage.

Review comment:
       ```suggestion
   You can see some more examples 
[here](https://github.com/pravega/pravega-samples).
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.10_2.12</artifactId>
+  <version>0.8.0</version>
+</dependency>
+```
+
+
+As in the above example, 
+
+`1.10` is the Flink major version which is put in the middle of the artifact 
name. Pravega Flink connector maintains compatibility for the *three* most 
recent major versions of Flink.
+
+`0.8.0` is the version that aligns with the Pravega version
+
+You can find the latest release with support matrix on the [GitHub Releases 
page](https://github.com/pravega/flink-connectors/releases).
+
+## API introduction
+
+### Configurations
+
+The connector has provided a common top-level object `PravegaConfig` for 
Pravega connection configurations. The config object automatically configures 
itself from _environment variables_, _system properties_ and _program 
arguments_.
+
+The basic controller URI and the default scope can be set as below.
+
+|Setting|Environment Variable /<br/>System Property /<br/>Program 
Argument|Default Value|
+|-------|-------------------------------------------------------------|-------------|
+|Controller 
URI|`PRAVEGA_CONTROLLER_URI`<br/>`pravega.controller.uri`<br/>`--controller`|`tcp://localhost:9090`|
+|Default Scope|`PRAVEGA_SCOPE`<br/>`pravega.scope`<br/>`--scope`|-|
+
+This is the recommended way to create an instance of `PravegaConfig`
+
+
+```java
+// From default environment
+PravegaConfig config = PravegaConfig.fromDefaults();
+
+// From program arguments
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// From user specification
+PravegaConfig config = PravegaConfig.fromDefaults()
+    .withControllerURI("tcp://...")
+    .withDefaultScope("SCOPE-NAME")
+    .withCredentials(credentials)
+    .withHostnameValidation(false);
+```
+
+### Serialization/Deserialization
+
+Pravega has defined 
[`io.pravega.client.stream.Serializer`](http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/Serializer.html)
 for the serialization/deserialization, while Flink has also defined standard 
interfaces for the purpose.
+
+- 
[`org.apache.flink.api.common.serialization.SerializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/SerializationSchema.html)
+- 
[`org.apache.flink.api.common.serialization.DeserializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html)
+
+For interoperability with other pravega client applications, we have built-in 
adapters `PravegaSerializationSchema` and `PravegaDeserializationSchema` to 
support processing Pravega stream data produced by a non-Flink application.
+
+e.g. Adapter for Pravega Java serializer
+
+```java
+import io.pravega.client.stream.impl.JavaSerializer;
+...
+DeserializationSchema<MyEvent> adapter = new PravegaDeserializationSchema<>(
+    MyEvent.class, new JavaSerializer<MyEvent>());
+```
+
+### `FlinkPravegaReader`
+
+`FlinkPravegaReader` is a Flink `SourceFunction` implementation which supports 
parallel reads from one or more Pravega stream. Internally it will initiate a 
Pravega reader group and creates Pravega `EventStreamReader` instances to read 
the data from the stream(s). It provides a builder-style API to construct, and 
can allow streamcuts to mark the start and end of the read. The basic usage is 
as below.
+
+```java
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Enable Flink checkpoint to make state fault tolerant
+env.enableCheckpointing(60000);
+
+// Define the Pravega configuration
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event deserializer
+DeserializationSchema<MyClass> deserializer = ...
+
+// Define the data stream
+FlinkPravegaReader<MyClass> pravegaSource = 
FlinkPravegaReader.<MyClass>builder()
+    .forStream(...)
+    .withPravegaConfig(config)
+    .withDeserializationSchema(deserializer)
+    .build();
+DataStream<MyClass> stream = env.addSource(pravegaSource)
+    .setParallelism(4)
+    .uid("pravega-source");
+```
+
+### `FlinkPravegaWriter`
+
+`FlinkPravegaWriter` is a Flink `SinkFunction` implementation which supports 
parallel writes to Pravega stream.
+
+It supports three writer modes that relate to guarantees about the persistence 
of events emitted by the sink to a Pravega Stream.
+
+1. **Best-effort** - Any write failures will be ignored hence there could be 
data loss.
+2. **At-least-once**(default) - All events are persisted in Pravega. Duplicate 
events are possible, due to retries or in case of failure and subsequent 
recovery.
+3. **Exactly-once** - All events are persisted in Pravega using a 
transactional approach integrated with the Flink checkpointing feature.
+
+Internally it will initiate several Pravega `EventStreamWriter` or 
`TransactionalEventStreamWriter` (depends on the writer mode) instances to 
write data to the stream. It provides a builder-style API to construct as well. 
The basic usage is as below.
+
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Define the Pravega configuration
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event serializer
+SerializationSchema<MyClass> serializer = ...
+
+// Define the event router for selecting the Routing Key
+PravegaEventRouter<MyClass> router = ...
+
+// Define the sink function
+FlinkPravegaWriter<MyClass> pravegaSink = FlinkPravegaWriter.<MyClass>builder()
+   .forStream(...)
+   .withPravegaConfig(config)
+   .withSerializationSchema(serializer)
+   .withEventRouter(router)
+   .withWriterMode(EXACTLY_ONCE)
+   .build();
+
+DataStream<MyClass> stream = ...
+stream.addSink(pravegaSink)
+    .setParallelism(4)
+    .uid("pravega-sink");
+```
+
+You can also see some more samples 
[here](https://github.com/pravega/pravega-samples) which can help to learn more 
about the usage.
+
+# Internals of reader and writer
+
+## Checkpoint integration
+
+Flink has periodic checkpoints following the Chandy-Lamport algorithm to make 
state in Flink fault tolerant by allowing state and the corresponding stream 
positions to be recovered, thereby giving the application the same semantics as 
a failure-free execution.
+
+Pravega also has its own Checkpoint concept which is to create a consistent 
"point in time" persistence of the state of each Reader in the Reader Group, by 
using a specialized Event (*Checkpoint Event*) to signal each Reader to 
preserve its state. Once a Checkpoint has been completed, the application can 
use the Checkpoint to reset all the Readers in the Reader Group to the known 
consistent state represented by the Checkpoint.
+
+This means that our end-to-end recovery story is not like the other messaging 
systems like Kafka to persist its offset in the Flink task state and let Flink 
do the coordination which is a bit too coupled. Flink delegates the Pravega 
source recovery totally to Pravega server and only has a lightweight hook to 
connect. It was also an innovation at that time. We teamworked with the Flink 
community, and they helped to add a new interface `ExternallyInducedSource` 
(relevant Jira [here](https://issues.apache.org/jira/browse/FLINK-6390)) to 
allow such external calls for checkpointing, and finally the connector 
integrated this interface well to guarantee the exactly once semantics during a 
failure recovery. 

Review comment:
       ```suggestion
   Flink has periodic checkpoints based on the Chandy-Lamport algorithm to make 
state in Flink fault-tolerant. By allowing state and the corresponding stream 
positions to be recovered, the application is given the same semantics as a 
failure-free execution.
   
   Pravega also has its own Checkpoint concept which is to create a consistent 
"point in time" persistence of the state of each Reader in the Reader Group, by 
using a specialized Event (*Checkpoint Event*) to signal each Reader to 
preserve its state. Once a Checkpoint has been completed, the application can 
use the Checkpoint to reset all the Readers in the Reader Group to the known 
consistent state represented by the Checkpoint.
   
   This means that our end-to-end recovery story is not like other messaging 
systems such as Kafka, which uses a more coupled method and persists its offset 
in the Flink task state and lets Flink do the coordination. Flink delegates the 
Pravega source recovery completely to the Pravega server and uses only a 
lightweight hook to connect. We collaborated with the Flink community and added 
a new interface `ExternallyInducedSource` 
([FLINK-6390](https://issues.apache.org/jira/browse/FLINK-6390)) to allow such 
external calls for checkpointing. The connector integrated this interface to 
guarantee exactly-once semantics during a failure recovery. 
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.10_2.12</artifactId>
+  <version>0.8.0</version>
+</dependency>
+```
+
+
+As in the above example, 
+
+`1.10` is the Flink major version which is put in the middle of the artifact 
name. Pravega Flink connector maintains compatibility for the *three* most 
recent major versions of Flink.
+
+`0.8.0` is the version that aligns with the Pravega version
+
+You can find the latest release with support matrix on the [GitHub Releases 
page](https://github.com/pravega/flink-connectors/releases).
+
+## API introduction
+
+### Configurations
+
+The connector has provided a common top-level object `PravegaConfig` for 
Pravega connection configurations. The config object automatically configures 
itself from _environment variables_, _system properties_ and _program 
arguments_.
+
+The basic controller URI and the default scope can be set as below.
+
+|Setting|Environment Variable /<br/>System Property /<br/>Program 
Argument|Default Value|
+|-------|-------------------------------------------------------------|-------------|
+|Controller 
URI|`PRAVEGA_CONTROLLER_URI`<br/>`pravega.controller.uri`<br/>`--controller`|`tcp://localhost:9090`|
+|Default Scope|`PRAVEGA_SCOPE`<br/>`pravega.scope`<br/>`--scope`|-|
+
+This is the recommended way to create an instance of `PravegaConfig`
+
+
+```java
+// From default environment
+PravegaConfig config = PravegaConfig.fromDefaults();
+
+// From program arguments
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// From user specification
+PravegaConfig config = PravegaConfig.fromDefaults()
+    .withControllerURI("tcp://...")
+    .withDefaultScope("SCOPE-NAME")
+    .withCredentials(credentials)
+    .withHostnameValidation(false);
+```
+
+### Serialization/Deserialization
+
+Pravega has defined 
[`io.pravega.client.stream.Serializer`](http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/Serializer.html)
 for the serialization/deserialization, while Flink has also defined standard 
interfaces for the purpose.
+
+- 
[`org.apache.flink.api.common.serialization.SerializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/SerializationSchema.html)
+- 
[`org.apache.flink.api.common.serialization.DeserializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html)
+
+For interoperability with other pravega client applications, we have built-in 
adapters `PravegaSerializationSchema` and `PravegaDeserializationSchema` to 
support processing Pravega stream data produced by a non-Flink application.
+
+e.g. Adapter for Pravega Java serializer
+
+```java
+import io.pravega.client.stream.impl.JavaSerializer;
+...
+DeserializationSchema<MyEvent> adapter = new PravegaDeserializationSchema<>(
+    MyEvent.class, new JavaSerializer<MyEvent>());
+```
+
+### `FlinkPravegaReader`
+
+`FlinkPravegaReader` is a Flink `SourceFunction` implementation which supports 
parallel reads from one or more Pravega stream. Internally it will initiate a 
Pravega reader group and creates Pravega `EventStreamReader` instances to read 
the data from the stream(s). It provides a builder-style API to construct, and 
can allow streamcuts to mark the start and end of the read. The basic usage is 
as below.
+
+```java
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Enable Flink checkpoint to make state fault tolerant
+env.enableCheckpointing(60000);
+
+// Define the Pravega configuration
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event deserializer
+DeserializationSchema<MyClass> deserializer = ...
+
+// Define the data stream
+FlinkPravegaReader<MyClass> pravegaSource = 
FlinkPravegaReader.<MyClass>builder()
+    .forStream(...)
+    .withPravegaConfig(config)
+    .withDeserializationSchema(deserializer)
+    .build();
+DataStream<MyClass> stream = env.addSource(pravegaSource)
+    .setParallelism(4)
+    .uid("pravega-source");
+```
+
+### `FlinkPravegaWriter`
+
+`FlinkPravegaWriter` is a Flink `SinkFunction` implementation which supports 
parallel writes to Pravega stream.
+
+It supports three writer modes that relate to guarantees about the persistence 
of events emitted by the sink to a Pravega Stream.
+
+1. **Best-effort** - Any write failures will be ignored hence there could be 
data loss.
+2. **At-least-once**(default) - All events are persisted in Pravega. Duplicate 
events are possible, due to retries or in case of failure and subsequent 
recovery.
+3. **Exactly-once** - All events are persisted in Pravega using a 
transactional approach integrated with the Flink checkpointing feature.
+
+Internally it will initiate several Pravega `EventStreamWriter` or 
`TransactionalEventStreamWriter` (depends on the writer mode) instances to 
write data to the stream. It provides a builder-style API to construct as well. 
The basic usage is as below.
+
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Define the Pravega configuration
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event serializer
+SerializationSchema<MyClass> serializer = ...
+
+// Define the event router for selecting the Routing Key
+PravegaEventRouter<MyClass> router = ...
+
+// Define the sink function
+FlinkPravegaWriter<MyClass> pravegaSink = FlinkPravegaWriter.<MyClass>builder()
+   .forStream(...)
+   .withPravegaConfig(config)
+   .withSerializationSchema(serializer)
+   .withEventRouter(router)
+   .withWriterMode(EXACTLY_ONCE)
+   .build();
+
+DataStream<MyClass> stream = ...
+stream.addSink(pravegaSink)
+    .setParallelism(4)
+    .uid("pravega-sink");
+```
+
+You can also see some more samples 
[here](https://github.com/pravega/pravega-samples) which can help to learn more 
about the usage.
+
+# Internals of reader and writer
+
+## Checkpoint integration
+
+Flink has periodic checkpoints following the Chandy-Lamport algorithm to make 
state in Flink fault tolerant by allowing state and the corresponding stream 
positions to be recovered, thereby giving the application the same semantics as 
a failure-free execution.
+
+Pravega also has its own Checkpoint concept which is to create a consistent 
"point in time" persistence of the state of each Reader in the Reader Group, by 
using a specialized Event (*Checkpoint Event*) to signal each Reader to 
preserve its state. Once a Checkpoint has been completed, the application can 
use the Checkpoint to reset all the Readers in the Reader Group to the known 
consistent state represented by the Checkpoint.
+
+This means that our end-to-end recovery story is not like the other messaging 
systems like Kafka to persist its offset in the Flink task state and let Flink 
do the coordination which is a bit too coupled. Flink delegates the Pravega 
source recovery totally to Pravega server and only has a lightweight hook to 
connect. It was also an innovation at that time. We teamworked with the Flink 
community, and they helped to add a new interface `ExternallyInducedSource` 
(relevant Jira [here](https://issues.apache.org/jira/browse/FLINK-6390)) to 
allow such external calls for checkpointing, and finally the connector 
integrated this interface well to guarantee the exactly once semantics during a 
failure recovery. 
+
+The checkpoint mechanism works as a two-step process:
+
+   - The [master 
hook](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.html)
 handler from the job manager initiates the 
[`triggerCheckpoint`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.html#triggerCheckpoint-long-long-java.util.concurrent.Executor-)
 request to  the `ReaderCheckpointHook` that was registered with the Job 
Manager during `FlinkPravegaReader` source initialization. The 
`ReaderCheckpointHook` handler notifies Pravega to checkpoint the current 
reader state. This is a non-blocking call which returns a `future` once Pravega 
readers are done with the checkpointing. Once the `future` completes, the 
Pravega checkpoint will be persisted in a "master state" of a Flink checkpoint.
+   - A `Checkpoint` event will be sent by Pravega as part of the data stream 
flow and on receiving the event, the `FlinkPravegaReader` will initiate 
[`triggerCheckpoint`](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ExternallyInducedSource.java#L73)
 request to effectively let Flink continue and complete the checkpoint process.

Review comment:
       ```suggestion
   The checkpoint mechanism works as a two-step process:
   
      - The [master 
hook](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.html)
 handler from the JobManager initiates the 
[`triggerCheckpoint`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.html#triggerCheckpoint-long-long-java.util.concurrent.Executor-)
 request to the `ReaderCheckpointHook` that was registered with the JobManager 
during `FlinkPravegaReader` source initialization. The `ReaderCheckpointHook` 
handler notifies Pravega to checkpoint the current reader state. This is a 
non-blocking call that returns a `future` once Pravega readers are done with 
the checkpointing. Once the `future` completes, the Pravega checkpoint will be 
persisted in a "master state" of a Flink checkpoint.
      - A `Checkpoint` event will be sent by Pravega as part of the data stream 
flow and, upon receiving the event, the `FlinkPravegaReader` will initiate a 
[`triggerCheckpoint`](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ExternallyInducedSource.java#L73)
 request to effectively let Flink continue and complete the checkpoint process.
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.10_2.12</artifactId>
+  <version>0.8.0</version>
+</dependency>
+```
+
+
+As in the above example, 

Review comment:
       ```suggestion
   In the above example, 
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.10_2.12</artifactId>
+  <version>0.8.0</version>
+</dependency>
+```
+
+
+As in the above example, 
+
+`1.10` is the Flink major version which is put in the middle of the artifact 
name. Pravega Flink connector maintains compatibility for the *three* most 
recent major versions of Flink.
+
+`0.8.0` is the version that aligns with the Pravega version
+
+You can find the latest release with support matrix on the [GitHub Releases 
page](https://github.com/pravega/flink-connectors/releases).
+
+## API introduction
+
+### Configurations
+
+The connector has provided a common top-level object `PravegaConfig` for 
Pravega connection configurations. The config object automatically configures 
itself from _environment variables_, _system properties_ and _program 
arguments_.
+
+The basic controller URI and the default scope can be set as below.
+
+|Setting|Environment Variable /<br/>System Property /<br/>Program 
Argument|Default Value|
+|-------|-------------------------------------------------------------|-------------|
+|Controller 
URI|`PRAVEGA_CONTROLLER_URI`<br/>`pravega.controller.uri`<br/>`--controller`|`tcp://localhost:9090`|
+|Default Scope|`PRAVEGA_SCOPE`<br/>`pravega.scope`<br/>`--scope`|-|
+
+This is the recommended way to create an instance of `PravegaConfig`
+
+
+```java
+// From default environment
+PravegaConfig config = PravegaConfig.fromDefaults();
+
+// From program arguments
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// From user specification
+PravegaConfig config = PravegaConfig.fromDefaults()
+    .withControllerURI("tcp://...")
+    .withDefaultScope("SCOPE-NAME")
+    .withCredentials(credentials)
+    .withHostnameValidation(false);
+```
+
+### Serialization/Deserialization
+
+Pravega has defined 
[`io.pravega.client.stream.Serializer`](http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/Serializer.html)
 for the serialization/deserialization, while Flink has also defined standard 
interfaces for the purpose.
+
+- 
[`org.apache.flink.api.common.serialization.SerializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/SerializationSchema.html)
+- 
[`org.apache.flink.api.common.serialization.DeserializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html)
+
+For interoperability with other pravega client applications, we have built-in 
adapters `PravegaSerializationSchema` and `PravegaDeserializationSchema` to 
support processing Pravega stream data produced by a non-Flink application.
+
+e.g. Adapter for Pravega Java serializer
+
+```java
+import io.pravega.client.stream.impl.JavaSerializer;
+...
+DeserializationSchema<MyEvent> adapter = new PravegaDeserializationSchema<>(
+    MyEvent.class, new JavaSerializer<MyEvent>());
+```
+
+### `FlinkPravegaReader`
+
+`FlinkPravegaReader` is a Flink `SourceFunction` implementation which supports 
parallel reads from one or more Pravega stream. Internally it will initiate a 
Pravega reader group and creates Pravega `EventStreamReader` instances to read 
the data from the stream(s). It provides a builder-style API to construct, and 
can allow streamcuts to mark the start and end of the read. The basic usage is 
as below.
+
+```java
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Enable Flink checkpoint to make state fault tolerant
+env.enableCheckpointing(60000);
+
+// Define the Pravega configuration
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event deserializer
+DeserializationSchema<MyClass> deserializer = ...
+
+// Define the data stream
+FlinkPravegaReader<MyClass> pravegaSource = 
FlinkPravegaReader.<MyClass>builder()
+    .forStream(...)
+    .withPravegaConfig(config)
+    .withDeserializationSchema(deserializer)
+    .build();
+DataStream<MyClass> stream = env.addSource(pravegaSource)
+    .setParallelism(4)
+    .uid("pravega-source");
+```
+
+### `FlinkPravegaWriter`
+
+`FlinkPravegaWriter` is a Flink `SinkFunction` implementation which supports 
parallel writes to Pravega stream.
+
+It supports three writer modes that relate to guarantees about the persistence 
of events emitted by the sink to a Pravega Stream.
+
+1. **Best-effort** - Any write failures will be ignored hence there could be 
data loss.
+2. **At-least-once**(default) - All events are persisted in Pravega. Duplicate 
events are possible, due to retries or in case of failure and subsequent 
recovery.
+3. **Exactly-once** - All events are persisted in Pravega using a 
transactional approach integrated with the Flink checkpointing feature.
+
+Internally it will initiate several Pravega `EventStreamWriter` or 
`TransactionalEventStreamWriter` (depends on the writer mode) instances to 
write data to the stream. It provides a builder-style API to construct as well. 
The basic usage is as below.
+
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Define the Pravega configuration
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event serializer
+SerializationSchema<MyClass> serializer = ...
+
+// Define the event router for selecting the Routing Key
+PravegaEventRouter<MyClass> router = ...
+
+// Define the sink function
+FlinkPravegaWriter<MyClass> pravegaSink = FlinkPravegaWriter.<MyClass>builder()
+   .forStream(...)
+   .withPravegaConfig(config)
+   .withSerializationSchema(serializer)
+   .withEventRouter(router)
+   .withWriterMode(EXACTLY_ONCE)
+   .build();
+
+DataStream<MyClass> stream = ...
+stream.addSink(pravegaSink)
+    .setParallelism(4)
+    .uid("pravega-sink");
+```
+
+You can also see some more samples 
[here](https://github.com/pravega/pravega-samples) which can help to learn more 
about the usage.
+
+# Internals of reader and writer
+
+## Checkpoint integration
+
+Flink has periodic checkpoints following the Chandy-Lamport algorithm to make 
state in Flink fault tolerant by allowing state and the corresponding stream 
positions to be recovered, thereby giving the application the same semantics as 
a failure-free execution.
+
+Pravega also has its own Checkpoint concept which is to create a consistent 
"point in time" persistence of the state of each Reader in the Reader Group, by 
using a specialized Event (*Checkpoint Event*) to signal each Reader to 
preserve its state. Once a Checkpoint has been completed, the application can 
use the Checkpoint to reset all the Readers in the Reader Group to the known 
consistent state represented by the Checkpoint.
+
+This means that our end-to-end recovery story is not like the other messaging 
systems like Kafka to persist its offset in the Flink task state and let Flink 
do the coordination which is a bit too coupled. Flink delegates the Pravega 
source recovery totally to Pravega server and only has a lightweight hook to 
connect. It was also an innovation at that time. We teamworked with the Flink 
community, and they helped to add a new interface `ExternallyInducedSource` 
(relevant Jira [here](https://issues.apache.org/jira/browse/FLINK-6390)) to 
allow such external calls for checkpointing, and finally the connector 
integrated this interface well to guarantee the exactly once semantics during a 
failure recovery. 
+
+The checkpoint mechanism works as a two-step process:
+
+   - The [master 
hook](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.html)
 handler from the job manager initiates the 
[`triggerCheckpoint`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.html#triggerCheckpoint-long-long-java.util.concurrent.Executor-)
 request to  the `ReaderCheckpointHook` that was registered with the Job 
Manager during `FlinkPravegaReader` source initialization. The 
`ReaderCheckpointHook` handler notifies Pravega to checkpoint the current 
reader state. This is a non-blocking call which returns a `future` once Pravega 
readers are done with the checkpointing. Once the `future` completes, the 
Pravega checkpoint will be persisted in a "master state" of a Flink checkpoint.
+   - A `Checkpoint` event will be sent by Pravega as part of the data stream 
flow and on receiving the event, the `FlinkPravegaReader` will initiate 
[`triggerCheckpoint`](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ExternallyInducedSource.java#L73)
 request to effectively let Flink continue and complete the checkpoint process.
+
+## End-to-end exactly-once semantics
+
+In the early years of big data processing, results from real time stream 
processing was always considered inaccurate/approximate/speculative. However, 
this correctness is extremely important in some industries and use cases such 
as finance. This constraint is mainly coming from the following two problems.
+
+1. The unordered data source in event time 
+2. End-to-end exactly once semantics guarantee
+
+During recent years of development, watermark has been introduced as a 
tradeoff between correctness and latency, which is now considered as a good 
solution for Problem 1. 
+
+The second problem is a more difficult one. When we say “exactly-once 
semantics”, what we mean is that each incoming event affects the final results 
exactly once. Even in case of a machine or software failure, there’s no 
duplicate data and no data that goes unprocessed. This is quite hard because of 
the demanding message acknowledgement and recovery during the fast processing, 
that's also why some early distributed streaming engines like Storm(without 
Trident) choose to support "at-least once". Flink is one of the first streaming 
systems that provided exactly-once semantics internally in an application due 
to its delicate [checkpoint 
mechanism](https://www.ververica.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink).
 But to make it end-to-end, it still needs to fix the final stage to apply the 
semantic to the sink of external message systems that supports commits and 
rollback. 
+
+Here comes Pravega that supports [transactional 
writes](https://cncf.pravega.io/docs/latest/transactions/). The idea with a 
Pravega transaction is that it allows an application to prepare a set of events 
that can be written "all at once" to a Stream. This allows an application to 
"commit" a bunch of events atomically. With this Pravega feature making the 
writes idempotent, it is possible to implement end-to-end exactly-once 
pipelines together with Flink. 
+
+To build such an end-to-end solution is still not easy, the main difficulty is 
to have the coordination between Flink and Pravega sink. One common approach 
for coordinating commits and rollbacks in a distributed system is the two-phase 
commit protocol. We followed this path, worked together with the Flink 
community and implemented the sink function in a two-phase commit way 
coordinated with Flink checkpoints.
+
+Flink community then extracted the common logic of the two-phase commit 
protocol and provided a general interface `TwoPhaseCommitSinkFunction` 
(relevant Jira [here](https://issues.apache.org/jira/browse/FLINK-7210)) to 
make it possible to build end-to-end exactly-once applications with other 
message system with transaction support including Apache Kafka versions 0.11 
and beyond. There is an official Flink 
[blog](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html)
 as well to introduce this feature in detail.

Review comment:
       ```suggestion
   In the early years of big data processing, results from real-time stream 
processing were always considered inaccurate/approximate/speculative. However, 
this correctness is extremely important for some use cases and in some 
industries such as finance. 
   
   This constraint stems mainly from two issues:
   - unordered data source in event time 
   - end-to-end exactly-once semantics guarantee
   
   During recent years of development, watermarking has been introduced as a 
tradeoff between correctness and latency, which is now considered a good 
solution for unordered data sources in event time. 
   
   The guarantee of end-to-end exactly-once semantics is more tricky. When we 
say “exactly-once semantics”, what we mean is that each incoming event affects 
the final results exactly once. Even in the event of a machine or software 
failure, there is no duplicate data and no data that goes unprocessed. This is 
quite difficult because of the demands of message acknowledgment and recovery 
during such fast processing and is also why some early distributed streaming 
engines like Storm(without Trident) chose to support "at-least-once" 
guarantees. 
   
   Flink is one of the first streaming systems that was able to provide 
exactly-once semantics due to its delicate [checkpoint 
mechanism](https://www.ververica.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink).
 But to make it work end-to-end, the final stage needs to apply the semantic to 
external message system sinks that support commits and rollbacks. 
   
   To work around this problem, Pravega introduced [transactional 
writes](https://cncf.pravega.io/docs/latest/transactions/). A Pravega 
transaction allows an application to prepare a set of events that can be 
written "all at once" to a Stream. This allows an application to "commit" a 
bunch of events atomically. When writes are idempotent, it is possible to 
implement end-to-end exactly-once pipelines together with Flink. 
   
   To build such an end-to-end solution requires coordination between Flink and 
the Pravega sink, which is still challenging. A common approach for 
coordinating commits and rollbacks in a distributed system is the two-phase 
commit protocol. We used this protocol and, together with the Flink community, 
implemented the sink function in a two-phase commit way coordinated with Flink 
checkpoints.
   
   The Flink community then extracted the common logic from the two-phase 
commit protocol and provided a general interface `TwoPhaseCommitSinkFunction` 
([FLINK-7210](https://issues.apache.org/jira/browse/FLINK-7210)) to make it 
possible to build end-to-end exactly-once applications with other message 
systems that have transaction support. This includes Apache Kafka versions 0.11 
and above. There is an official Flink [blog 
post](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html)
 that describes this feature in detail.
   ```

##########
File path: _posts/2021-11-30-pravega-connector-101.md
##########
@@ -0,0 +1,213 @@
+---
+layout: post
+title: "Pravega Flink connector 101"
+date: 2021-11-30 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on stream abstractions for both batch and 
streaming data consumption. Pravega streams are durable, consistent, and 
elastic, while natively supporting long-term data retention. Relatively, 
[Apache Flink](https://flink.apache.org/) is a widely-used real-time computing 
engine and provides unified batch and stream processing developed by the Apache 
Software Foundation. Flink provides high-throughput, low-latency streaming data 
processing, as well as support for complex event processing and state 
management. Both Pravega and Flink share the same design philosophy that treats 
"data stream" as the first citizen, which makes them the best companion of each 
other to construct storage+computing data pipelines which can unify batch and 
streaming use cases.
+
+That's also the key reason why Pravega has chosen Flink as the first 
integrated execution engine among so many various distributed computing 
engines. Besides, with the help of Flink, users can use flexible Flink APIs 
like windowing, complex event processing(CEP) or table abstractions to process 
the streaming data easily that enlightens the data under the storage. Thus, 
Pravega has established communication with some Flink PMC members and developed 
the connector together since it started in 2016. Moreover, Pravega team has 
been an active participant of Flink Forward every year since then and gives 
keynote speeches introducing new features of Pravega.
+
+In 2017, Flink connector module started to move out of the Pravega main 
repository and has been maintained in a new separate 
[repository](https://github.com/pravega/flink-connectors) since then. During 
years of development, Pravega Flink connector has implemented many key features 
including
+
+- Exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- Seamless integration with Flink's checkpoints and savepoints.
+- Parallel Readers and Writers supporting high throughput and low latency 
processing.
+- Batch/ Streaming/ Table API support to access Pravega Streams.
+
+These key features make streaming pipeline applications easier to develop 
without worrying performance and correctness which are the common pain points 
for many streaming use cases. 
+
+In this blog, we will discuss how to use the connector to read and write 
Pravega streams in the Flink basic DataStream API in the Flink application 
development. We will introduce Table API usages further in another blog, stay 
tuned if you are interested.
+
+{% toc %}
+
+# Basic usages
+
+## Dependency
+To use this connector in the application, please pick the proper dependency 
and add it to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.10_2.12</artifactId>
+  <version>0.8.0</version>
+</dependency>
+```
+
+
+As in the above example, 
+
+`1.10` is the Flink major version which is put in the middle of the artifact 
name. Pravega Flink connector maintains compatibility for the *three* most 
recent major versions of Flink.
+
+`0.8.0` is the version that aligns with the Pravega version
+
+You can find the latest release with support matrix on the [GitHub Releases 
page](https://github.com/pravega/flink-connectors/releases).
+
+## API introduction
+
+### Configurations
+
+The connector has provided a common top-level object `PravegaConfig` for 
Pravega connection configurations. The config object automatically configures 
itself from _environment variables_, _system properties_ and _program 
arguments_.
+
+The basic controller URI and the default scope can be set as below.
+
+|Setting|Environment Variable /<br/>System Property /<br/>Program 
Argument|Default Value|
+|-------|-------------------------------------------------------------|-------------|
+|Controller 
URI|`PRAVEGA_CONTROLLER_URI`<br/>`pravega.controller.uri`<br/>`--controller`|`tcp://localhost:9090`|
+|Default Scope|`PRAVEGA_SCOPE`<br/>`pravega.scope`<br/>`--scope`|-|
+
+This is the recommended way to create an instance of `PravegaConfig`
+
+
+```java
+// From default environment
+PravegaConfig config = PravegaConfig.fromDefaults();
+
+// From program arguments
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// From user specification
+PravegaConfig config = PravegaConfig.fromDefaults()
+    .withControllerURI("tcp://...")
+    .withDefaultScope("SCOPE-NAME")
+    .withCredentials(credentials)
+    .withHostnameValidation(false);
+```
+
+### Serialization/Deserialization
+
+Pravega has defined 
[`io.pravega.client.stream.Serializer`](http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/Serializer.html)
 for the serialization/deserialization, while Flink has also defined standard 
interfaces for the purpose.
+
+- 
[`org.apache.flink.api.common.serialization.SerializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/SerializationSchema.html)
+- 
[`org.apache.flink.api.common.serialization.DeserializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html)
+
+For interoperability with other pravega client applications, we have built-in 
adapters `PravegaSerializationSchema` and `PravegaDeserializationSchema` to 
support processing Pravega stream data produced by a non-Flink application.
+
+e.g. Adapter for Pravega Java serializer
+
+```java
+import io.pravega.client.stream.impl.JavaSerializer;
+...
+DeserializationSchema<MyEvent> adapter = new PravegaDeserializationSchema<>(
+    MyEvent.class, new JavaSerializer<MyEvent>());
+```
+
+### `FlinkPravegaReader`
+
+`FlinkPravegaReader` is a Flink `SourceFunction` implementation which supports 
parallel reads from one or more Pravega stream. Internally it will initiate a 
Pravega reader group and creates Pravega `EventStreamReader` instances to read 
the data from the stream(s). It provides a builder-style API to construct, and 
can allow streamcuts to mark the start and end of the read. The basic usage is 
as below.
+
+```java
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Enable Flink checkpoint to make state fault tolerant
+env.enableCheckpointing(60000);
+
+// Define the Pravega configuration
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event deserializer
+DeserializationSchema<MyClass> deserializer = ...
+
+// Define the data stream
+FlinkPravegaReader<MyClass> pravegaSource = 
FlinkPravegaReader.<MyClass>builder()
+    .forStream(...)
+    .withPravegaConfig(config)
+    .withDeserializationSchema(deserializer)
+    .build();
+DataStream<MyClass> stream = env.addSource(pravegaSource)
+    .setParallelism(4)
+    .uid("pravega-source");
+```
+
+### `FlinkPravegaWriter`
+
+`FlinkPravegaWriter` is a Flink `SinkFunction` implementation which supports 
parallel writes to Pravega stream.
+
+It supports three writer modes that relate to guarantees about the persistence 
of events emitted by the sink to a Pravega Stream.
+
+1. **Best-effort** - Any write failures will be ignored hence there could be 
data loss.
+2. **At-least-once**(default) - All events are persisted in Pravega. Duplicate 
events are possible, due to retries or in case of failure and subsequent 
recovery.
+3. **Exactly-once** - All events are persisted in Pravega using a 
transactional approach integrated with the Flink checkpointing feature.
+
+Internally it will initiate several Pravega `EventStreamWriter` or 
`TransactionalEventStreamWriter` (depends on the writer mode) instances to 
write data to the stream. It provides a builder-style API to construct as well. 
The basic usage is as below.
+
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Define the Pravega configuration
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event serializer
+SerializationSchema<MyClass> serializer = ...
+
+// Define the event router for selecting the Routing Key
+PravegaEventRouter<MyClass> router = ...
+
+// Define the sink function
+FlinkPravegaWriter<MyClass> pravegaSink = FlinkPravegaWriter.<MyClass>builder()
+   .forStream(...)
+   .withPravegaConfig(config)
+   .withSerializationSchema(serializer)
+   .withEventRouter(router)
+   .withWriterMode(EXACTLY_ONCE)
+   .build();
+
+DataStream<MyClass> stream = ...
+stream.addSink(pravegaSink)
+    .setParallelism(4)
+    .uid("pravega-sink");
+```
+
+You can also see some more samples 
[here](https://github.com/pravega/pravega-samples) which can help to learn more 
about the usage.
+
+# Internals of reader and writer
+
+## Checkpoint integration
+
+Flink has periodic checkpoints following the Chandy-Lamport algorithm to make 
state in Flink fault tolerant by allowing state and the corresponding stream 
positions to be recovered, thereby giving the application the same semantics as 
a failure-free execution.
+
+Pravega also has its own Checkpoint concept which is to create a consistent 
"point in time" persistence of the state of each Reader in the Reader Group, by 
using a specialized Event (*Checkpoint Event*) to signal each Reader to 
preserve its state. Once a Checkpoint has been completed, the application can 
use the Checkpoint to reset all the Readers in the Reader Group to the known 
consistent state represented by the Checkpoint.
+
+This means that our end-to-end recovery story is not like the other messaging 
systems like Kafka to persist its offset in the Flink task state and let Flink 
do the coordination which is a bit too coupled. Flink delegates the Pravega 
source recovery totally to Pravega server and only has a lightweight hook to 
connect. It was also an innovation at that time. We teamworked with the Flink 
community, and they helped to add a new interface `ExternallyInducedSource` 
(relevant Jira [here](https://issues.apache.org/jira/browse/FLINK-6390)) to 
allow such external calls for checkpointing, and finally the connector 
integrated this interface well to guarantee the exactly once semantics during a 
failure recovery. 
+
+The checkpoint mechanism works as a two-step process:
+
+   - The [master 
hook](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.html)
 handler from the job manager initiates the 
[`triggerCheckpoint`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.html#triggerCheckpoint-long-long-java.util.concurrent.Executor-)
 request to  the `ReaderCheckpointHook` that was registered with the Job 
Manager during `FlinkPravegaReader` source initialization. The 
`ReaderCheckpointHook` handler notifies Pravega to checkpoint the current 
reader state. This is a non-blocking call which returns a `future` once Pravega 
readers are done with the checkpointing. Once the `future` completes, the 
Pravega checkpoint will be persisted in a "master state" of a Flink checkpoint.
+   - A `Checkpoint` event will be sent by Pravega as part of the data stream 
flow and on receiving the event, the `FlinkPravegaReader` will initiate 
[`triggerCheckpoint`](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ExternallyInducedSource.java#L73)
 request to effectively let Flink continue and complete the checkpoint process.
+
+## End-to-end exactly-once semantics
+
+In the early years of big data processing, results from real time stream 
processing was always considered inaccurate/approximate/speculative. However, 
this correctness is extremely important in some industries and use cases such 
as finance. This constraint is mainly coming from the following two problems.
+
+1. The unordered data source in event time 
+2. End-to-end exactly once semantics guarantee
+
+During recent years of development, watermark has been introduced as a 
tradeoff between correctness and latency, which is now considered as a good 
solution for Problem 1. 
+
+The second problem is a more difficult one. When we say “exactly-once 
semantics”, what we mean is that each incoming event affects the final results 
exactly once. Even in case of a machine or software failure, there’s no 
duplicate data and no data that goes unprocessed. This is quite hard because of 
the demanding message acknowledgement and recovery during the fast processing, 
that's also why some early distributed streaming engines like Storm(without 
Trident) choose to support "at-least once". Flink is one of the first streaming 
systems that provided exactly-once semantics internally in an application due 
to its delicate [checkpoint 
mechanism](https://www.ververica.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink).
 But to make it end-to-end, it still needs to fix the final stage to apply the 
semantic to the sink of external message systems that supports commits and 
rollback. 
+
+Here comes Pravega that supports [transactional 
writes](https://cncf.pravega.io/docs/latest/transactions/). The idea with a 
Pravega transaction is that it allows an application to prepare a set of events 
that can be written "all at once" to a Stream. This allows an application to 
"commit" a bunch of events atomically. With this Pravega feature making the 
writes idempotent, it is possible to implement end-to-end exactly-once 
pipelines together with Flink. 
+
+To build such an end-to-end solution is still not easy, the main difficulty is 
to have the coordination between Flink and Pravega sink. One common approach 
for coordinating commits and rollbacks in a distributed system is the two-phase 
commit protocol. We followed this path, worked together with the Flink 
community and implemented the sink function in a two-phase commit way 
coordinated with Flink checkpoints.
+
+Flink community then extracted the common logic of the two-phase commit 
protocol and provided a general interface `TwoPhaseCommitSinkFunction` 
(relevant Jira [here](https://issues.apache.org/jira/browse/FLINK-7210)) to 
make it possible to build end-to-end exactly-once applications with other 
message system with transaction support including Apache Kafka versions 0.11 
and beyond. There is an official Flink 
[blog](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html)
 as well to introduce this feature in detail.
+
+# Wrap Up and plan for the future
+
+If you’ve made it this far, thanks for staying with us through a detailed 
post. Pravega Flink connector is a project that makes Pravega connects to Flink 
and breathe life into Pravega to act as a key data store in a streaming 
pipeline. Here are some key points that we covered
+
+- The background and history of Flink and the connector integration
+- Introduction to basic usage of the streaming source and sink in Pravega 
Flink connector
+- Deep dive into the internals of the checkpoint and end-to-end exactly-once 
feature
+
+Finally I'd like to mention some of our plans for the future work on Flink 
connector.
+
+- `FlinkPravegaInputFormat` and `FlinkPravegaOutputFormat` is now provided to 
support batch reads and writes in Flink, but these are under legacy DataSet 
API. As Flink is now making much effort to unify batch and streaming, it has 
dedicated on improving from API level recently and provided new interfaces for 
[source](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
 and 
[sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API)
 API in the recent Flink 1.11 and 1.12 releases. We will still actively 
communicate with Flink community and integrate with the new APIs. 
+- We will also put more effort on SQL / Table API support to provide a better 
using experience as it's simpler to understand and even more powerful to use in 
some cases.

Review comment:
       ```suggestion
   # Summary
   
   The Pravega Flink connector enables Pravega to connect to Flink and allows 
Pravega to act as a key data store in a streaming pipeline. Both projects share 
a common design philosophy and can integrate well with each other. Pravega has 
its own concept of checkpointing and has implemented transactional writes to 
support end-to-end exactly-once guarantees. 
   
   # Future plans
   
   `FlinkPravegaInputFormat` and `FlinkPravegaOutputFormat` are now provided to 
support batch reads and writes in Flink, but these are under the legacy DataSet 
API. Since Flink is now making efforts to unify batch and streaming, it is 
improving its APIs and providing new interfaces for the 
[source](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
 and 
[sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API)
 APIs in the Flink 1.11 and 1.12 releases. We will continue to work with the 
Flink community and integrate with the new APIs.
    
   We will also put more effort into SQL / Table API support in order to 
provide a better user experience since it is simpler to understand and even 
more powerful to use in some cases.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to