MartijnVisser commented on a change in pull request #18660:
URL: https://github.com/apache/flink/pull/18660#discussion_r801660707
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -44,9 +44,11 @@ Thus, you can use this format in two ways:
- Bounded read for batch mode
- Continuous read for streaming mode: monitors a directory for new files that
appear
-**Bounded read example**:
+## Flink RowData
-In this example we create a DataStream containing Parquet records as Flink
Rows. We project the schema to read only certain fields ("f7", "f4" and "f99").
+#### Bounded read example
Review comment:
```suggestion
### Bounded data example
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -28,7 +28,7 @@ under the License.
# Parquet format
-Flink supports reading [Parquet](https://parquet.apache.org/) files and
producing [Flink
rows](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html).
+Flink supports reading [Parquet](https://parquet.apache.org/) files, producing
[Flink
RowDatas](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/data/RowData.html)
and [Avro](https://avro.apache.org/) records.
Review comment:
```suggestion
Flink supports reading [Parquet](https://parquet.apache.org/) files,
producing {{< javadoc file="org/apache/flink/table/data/RowData.html"
name="Flink RowData">}} and producing [Avro](https://avro.apache.org/) records.
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -44,9 +44,11 @@ Thus, you can use this format in two ways:
- Bounded read for batch mode
- Continuous read for streaming mode: monitors a directory for new files that
appear
-**Bounded read example**:
+## Flink RowData
-In this example we create a DataStream containing Parquet records as Flink
Rows. We project the schema to read only certain fields ("f7", "f4" and "f99").
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Flink
RowDatas. We project the schema to read only certain fields ("f7", "f4" and
"f99").
We read records in batches of 500 records. The first boolean parameter
specifies if timestamp columns need to be interpreted as UTC.
The second boolean instructs the application if the projected Parquet fields
names are to be interpreted in a case sensitive way.
There is no need for a watermark strategy as records do not contain event
timestamps.
Review comment:
I'm going to make some suggestions based on
https://flink.apache.org/contributing/docs-style.html
```suggestion
In this example, you will create a DataStream containing Parquet records as
Flink RowDatas. The schema is projected to read only the specified fields
("f7", "f4" and "f99").
Flink will read records in batches of 500. The first boolean
parameter specifies that timestamp columns will be interpreted as UTC.
The second boolean instructs the application that the projected Parquet
field names are case sensitive.
There is no watermark strategy defined, as the records do not contain event
timestamps.
```
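For context, a minimal sketch of how the parameters described above map onto
the format construction. The field types, the `/tmp/parquet` path, and the
five-argument `ParquetColumnarRowInputFormat` constructor ordering (Hadoop
configuration, projected row type, batch size, UTC flag, case-sensitivity
flag) are assumptions based on the surrounding docs, not part of this diff:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

// Project the Parquet schema down to the three fields named in the prose.
final LogicalType[] fieldTypes =
        new LogicalType[] {new DoubleType(), new IntType(), new VarCharType()};

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(), // Hadoop configuration
                RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
                500,   // batch size
                false, // interpret timestamp columns as UTC
                true); // treat projected field names as case sensitive

final FileSource<RowData> source =
        FileSource.forBulkFileFormat(format, new Path("/tmp/parquet")) // hypothetical path
                .build();

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
final DataStream<RowData> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```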
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -71,9 +73,9 @@ final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
-**Continuous read example**:
+#### Continuous read example
-In this example we create a DataStream containing Parquet records as Flink
Rows that will
+In this example, we create a DataStream containing Parquet records as Flink
RowDatas that will
infinitely grow as new files are added to the directory. We monitor for new
files each second.
We project the schema to read only certain fields ("f7", "f4" and "f99").
We read records in batches of 500 records. The first boolean parameter
specifies if timestamp columns need to be interpreted as UTC.
Review comment:
```suggestion
In this example, you will create a DataStream containing Parquet records as
Flink RowDatas that will
infinitely grow as new files are added to the directory. It will monitor for
new files each second.
The schema is projected to read only the specified fields ("f7", "f4" and
"f99").
Flink will read records in batches of 500. The first boolean
parameter specifies that timestamp columns will be interpreted as UTC.
```
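For the unbounded variant only the source builder changes; a sketch assuming
the same hypothetical `format` and path as in the bounded sketch above:

```java
import java.time.Duration;

// monitorContinuously() turns the bounded source into an unbounded one
// that picks up new files as they appear in the directory.
final FileSource<RowData> source =
        FileSource.forBulkFileFormat(format, new Path("/tmp/parquet"))
                .monitorContinuously(Duration.ofSeconds(1L))
                .build();
```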
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -71,9 +73,9 @@ final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
-**Continuous read example**:
+#### Continuous read example
Review comment:
```suggestion
### Unbounded stream example
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
Review comment:
```suggestion
This example uses an Avro schema similar to the one described in the
[official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name,
favoriteNumber, and favoriteColor. You can find more details at [record
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor
Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
Review comment:
```suggestion
In this example, you will create a DataStream containing Parquet records as
Avro Generic records.
It will parse the Avro schema based on the JSON string. There are many other
ways to parse a schema, e.g. from java.io.File or java.io.InputStream. Please
refer to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
After that, it creates an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
```
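On the "many other ways to parse a schema" remark, a sketch of the two
alternatives the text names, assuming a hypothetical `user.avsc` on disk
(both overloads belong to Avro's `Schema.Parser` and throw `IOException`):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;

import org.apache.avro.Schema;

// From a file on disk.
final Schema fromFile = new Schema.Parser().parse(new File("user.avsc"));

// From any InputStream, e.g. a classpath resource.
try (InputStream in = new FileInputStream("user.avsc")) {
    final Schema fromStream = new Schema.Parser().parse(in);
}
```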
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name,
favoriteNumber, and favoriteColor. You can find more details at [record
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor
Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new
files every second
+to read Avro Generic records from Parquet files infinitely as new files are
added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
+
+#### Bounded read example
Review comment:
```suggestion
#### Bounded data example
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name,
favoriteNumber, and favoriteColor. You can find more details at [record
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for
how to define an Avro schema.
+
+#### Bounded read example
Review comment:
```suggestion
#### Bounded data example
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name,
favoriteNumber, and favoriteColor. You can find more details at [record
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor
Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new
files every second
+to read Avro Generic records from Parquet files infinitely as new files are
added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
+
+#### Bounded read example
+
+In this example, we use the example schema
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+ {"namespace": "org.apache.flink.formats.parquet.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {"name": "num", "type": "int"},
+ {"name": "street", "type": "string"},
+ {"name": "city", "type": "string"},
+ {"name": "state", "type": "string"},
+ {"name": "zip", "type": "string"}
+ ]
+ }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
Review comment:
```suggestion
You will use the Avro Maven plugin to generate the `Address` Java class:
```
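Possibly worth showing what the generated class buys you; a hypothetical
usage sketch, assuming Avro's standard builder codegen for the `Address`
schema above:

```java
// Builders are generated alongside the class; the setters mirror the
// fields declared in the .avsc file.
final Address address =
        Address.newBuilder()
                .setNum(42)
                .setStreet("Main Street")
                .setCity("Springfield")
                .setState("IL")
                .setZip("62704")
                .build();
```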
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name,
favoriteNumber, and favoriteColor. You can find more details at [record
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor
Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new
files every second
+to read Avro Generic records from Parquet files infinitely as new files are
added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
+
+#### Bounded read example
+
+In this example, we use the example schema
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+ {"namespace": "org.apache.flink.formats.parquet.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {"name": "num", "type": "int"},
+ {"name": "street", "type": "string"},
+ {"name": "city", "type": "string"},
+ {"name": "state", "type": "string"},
+ {"name": "zip", "type": "string"}
+ ]
+ }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
+ // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro
Specific record
+and then create a DataStream containing Parquet records as Avor Specific
records.
Review comment:
```suggestion
You will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for
Avro Specific record
and then create a DataStream containing Parquet records as Avro Specific
records.
```
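One nit the suggestion does not touch: `forSpecificRecord(Address.class)`
produces a stream format of `Address`, so the source and stream can be typed
to the generated class rather than `GenericRecord`. A minimal sketch (the
path is hypothetical):

```java
final FileSource<Address> source =
        FileSource.forRecordStreamFormat(
                AvroParquetReaders.forSpecificRecord(Address.class),
                new Path("/tmp/parquet")) // hypothetical path
                .build();

final DataStream<Address> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```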
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
Review comment:
```suggestion
Flink supports producing three types of Avro records by reading Parquet
files:
```
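Since the reflect-record flavor is the only one of the three without an
example in this file, a minimal sketch may help; the `User` POJO and the path
are hypothetical, and `AvroParquetReaders.forReflectRecord` is assumed to
live in the same `org.apache.flink.formats.parquet.avro` package as the other
readers:

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;

// A plain Java class read via Avro reflection; no code generation and no
// hand-written Schema required.
public class User {
    public String name;
    public Integer favoriteNumber;
    public String favoriteColor;
}

final FileSource<User> source =
        FileSource.forRecordStreamFormat(
                AvroParquetReaders.forReflectRecord(User.class),
                new Path("/tmp/parquet")) // hypothetical path
                .build();
```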
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name,
favoriteNumber, and favoriteColor. You can find more details at [record
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor
Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
Review comment:
```suggestion
#### Unbounded stream example
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name,
favoriteNumber, and favoriteColor. You can find more details at [record
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor
Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new
files every second
+to read Avro Generic records from Parquet files infinitely as new files are
added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
+
+#### Bounded read example
+
+In this example, we use the example schema
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+ {"namespace": "org.apache.flink.formats.parquet.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {"name": "num", "type": "int"},
+ {"name": "street", "type": "string"},
+ {"name": "city", "type": "string"},
+ {"name": "state", "type": "string"},
+ {"name": "zip", "type": "string"}
+ ]
+ }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
+ // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro
Specific record
+and then create a DataStream containing Parquet records as Avor Specific
records.
+
+```java
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
Review comment:
```suggestion
#### Unbounded stream example
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
Review comment:
Should this section actually be moved to
`/flink/flink-docs-master/docs/connectors/datastream/formats/avro/`?
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name,
favoriteNumber, and favoriteColor. You can find more details at [record
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor
Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new
files every second
+to read Avro Generic records from Parquet files infinitely as new files are
added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
+
+#### Bounded read example
+
+In this example, we use the example schema
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
Review comment:
```suggestion
This example uses the example schema
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name,
favoriteNumber, and favoriteColor. You can find more details at [record
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor
Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new
files every second
+to read Avro Generic records from Parquet files infinitely as new files are
added to the directory.
Review comment:
```suggestion
This example is similar to the bounded data example. The application
monitors for new files every second and reads Avro Generic records from Parquet
files infinitely as new files are added to the directory.
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name,
favoriteNumber, and favoriteColor. You can find more details at [record
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor
Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new
files every second
+to read Avro Generic records from Parquet files infinitely as new files are
added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
+
+#### Bounded read example
+
+In this example, we use the example schema
[testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json lines
+[
+ {"namespace": "org.apache.flink.formats.parquet.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {"name": "num", "type": "int"},
+ {"name": "street", "type": "string"},
+ {"name": "city", "type": "string"},
+ {"name": "state", "type": "string"},
+ {"name": "zip", "type": "string"}
+ ]
+ }
+]
+```
+
+We use Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
+ // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro
Specific record
+and then create a DataStream containing Parquet records as Avor Specific
records.
+
+```java
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we use the same generated
Address Java class
+and monitor for the new files every second to read Avro Specific records from
Parquet files
+infinitely as new files are added to the directory.
Review comment:
```suggestion
This example, similar to the bounded data example, uses the same generated
Address Java class
and monitors for new files every second to read Avro Specific records from
Parquet files
infinitely as new files are added to the directory.
```
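A sketch of the unbounded specific-record variant this text describes,
assuming the same generated `Address` class and a hypothetical path:

```java
import java.time.Duration;

final FileSource<Address> source =
        FileSource.forRecordStreamFormat(
                AvroParquetReaders.forSpecificRecord(Address.class),
                new Path("/tmp/parquet")) // hypothetical path
                .monitorContinuously(Duration.ofSeconds(1L))
                .build();
```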
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name,
favoriteNumber, and favoriteColor. You can find more details at [record
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor
Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we monitor for the new
files every second
+to read Avro Generic records from Parquet files infinitely as new files are
added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
Review comment:
```suggestion
Based on the previously defined schema, you can generate classes by
leveraging Avro code generation.
Once the classes have been generated, there is no need to use the schema
directly in your program.
You can either use `avro-tools.jar` to generate code manually or use the
Avro Maven plugin to perform
code generation on any .avsc files present in the configured source
directory. Please refer to
[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example we use an Avro schema example similar to the one described in
the [official Avro
tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json lines
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name,
favoriteNumber, and favoriteColor. You can find more details at [record
specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for
how to define an Avro schema.
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avor
Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bounded read example, we monitor for new files every second
+to read Avro Generic records from Parquet files infinitely as new files are added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
+
+#### Bounded read example
+
+In this example, we use the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json
+[
+ {"namespace": "org.apache.flink.formats.parquet.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {"name": "num", "type": "int"},
+ {"name": "street", "type": "string"},
+ {"name": "city", "type": "string"},
+ {"name": "state", "type": "string"},
+ {"name": "zip", "type": "string"}
+ ]
+ }
+]
+```
+
+We use the Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
+ // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Specific record
+and then create a DataStream containing Parquet records as Avro Specific records.
+
+```java
+final FileSource<Address> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bounded read example, we use the same generated `Address` Java class
+and monitor for new files every second to read Avro Specific records from Parquet files
+infinitely as new files are added to the directory.
+
+```java
+final FileSource<Address> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific record that requires a predefined Avro
schema,
+Flink also supports creating a Datastream from Parquet files based on existing
Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols
for these POJO classes.
+Java types are mapped to Avro schemas, please refer to the [Avro
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) for more
details.
+
+#### Bounded read example
Review comment:
```suggestion
#### Bounded data example
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading Parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example, we use an Avro schema similar to the one described in the [official Avro tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. You can find more details on how to define an Avro schema in the [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record).
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avro Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bounded read example, we monitor for new files every second
+to read Avro Generic records from Parquet files infinitely as new files are added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
+
+#### Bounded read example
+
+In this example, we use the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json
+[
+ {"namespace": "org.apache.flink.formats.parquet.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {"name": "num", "type": "int"},
+ {"name": "street", "type": "string"},
+ {"name": "city", "type": "string"},
+ {"name": "state", "type": "string"},
+ {"name": "zip", "type": "string"}
+ ]
+ }
+]
+```
+
+We use the Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
+ // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Specific record
+and then create a DataStream containing Parquet records as Avro Specific records.
+
+```java
+final FileSource<Address> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bounded read example, we use the same generated `Address` Java class
+and monitor for new files every second to read Avro Specific records from Parquet files
+infinitely as new files are added to the directory.
+
+```java
+final FileSource<Address> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific record that requires a predefined Avro
schema,
+Flink also supports creating a Datastream from Parquet files based on existing
Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols
for these POJO classes.
+Java types are mapped to Avro schemas, please refer to the [Avro
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) for more
details.
Review comment:
```suggestion
Beyond Avro Generic and Specific record that requires a predefined Avro
schema,
Flink also supports creating a DataStream from Parquet files based on
existing Java POJO classes.
In this case, Avro will use Java reflection to generate schemas and
protocols for these POJO classes.
Java types are mapped to Avro schemas, please refer to the [Avro
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) documentation
for more details.
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading Parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example, we use an Avro schema similar to the one described in the [official Avro tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. You can find more details on how to define an Avro schema in the [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record).
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avro Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bounded read example, we monitor for new files every second
+to read Avro Generic records from Parquet files infinitely as new files are added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
+
+#### Bounded read example
+
+In this example, we use the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json
+[
+ {"namespace": "org.apache.flink.formats.parquet.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {"name": "num", "type": "int"},
+ {"name": "street", "type": "string"},
+ {"name": "city", "type": "string"},
+ {"name": "state", "type": "string"},
+ {"name": "zip", "type": "string"}
+ ]
+ }
+]
+```
+
+We use the Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
+ // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Specific record
+and then create a DataStream containing Parquet records as Avro Specific records.
+
+```java
+final FileSource<Address> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bounded read example, we use the same generated `Address` Java class
+and monitor for new files every second to read Avro Specific records from Parquet files
+infinitely as new files are added to the directory.
+
+```java
+final FileSource<Address> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific record that requires a predefined Avro
schema,
+Flink also supports creating a Datastream from Parquet files based on existing
Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols
for these POJO classes.
+Java types are mapped to Avro schemas, please refer to the [Avro
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) for more
details.
+
+#### Bounded read example
+
+In this example, we use a simple Java POJO class
[Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
Review comment:
```suggestion
This example uses a simple Java POJO class
[Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading Parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example, we use an Avro schema similar to the one described in the [official Avro tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. You can find more details on how to define an Avro schema in the [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record).
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avro Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bounded read example, we monitor for new files every second
+to read Avro Generic records from Parquet files infinitely as new files are added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
+
+#### Bounded read example
+
+In this example, we use the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json
+[
+ {"namespace": "org.apache.flink.formats.parquet.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {"name": "num", "type": "int"},
+ {"name": "street", "type": "string"},
+ {"name": "city", "type": "string"},
+ {"name": "state", "type": "string"},
+ {"name": "zip", "type": "string"}
+ ]
+ }
+]
+```
+
+We use the Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
+ // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Specific record
+and then create a DataStream containing Parquet records as Avro Specific records.
+
+```java
+final FileSource<Address> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bounded read example, we use the same generated `Address` Java class
+and monitor for new files every second to read Avro Specific records from Parquet files
+infinitely as new files are added to the directory.
+
+```java
+final FileSource<Address> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific record that requires a predefined Avro
schema,
+Flink also supports creating a Datastream from Parquet files based on existing
Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols
for these POJO classes.
+Java types are mapped to Avro schemas, please refer to the [Avro
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) for more
details.
+
+#### Bounded read example
+
+In this example, we use a simple Java POJO class
[Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
+
+```java
+public class Datum implements Serializable {
+
+ public String a;
+ public int b;
+
+ public Datum() {}
+
+ public Datum(String a, int b) {
+ this.a = a;
+ this.b = b;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Datum datum = (Datum) o;
+ return b == datum.b && (a != null ? a.equals(datum.a) : datum.a ==
null);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = a != null ? a.hashCode() : 0;
+ result = 31 * result + b;
+ return result;
+ }
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro
Reflect record
+and then create a DataStream containing Parquet records as Avor Reflect
records.
+
+```java
+final FileSource<Datum> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forReflectRecord(Datum.class), /* Flink
Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Datum> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
Review comment:
```suggestion
#### Unbounded stream example
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading Parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example, we use an Avro schema similar to the one described in the [official Avro tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. You can find more details on how to define an Avro schema in the [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record).
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avro Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bounded read example, we monitor for new files every second
+to read Avro Generic records from Parquet files infinitely as new files are added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
+
+#### Bounded read example
+
+In this example, we use the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json
+[
+ {"namespace": "org.apache.flink.formats.parquet.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {"name": "num", "type": "int"},
+ {"name": "street", "type": "string"},
+ {"name": "city", "type": "string"},
+ {"name": "state", "type": "string"},
+ {"name": "zip", "type": "string"}
+ ]
+ }
+]
+```
+
+We use the Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
+ // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Specific record
+and then create a DataStream containing Parquet records as Avro Specific records.
+
+```java
+final FileSource<Address> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+#### Continuous read example
+
+In this example, similar to the bounded read example, we use the same generated `Address` Java class
+and monitor for new files every second to read Avro Specific records from Parquet files
+infinitely as new files are added to the directory.
+
+```java
+final FileSource<Address> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific record that requires a predefined Avro
schema,
+Flink also supports creating a Datastream from Parquet files based on existing
Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols
for these POJO classes.
+Java types are mapped to Avro schemas, please refer to the [Avro
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) for more
details.
+
+#### Bounded read example
+
+In this example, we use a simple Java POJO class
[Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
+
+```java
+public class Datum implements Serializable {
+
+ public String a;
+ public int b;
+
+ public Datum() {}
+
+ public Datum(String a, int b) {
+ this.a = a;
+ this.b = b;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Datum datum = (Datum) o;
+ return b == datum.b && (a != null ? a.equals(datum.a) : datum.a ==
null);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = a != null ? a.hashCode() : 0;
+ result = 31 * result + b;
+ return result;
+ }
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro
Reflect record
+and then create a DataStream containing Parquet records as Avor Reflect
records.
Review comment:
```suggestion
You create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro
Reflect record
and then create a DataStream containing Parquet records as Avro Reflect
records.
```
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +102,249 @@ final FileSource<RowData> source =
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading Parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect
record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro
schemas and types from the [Avro
specification](https://avro.apache.org/docs/1.10.0/spec.html).
+In this example, we use an Avro schema similar to the one described in the [official Avro tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favoriteNumber", "type": ["int", "null"]},
+ {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. You can find more details on how to define an Avro schema in the [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record).
+
+#### Bounded read example
+
+In this example, we create a DataStream containing Parquet records as Avro Generic records.
+We parse the Avro schema based on the JSON string. There are many other ways
to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer
to [Avro
Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html)
for details.
+After that, we create an `AvroParquetRecordFormat` via `AvroParquetReaders`
for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
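
The snippet above parses the schema from an inline JSON string. As noted, `Schema.Parser` also accepts other inputs; a minimal sketch, assuming a hypothetical local file `user.avsc` containing the same schema:

```java
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;

// Hypothetical alternative: parse the same schema from an .avsc file on disk.
// Schema.Parser#parse(File) throws IOException, so call it where that can be handled;
// overloads for java.io.InputStream exist as well.
final Schema schema = new Schema.Parser().parse(new File("/path/to/user.avsc"));
```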
+
+#### Continuous read example
+
+In this example, similar to the bounded read example, we monitor for new files every second
+to read Avro Generic records from Parquet files infinitely as new files are added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+ new Schema.Parser()
+ .parse(
+ "{\"type\": \"record\", "
+ + "\"name\": \"User\", "
+ + "\"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"
},\n"
+ + " {\"name\": \"favoriteNumber\", \"type\":
[\"int\", \"null\"] },\n"
+ + " {\"name\": \"favoriteColor\", \"type\":
[\"string\", \"null\"] }\n"
+ + " ]\n"
+ + " }");
+
+final FileSource<GenericRecord> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
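
Once the stream has been created, each `GenericRecord` exposes its fields by name. A minimal, illustrative sketch of a downstream step (the `map` operation is not part of the documented example):

```java
// Avro deserializes string fields as org.apache.avro.util.Utf8,
// so convert them to java.lang.String via toString().
final DataStream<String> names =
        stream.map(record -> record.get("name").toString());
```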
+
+### Specific record
+
+Based on the previously defined schema, we can generate classes by leveraging
Avro code generation.
+Once the classes have been generated, there is no need to use the schema
directly in our programs.
+We can either use avro-tools.jar to generate code manually or use Avro Maven
plugin to performs
+code generation on any .avsc files present in the configured source
directory.Please refer to
+[Avro Getting
Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more
information.
+
+#### Bounded read example
+
+In this example, we use the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json
+[
+ {"namespace": "org.apache.flink.formats.parquet.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {"name": "num", "type": "int"},
+ {"name": "street", "type": "string"},
+ {"name": "city", "type": "string"},
+ {"name": "state", "type": "string"},
+ {"name": "zip", "type": "string"}
+ ]
+ }
+]
+```
+
+We use the Avro Maven plugin to generate the `Address` Java class:
+
+```java
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
+ // generated code...
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Specific record
+and then create a DataStream containing Parquet records as Avro Specific records.
+
+```java
+final FileSource<Address> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
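
Because `Address` is a generated `SpecificRecord`, its fields are available through typed getters rather than by name. A minimal, illustrative sketch, assuming the getters that the Avro Maven plugin generates from the schema above (e.g. `getStreet()`):

```java
// Generated Avro classes expose string fields as CharSequence,
// so convert explicitly where a String is needed.
final DataStream<String> streets =
        stream.map(address -> address.getStreet().toString());
```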
+
+#### Continuous read example
+
+In this example, similar to the bounded read example, we use the same generated `Address` Java class
+and monitor for new files every second to read Avro Specific records from Parquet files
+infinitely as new files are added to the directory.
+
+```java
+final FileSource<Address> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forSpecificRecord(Address.class), /* Flink
Path */)
+ .monitorContinuously(Duration.ofSeconds(1L))
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific record that requires a predefined Avro
schema,
+Flink also supports creating a Datastream from Parquet files based on existing
Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols
for these POJO classes.
+Java types are mapped to Avro schemas, please refer to the [Avro
reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) for more
details.
+
+#### Bounded read example
+
+In this example, we use a simple Java POJO class
[Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
+
+```java
+public class Datum implements Serializable {
+
+ public String a;
+ public int b;
+
+ public Datum() {}
+
+ public Datum(String a, int b) {
+ this.a = a;
+ this.b = b;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Datum datum = (Datum) o;
+ return b == datum.b && (a != null ? a.equals(datum.a) : datum.a ==
null);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = a != null ? a.hashCode() : 0;
+ result = 31 * result + b;
+ return result;
+ }
+}
+```
+
+We create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro
Reflect record
+and then create a DataStream containing Parquet records as Avor Reflect
records.
+
+```java
+final FileSource<Datum> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forReflectRecord(Datum.class), /* Flink
Path */)
+ .build();
+
+final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(10L);
+
+final DataStream<Datum> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"file-source");
+```
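
Since reflect records are plain Java objects, downstream operators can read the POJO fields directly. A minimal, illustrative sketch:

```java
// Datum declares its fields as public, so no getters are required.
final DataStream<String> values = stream.map(datum -> datum.a);
```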
+
+#### Continuous read example
+
+In this example, similar to the bound read example, we use the same POJO Java
class `Datum`
+and monitor for the new files every second to read Avro Reflect records from
Parquet files
+infinitely as new files are added to the directory.
Review comment:
```suggestion
This example, similar to the bounded data example, uses the same POJO Java
class `Datum`
and monitors for the new files every second to read Avro Reflect records
from Parquet files
infinitely as new files are added to the directory.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]