MartijnVisser commented on a change in pull request #18660: URL: https://github.com/apache/flink/pull/18660#discussion_r802629436
########## File path: docs/content/docs/connectors/datastream/formats/parquet.md ##########

@@ -100,3 +103,248 @@

## Avro Records

Flink supports producing three types of Avro records by reading Parquet files:

- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
- [Reflect record](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html)

### Generic record

Avro schemas are defined using JSON. You can find more information about Avro schemas and types in the [Avro specification](https://avro.apache.org/docs/1.10.0/spec.html).
This example uses an Avro schema similar to the one described in the [official Avro tutorial](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html):

```json
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favoriteNumber", "type": ["int", "null"]},
     {"name": "favoriteColor", "type": ["string", "null"]}
 ]
}
```

This schema defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. You can find more details on how to define an Avro schema in the [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record).

#### Bounded data example

In this example, you will create a DataStream containing Parquet records as Avro Generic records.
The Avro schema is parsed from a JSON string here. There are many other ways to parse a schema, e.g. from a `java.io.File` or a `java.io.InputStream`. Please refer to [Avro Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html) for details.
After that, you will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Generic records.

```java
// parsing avro schema
final Schema schema =
        new Schema.Parser()
                .parse(
                        "{\"type\": \"record\", "
                                + "\"name\": \"User\", "
                                + "\"fields\": [\n"
                                + "    {\"name\": \"name\", \"type\": \"string\" },\n"
                                + "    {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"] },\n"
                                + "    {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
                                + "  ]\n"
                                + "    }");

final FileSource<GenericRecord> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<GenericRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
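The records produced this way can be consumed like any other DataStream elements. As a minimal sketch (assuming the `User` schema above), you can access the fields of a `GenericRecord` by name; note that Avro may represent string fields as `org.apache.avro.util.Utf8`, hence the `toString()` call:

```java
// continue the pipeline above: extract the "name" field from every record
final DataStream<String> names =
        stream.map(record -> record.get("name").toString())
                // explicit type hint, since the lambda's return type is erased at runtime
                .returns(Types.STRING);
```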
#### Unbounded data example

This example is similar to the bounded batch example. The application monitors for new files every second
and reads Avro Generic records from Parquet files continuously as new files are added to the directory.

```java
// parsing avro schema
final Schema schema =
        new Schema.Parser()
                .parse(
                        "{\"type\": \"record\", "
                                + "\"name\": \"User\", "
                                + "\"fields\": [\n"
                                + "    {\"name\": \"name\", \"type\": \"string\" },\n"
                                + "    {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"] },\n"
                                + "    {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
                                + "  ]\n"
                                + "    }");

final FileSource<GenericRecord> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
                .monitorContinuously(Duration.ofSeconds(1L))
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<GenericRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```

### Specific record

Based on the previously defined schema, you can generate classes by leveraging Avro code generation.
Once the classes have been generated, there is no need to use the schema directly in your programs.
You can either use `avro-tools.jar` to generate the code manually or you can use the Avro Maven plugin to perform
code generation on any .avsc files present in the configured source directory; a sketch of such a plugin configuration is shown below. Please refer to
[Avro Getting Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more information.
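For illustration, a typical configuration of the Avro Maven plugin looks roughly like the following (a sketch following the Avro Getting Started guide; the version and directory layout are assumptions to adapt to your project):

```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.10.0</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <!-- .avsc files are picked up from this directory -->
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```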
#### Bounded data example

This example uses the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):

```json
[
  {"namespace": "org.apache.flink.formats.parquet.generated",
    "type": "record",
    "name": "Address",
    "fields": [
      {"name": "num", "type": "int"},
      {"name": "street", "type": "string"},
      {"name": "city", "type": "string"},
      {"name": "state", "type": "string"},
      {"name": "zip", "type": "string"}
    ]
  }
]
```

You will use the Avro Maven plugin to generate the `Address` Java class:

```java
@org.apache.avro.specific.AvroGenerated
public class Address extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
    // generated code...
}
```

You will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Specific records
and then create a DataStream containing Parquet records as Avro Specific records.

```java
final FileSource<Address> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forSpecificRecord(Address.class), /* Flink Path */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<Address> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
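Compared to Generic records, the generated class gives you typed access to the fields. A minimal sketch (note that, depending on the plugin's `stringType` setting, Avro-generated getters for string fields may return `CharSequence` rather than `String`):

```java
// continue the pipeline above: read the "num" field through its generated getter
final DataStream<Integer> houseNumbers =
        stream.map(Address::getNum)
                .returns(Types.INT);
```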
#### Unbounded data example

This example, similar to the bound batch example, uses the same generated Address Java class
and monitors for new files every second to read Avro Specific records from Parquet files
continuously as new files are added to the directory.

```java
final FileSource<Address> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forSpecificRecord(Address.class), /* Flink Path */)
                .monitorContinuously(Duration.ofSeconds(1L))
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<Address> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```

### Reflect record

Beyond Avro Generic and Specific records, which require a predefined Avro schema,
Flink also supports creating a DataStream from Parquet files based on existing Java POJO classes.
In this case, Avro uses Java reflection to generate schemas and protocols for these POJO classes.
For details on how Java types are mapped to Avro schemas, please refer to the [Avro reflect](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/reflect/package-summary.html) documentation.
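If you want to inspect the schema that Avro derives via reflection, you can print it with Avro's `ReflectData` (a quick sketch, using the `Datum` POJO shown in the next example):

```java
// derive and pretty-print the Avro schema that reflection produces for the POJO
final Schema reflectSchema =
        org.apache.avro.reflect.ReflectData.get().getSchema(Datum.class);
System.out.println(reflectSchema.toString(true));
```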
#### Bounded data example

This example uses a simple Java POJO class [Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):

```java
public class Datum implements Serializable {

    public String a;
    public int b;

    public Datum() {}

    public Datum(String a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        Datum datum = (Datum) o;
        return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
    }

    @Override
    public int hashCode() {
        int result = a != null ? a.hashCode() : 0;
        result = 31 * result + b;
        return result;
    }
}
```

You will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Reflect records
and then create a DataStream containing Parquet records as Avro Reflect records.

```java
final FileSource<Datum> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forReflectRecord(Datum.class), /* Flink Path */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);

final DataStream<Datum> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```

#### Unbounded data example

This example, similar to the bound batch example, uses the same POJO Java class `Datum`

Review comment:
```suggestion
This example, similar to the bounded batch example, uses the same POJO Java class `Datum`
```

########## File path: docs/content/docs/connectors/datastream/formats/parquet.md ##########

@@ -100,3 +103,248 @@

This example, similar to the bound batch example, uses the same generated Address Java class

Review comment:
```suggestion
This example, similar to the bounded batch example, uses the same generated Address Java class
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
