MartijnVisser commented on a change in pull request #18660:
URL: https://github.com/apache/flink/pull/18660#discussion_r801660707



##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -44,9 +44,11 @@ Thus, you can use this format in two ways:
 - Bounded read for batch mode
 - Continuous read for streaming mode: monitors a directory for new files that 
appear 
 
-**Bounded read example**:
+## Flink RowData
 
-In this example we create a DataStream containing Parquet records as Flink 
Rows. We project the schema to read only certain fields ("f7", "f4" and "f99"). 
 
+#### Bounded read example

Review comment:
       ```suggestion
   ### Bounded data example
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -28,7 +28,7 @@ under the License.
 
 # Parquet format
 
-Flink supports reading [Parquet](https://parquet.apache.org/) files and 
producing [Flink 
rows](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html).
+Flink supports reading [Parquet](https://parquet.apache.org/) files, producing 
[Flink 
RowDatas](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/data/RowData.html)
 and [Avro](https://avro.apache.org/) records.

Review comment:
       ```suggestion
   Flink supports reading [Parquet](https://parquet.apache.org/) files, 
producing {{< javadoc file="org/apache/flink/table/data/RowData.html" 
name="Flink RowData">}} and producing [Avro](https://avro.apache.org/) records.
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -44,9 +44,11 @@ Thus, you can use this format in two ways:
 - Bounded read for batch mode
 - Continuous read for streaming mode: monitors a directory for new files that 
appear 
 
-**Bounded read example**:
+## Flink RowData
 
-In this example we create a DataStream containing Parquet records as Flink 
Rows. We project the schema to read only certain fields ("f7", "f4" and "f99"). 
 
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Flink 
RowDatas. We project the schema to read only certain fields ("f7", "f4" and 
"f99").  
 We read records in batches of 500 records. The first boolean parameter 
specifies if timestamp columns need to be interpreted as UTC. 
 The second boolean instructs the application if the projected Parquet fields 
names are to be interpreted in a case sensitive way.
 There is no need for a watermark strategy as records do not contain event 
timestamps.

Review comment:
       I'm going to make some suggestions based on 
https://flink.apache.org/contributing/docs-style.html
   
   ```suggestion
   In this example, you will create a DataStream containing Parquet records as 
Flink RowDatas. The schema is projected to read only the specified fields 
("f7", "f4" and "f99").  
   Flink will read records in batches of 500 records. The first boolean 
parameter specifies that timestamp columns will be interpreted as UTC. 
   The second boolean instructs the application that the projected Parquet 
fields names are case sensitive.
   There is no a watermark strategy defined as records do not contain event 
timestamps.
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -71,9 +73,9 @@ final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
 
-**Continuous read example**:
+#### Continuous read example
 
-In this example we create a DataStream containing Parquet records as Flink 
Rows that will 
+In this example, we create a DataStream containing Parquet records as Flink 
RowDatas that will 
 infinitely grow as new files are added to the directory. We monitor for new 
files each second.
 We project the schema to read only certain fields ("f7", "f4" and "f99").  
 We read records in batches of 500 records. The first boolean parameter 
specifies if timestamp columns need to be interpreted as UTC.

Review comment:
       ```suggestion
   In this example, you will create a DataStream containing Parquet records as 
Flink RowDatas that will 
   infinitely grow as new files are added to the directory. It will monitor for 
new files each second.
   The schema is projected to read only the specified fields ("f7", "f4" and 
"f99").  
   Flink will read records in batches of 500 records. The first boolean 
parameter specifies that timestamp columns will be interpreted as UTC. 
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -71,9 +73,9 @@ final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
 
-**Continuous read example**:
+#### Continuous read example

Review comment:
       ```suggestion
   ### Unbounded stream example
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):

Review comment:
       ```suggestion
   This example uses an Avro schema example similar to the one described in the 
[official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.

Review comment:
       ```suggestion
   In this example, you will create a DataStream containing Parquet records as 
Avro Generic records. 
   It will parse the Avro schema based on the JSON string. There are many other 
ways to parse a schema, e.g. from java.io.File or java.io.InputStream. Please 
refer to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
   After that, it creates an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
+
+#### Bounded read example

Review comment:
       ```suggestion
   #### Bounded data example
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example

Review comment:
       ```suggestion
   #### Bounded data example
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
+
+#### Bounded read example
+
+In this example, we use the example schema 
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+  {"namespace": "org.apache.flink.formats.parquet.generated",
+    "type": "record",
+    "name": "Address",
+    "fields": [
+      {"name": "num", "type": "int"},
+      {"name": "street", "type": "string"},
+      {"name": "city", "type": "string"},
+      {"name": "state", "type": "string"},
+      {"name": "zip", "type": "string"}
+    ]
+  }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:

Review comment:
       ```suggestion
   You will use the Avro Maven plugin to generate the `Address` Java class:
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
+
+#### Bounded read example
+
+In this example, we use the example schema 
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+  {"namespace": "org.apache.flink.formats.parquet.generated",
+    "type": "record",
+    "name": "Address",
+    "fields": [
+      {"name": "num", "type": "int"},
+      {"name": "street", "type": "string"},
+      {"name": "city", "type": "string"},
+      {"name": "state", "type": "string"},
+      {"name": "zip", "type": "string"}
+    ]
+  }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+    // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Specific record 
+and then create a DataStream containing Parquet records as Avor Specific 
records. 

Review comment:
       ```suggestion
   You will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for 
Avro Specific record 
   and then create a DataStream containing Parquet records as Avro Specific 
records. 
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:

Review comment:
       ```suggestion
   Flink supports producing three types of Avro records by reading Parquet 
files:
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example

Review comment:
       ```suggestion
   #### Unbounded stream example
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
+
+#### Bounded read example
+
+In this example, we use the example schema 
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+  {"namespace": "org.apache.flink.formats.parquet.generated",
+    "type": "record",
+    "name": "Address",
+    "fields": [
+      {"name": "num", "type": "int"},
+      {"name": "street", "type": "string"},
+      {"name": "city", "type": "string"},
+      {"name": "state", "type": "string"},
+      {"name": "zip", "type": "string"}
+    ]
+  }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+    // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Specific record 
+and then create a DataStream containing Parquet records as Avor Specific 
records. 
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example

Review comment:
       ```suggestion
   #### Unbounded stream example
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records

Review comment:
       Should this section actually be moved 
`/flink/flink-docs-master/docs/connectors/datastream/formats/avro/` ?

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
+
+#### Bounded read example
+
+In this example, we use the example schema 
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):

Review comment:
       ```suggestion
   This example uses the example schema 
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  

Review comment:
       ```suggestion
   This example is similar to the bounded data example. The application 
monitors for new files every second and reads Avro Generic records from Parquet 
files infinitely as new files are added to the directory.  
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
+
+#### Bounded read example
+
+In this example, we use the example schema 
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+  {"namespace": "org.apache.flink.formats.parquet.generated",
+    "type": "record",
+    "name": "Address",
+    "fields": [
+      {"name": "num", "type": "int"},
+      {"name": "street", "type": "string"},
+      {"name": "city", "type": "string"},
+      {"name": "state", "type": "string"},
+      {"name": "zip", "type": "string"}
+    ]
+  }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+    // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Specific record 
+and then create a DataStream containing Parquet records as Avor Specific 
records. 
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we use the same generated 
Address Java class 
+and monitor for the new files every second to read Avro Specific records from 
Parquet files 
+infinitely as new files are added to the directory.

Review comment:
       ```suggestion
   This example, similar to the bounded data example, uses the same generated 
Address Java class 
   and monitors for new files every second to read Avro Specific records from 
Parquet files 
   infinitely as new files are added to the directory.
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.

Review comment:
       ```suggestion
   Based on the previously defined schema, you can generate classes by 
leveraging Avro code generation. 
   Once the classes have been generated, there is no need to use the schema 
directly in your program. 
   You can either use `avro-tools.jar` to generate code manually or you could 
use the Avro Maven plugin to perform 
   code generation on any .avsc files present in the configured source 
directory. Please refer to 
   [Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
+
+#### Bounded read example
+
+In this example, we use the example schema 
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+  {"namespace": "org.apache.flink.formats.parquet.generated",
+    "type": "record",
+    "name": "Address",
+    "fields": [
+      {"name": "num", "type": "int"},
+      {"name": "street", "type": "string"},
+      {"name": "city", "type": "string"},
+      {"name": "state", "type": "string"},
+      {"name": "zip", "type": "string"}
+    ]
+  }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+    // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Specific record 
+and then create a DataStream containing Parquet records as Avor Specific 
records. 
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we use the same generated 
Address Java class 
+and monitor for the new files every second to read Avro Specific records from 
Parquet files 
+infinitely as new files are added to the directory.
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific record that requires a predefined Avro 
schema, 
+Flink also supports creating a Datastream from Parquet files based on existing 
Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols 
for these POJO classes.
+Java types are mapped to Avro schemas, please refer to the [Avro 
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) for more 
details.
+
+#### Bounded read example

Review comment:
       ```suggestion
   #### Bounded data example
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
+
+#### Bounded read example
+
+In this example, we use the example schema 
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+  {"namespace": "org.apache.flink.formats.parquet.generated",
+    "type": "record",
+    "name": "Address",
+    "fields": [
+      {"name": "num", "type": "int"},
+      {"name": "street", "type": "string"},
+      {"name": "city", "type": "string"},
+      {"name": "state", "type": "string"},
+      {"name": "zip", "type": "string"}
+    ]
+  }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+    // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Specific record 
+and then create a DataStream containing Parquet records as Avor Specific 
records. 
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we use the same generated 
Address Java class 
+and monitor for the new files every second to read Avro Specific records from 
Parquet files 
+infinitely as new files are added to the directory.
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific record that requires a predefined Avro 
schema, 
+Flink also supports creating a Datastream from Parquet files based on existing 
Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols 
for these POJO classes.
+Java types are mapped to Avro schemas, please refer to the [Avro 
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) for more 
details.

Review comment:
       ```suggestion
   Beyond Avro Generic and Specific record that requires a predefined Avro 
schema, 
   Flink also supports creating a DataStream from Parquet files based on 
existing Java POJO classes.
   In this case, Avro will use Java reflection to generate schemas and 
protocols for these POJO classes.
   Java types are mapped to Avro schemas, please refer to the [Avro 
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) documentation 
for more details.
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
+
+#### Bounded read example
+
+In this example, we use the example schema 
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+  {"namespace": "org.apache.flink.formats.parquet.generated",
+    "type": "record",
+    "name": "Address",
+    "fields": [
+      {"name": "num", "type": "int"},
+      {"name": "street", "type": "string"},
+      {"name": "city", "type": "string"},
+      {"name": "state", "type": "string"},
+      {"name": "zip", "type": "string"}
+    ]
+  }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+    // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Specific record 
+and then create a DataStream containing Parquet records as Avor Specific 
records. 
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we use the same generated 
Address Java class 
+and monitor for the new files every second to read Avro Specific records from 
Parquet files 
+infinitely as new files are added to the directory.
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific record that requires a predefined Avro 
schema, 
+Flink also supports creating a Datastream from Parquet files based on existing 
Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols 
for these POJO classes.
+Java types are mapped to Avro schemas, please refer to the [Avro 
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) for more 
details.
+
+#### Bounded read example
+
+In this example, we use a simple Java POJO class 
[Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):

Review comment:
       ```suggestion
   This example uses a simple Java POJO class 
[Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
+
+#### Bounded read example
+
+In this example, we use the example schema 
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+  {"namespace": "org.apache.flink.formats.parquet.generated",
+    "type": "record",
+    "name": "Address",
+    "fields": [
+      {"name": "num", "type": "int"},
+      {"name": "street", "type": "string"},
+      {"name": "city", "type": "string"},
+      {"name": "state", "type": "string"},
+      {"name": "zip", "type": "string"}
+    ]
+  }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+    // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Specific record 
+and then create a DataStream containing Parquet records as Avor Specific 
records. 
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we use the same generated 
Address Java class 
+and monitor for the new files every second to read Avro Specific records from 
Parquet files 
+infinitely as new files are added to the directory.
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific record that requires a predefined Avro 
schema, 
+Flink also supports creating a Datastream from Parquet files based on existing 
Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols 
for these POJO classes.
+Java types are mapped to Avro schemas, please refer to the [Avro 
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) for more 
details.
+
+#### Bounded read example
+
+In this example, we use a simple Java POJO class 
[Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
+
+```java
+public class Datum implements Serializable {
+
+    public String a;
+    public int b;
+
+    public Datum() {}
+
+    public Datum(String a, int b) {
+        this.a = a;
+        this.b = b;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        Datum datum = (Datum) o;
+        return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == 
null);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = a != null ? a.hashCode() : 0;
+        result = 31 * result + b;
+        return result;
+    }
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Reflect record
+and then create a DataStream containing Parquet records as Avor Reflect 
records.
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forReflectRecord(Datum.class), /* Flink 
Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example

Review comment:
       ```suggestion
   #### Unbounded stream example
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
+
+#### Bounded read example
+
+In this example, we use the example schema 
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+  {"namespace": "org.apache.flink.formats.parquet.generated",
+    "type": "record",
+    "name": "Address",
+    "fields": [
+      {"name": "num", "type": "int"},
+      {"name": "street", "type": "string"},
+      {"name": "city", "type": "string"},
+      {"name": "state", "type": "string"},
+      {"name": "zip", "type": "string"}
+    ]
+  }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+    // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Specific record 
+and then create a DataStream containing Parquet records as Avor Specific 
records. 
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we use the same generated 
Address Java class 
+and monitor for the new files every second to read Avro Specific records from 
Parquet files 
+infinitely as new files are added to the directory.
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific record that requires a predefined Avro 
schema, 
+Flink also supports creating a Datastream from Parquet files based on existing 
Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols 
for these POJO classes.
+Java types are mapped to Avro schemas, please refer to the [Avro 
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) for more 
details.
+
+#### Bounded read example
+
+In this example, we use a simple Java POJO class 
[Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
+
+```java
+public class Datum implements Serializable {
+
+    public String a;
+    public int b;
+
+    public Datum() {}
+
+    public Datum(String a, int b) {
+        this.a = a;
+        this.b = b;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        Datum datum = (Datum) o;
+        return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == 
null);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = a != null ? a.hashCode() : 0;
+        result = 31 * result + b;
+        return result;
+    }
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Reflect record
+and then create a DataStream containing Parquet records as Avor Reflect 
records.

Review comment:
       ```suggestion
   You create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Reflect record
   and then create a DataStream containing Parquet records as Avro Reflect 
records.
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect 
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro 
schemas and types from the [Avro 
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in 
the [official Avro 
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, 
favoriteNumber, and favoriteColor. You can find more details at [record 
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for 
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor 
Generic records. 
+We parse the Avro schema based on the JSON string. There are many other ways 
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer 
to [Avro 
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
 for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders` 
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new 
files every second 
+to read Avro Generic records from Parquet files infinitely as new files are 
added to the directory.  
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging 
Avro code generation. 
+Once the classes have been generated, there is no need to use the schema 
directly in our programs. 
+We can either use avro-tools.jar to generate code manually or use Avro Maven 
plugin to performs 
+code generation on any .avsc files present in the configured source 
directory.Please refer to 
+[Avro Getting 
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more 
information.
+
+#### Bounded read example
+
+In this example, we use the example schema 
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+  {"namespace": "org.apache.flink.formats.parquet.generated",
+    "type": "record",
+    "name": "Address",
+    "fields": [
+      {"name": "num", "type": "int"},
+      {"name": "street", "type": "string"},
+      {"name": "city", "type": "string"},
+      {"name": "state", "type": "string"},
+      {"name": "zip", "type": "string"}
+    ]
+  }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+    // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Specific record 
+and then create a DataStream containing Parquet records as Avor Specific 
records. 
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we use the same generated 
Address Java class 
+and monitor for the new files every second to read Avro Specific records from 
Parquet files 
+infinitely as new files are added to the directory.
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink 
Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific record that requires a predefined Avro 
schema, 
+Flink also supports creating a Datastream from Parquet files based on existing 
Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols 
for these POJO classes.
+Java types are mapped to Avro schemas, please refer to the [Avro 
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) for more 
details.
+
+#### Bounded read example
+
+In this example, we use a simple Java POJO class 
[Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
+
+```java
+public class Datum implements Serializable {
+
+    public String a;
+    public int b;
+
+    public Datum() {}
+
+    public Datum(String a, int b) {
+        this.a = a;
+        this.b = b;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        Datum datum = (Datum) o;
+        return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == 
null);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = a != null ? a.hashCode() : 0;
+        result = 31 * result + b;
+        return result;
+    }
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro 
Reflect record
+and then create a DataStream containing Parquet records as Avor Reflect 
records.
+
+```java
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forReflectRecord(Datum.class), /* Flink 
Path */)
+        .build();
+
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10L);
+        
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we use the same POJO Java 
class `Datum`
+and monitor for the new files every second to read Avro Reflect records from 
Parquet files
+infinitely as new files are added to the directory.

Review comment:
       ```suggestion
   This example, similar to the bounded data example, uses the same POJO Java 
class `Datum`
   and monitors for the new files every second to read Avro Reflect records 
from Parquet files
   infinitely as new files are added to the directory.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to