MartijnVisser commented on a change in pull request #18660:
URL: https://github.com/apache/flink/pull/18660#discussion_r802629436



##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +103,248 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+
+## Avro Records
+
+Flink supports producing three types of Avro records by reading Parquet files:
+
+- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
+- [Reflect record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)
+
+### Generic record
+
+Avro schemas are defined using JSON. You can get more information about Avro schemas and types from the [Avro specification](https://avro.apache.org/docs/1.10.0/spec.html).
+This example uses an Avro schema similar to the one described in the [official Avro tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):
+
+```json
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favoriteNumber",  "type": ["int", "null"]},
+     {"name": "favoriteColor", "type": ["string", "null"]}
+ ]
+}
+```
+
+This schema defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. You can find more details on how to define an Avro schema in the [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record).
+
+#### Bounded data example
+
+In this example, you will create a DataStream containing Parquet records as Avro Generic records.
+The example parses the Avro schema from a JSON string. There are many other ways to parse a schema, e.g. from a java.io.File or a java.io.InputStream. Please refer to [Avro Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html) for details.
+After that, you will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Generic records.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+```
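+
+As mentioned above, the schema can also be parsed from a file instead of a JSON string. A minimal sketch, assuming a local schema file named `user.avsc` (a hypothetical path, not part of the example above):
+
+```java
+// hypothetical: parse the same schema from a local .avsc file;
+// Schema.Parser#parse(File) declares IOException, so handle or declare it
+final Schema schema = new Schema.Parser().parse(new java.io.File("user.avsc"));
+```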
+
+#### Unbounded data example
+
+This example is similar to the bounded batch example. The application monitors for new files every second
+and reads Avro Generic records from Parquet files continuously as new files are added to the directory.
+
+```java
+// parsing avro schema
+final Schema schema =
+        new Schema.Parser()
+            .parse(
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }");
+
+final FileSource<GenericRecord> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(10L);
+
+final DataStream<GenericRecord> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+```
+
+### Specific record
+
+Based on the previously defined schema, you can generate classes by leveraging Avro code generation.
+Once the classes have been generated, there is no need to use the schema directly in your programs.
+You can either use `avro-tools.jar` to generate code manually or you can use the Avro Maven plugin to perform
+code generation on any .avsc files present in the configured source directory. Please refer to
+[Avro Getting Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more information.
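+
+For the manual route, the Avro Getting Started guide invokes the compiler from the command line. A sketch, assuming an avro-tools 1.10.0 jar at a local path of your choosing and an output directory to adjust for your project:
+
+```shell
+# compile an .avsc schema file into Java classes (paths are placeholders)
+java -jar /path/to/avro-tools-1.10.0.jar compile schema testdata.avsc target/generated-sources/
+```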
+
+#### Bounded data example
+
+This example uses the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+
+```json
+[
+  {"namespace": "org.apache.flink.formats.parquet.generated",
+    "type": "record",
+    "name": "Address",
+    "fields": [
+      {"name": "num", "type": "int"},
+      {"name": "street", "type": "string"},
+      {"name": "city", "type": "string"},
+      {"name": "state", "type": "string"},
+      {"name": "zip", "type": "string"}
+    ]
+  }
+]
+```
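+
+The plugin route is configured in the project pom. A sketch adapted from the Avro Getting Started guide; the version and the source/output directories are assumptions to adjust for your project:
+
+```xml
+<plugin>
+  <groupId>org.apache.avro</groupId>
+  <artifactId>avro-maven-plugin</artifactId>
+  <version>1.10.0</version>
+  <executions>
+    <execution>
+      <phase>generate-sources</phase>
+      <goals>
+        <goal>schema</goal>
+      </goals>
+      <configuration>
+        <!-- directory containing the .avsc files to compile -->
+        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+        <!-- where the generated Java classes are written -->
+        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
+      </configuration>
+    </execution>
+  </executions>
+</plugin>
+```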
+
+You will use the Avro Maven plugin to generate the `Address` Java class:
+
+```java
+@org.apache.avro.specific.AvroGenerated
+public class Address extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+    // generated code...
+}
+```
+
+You will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Specific records
+and then create a DataStream containing Parquet records as Avro Specific records.
+
+```java
+final FileSource<Address> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+```
+
+#### Unbounded data example
+
+This example, similar to the bound batch example, uses the same generated Address Java class
+and monitors for new files every second to read Avro Specific records from Parquet files
+continuously as new files are added to the directory.
+
+```java
+final FileSource<Address> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L))
+        .build();
+
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(10L);
+
+final DataStream<Address> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+```
+
+### Reflect record
+
+Beyond Avro Generic and Specific records, which require a predefined Avro schema,
+Flink also supports creating a DataStream from Parquet files based on existing Java POJO classes.
+In this case, Avro will use Java reflection to generate schemas and protocols for these POJO classes.
+Java types are mapped to Avro schemas; please refer to the [Avro reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) documentation for more details.
+
+#### Bounded data example
+
+This example uses a simple Java POJO class [Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
+
+```java
+public class Datum implements Serializable {
+
+    public String a;
+    public int b;
+
+    public Datum() {}
+
+    public Datum(String a, int b) {
+        this.a = a;
+        this.b = b;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        Datum datum = (Datum) o;
+        return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = a != null ? a.hashCode() : 0;
+        result = 31 * result + b;
+        return result;
+    }
+}
+```
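+
+As an aside, this is roughly how Avro derives a schema from such a POJO via reflection. A minimal sketch using Avro's `ReflectData`, shown for illustration only; Flink performs this step for you:
+
+```java
+// derive an Avro schema from the Datum POJO via reflection
+Schema schema = org.apache.avro.reflect.ReflectData.get().getSchema(Datum.class);
+// pretty-print the derived schema as JSON
+System.out.println(schema.toString(true));
+```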
+
+You will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Reflect records
+and then create a DataStream containing Parquet records as Avro Reflect records.
+
+```java
+final FileSource<Datum> source =
+        FileSource.forRecordStreamFormat(
+                AvroParquetReaders.forReflectRecord(Datum.class), /* Flink Path */)
+        .build();
+
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(10L);
+
+final DataStream<Datum> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+```
+
+#### Unbounded data example
+
+This example, similar to the bound batch example, uses the same POJO Java class `Datum`

Review comment:
   ```suggestion
   This example, similar to the bounded batch example, uses the same POJO Java class `Datum`
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -100,3 +103,248 @@ final FileSource<RowData> source =
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
+This example, similar to the bound batch example, uses the same generated Address Java class

Review comment:
   ```suggestion
   This example, similar to the bounded batch example, uses the same generated Address Java class
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

