MartijnVisser commented on a change in pull request #19083:
URL: https://github.com/apache/flink/pull/19083#discussion_r826047567



##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -30,7 +30,7 @@ under the License.
 
 Flink supports reading [Parquet](https://parquet.apache.org/) files, 
 producing {{< javadoc file="org/apache/flink/table/data/RowData.html" 
name="Flink RowData">}} and producing [Avro](https://avro.apache.org/) records.
-To use the format you need to add the Flink Parquet dependency to your project:
+To use the format you need to add the flink-parquet dependency to your project 
for reading Flink RowData:

Review comment:
       ```suggestion
   To use the format you need to add the `flink-parquet` dependency to your 
project:
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,71 @@ To use the format you need to add the Flink Parquet 
dependency to your project:
        <version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+For reading Avro records, parquet-avro dependency is required additionally:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>${flink.format.parquet.version}</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>

Review comment:
       Are these exclusions only needed when `parquet-avro` is used in 
combination with `flink-parquet`? Or can we already exclude these by default?

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,71 @@ To use the format you need to add the Flink Parquet 
dependency to your project:
        <version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+For reading Avro records, parquet-avro dependency is required additionally:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>${flink.format.parquet.version}</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
 This format is compatible with the new Source that can be used in both batch 
and streaming modes.
 Thus, you can use this format for two kinds of data:
-- Bounded data
-- Unbounded data: monitors a directory for new files that appear 
+- Bounded data: lists all files and reads them all.
+- Unbounded data: monitors a directory for new files that appear.
 
-## Flink RowData
+By default, a File Source is created in the bounded mode, to turn the source 
into the continuous unbounded mode you can call 
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
additionally .
 
-#### Bounded data example
+**Batch mode**
 
-In this example, you will create a DataStream containing Parquet records as 
Flink RowDatas. The schema is projected to read only the specified fields 
("f7", "f4" and "f99").  
-Flink will read records in batches of 500 records. The first boolean parameter 
specifies that timestamp columns will be interpreted as UTC.
-The second boolean instructs the application that the projected Parquet fields 
names are case-sensitive.
-There is no watermark strategy defined as records do not contain event 
timestamps.
+```java
+
+// reads bounded data of records from files at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+        
+// reads unbounded data of records from files by monitoring the Paths
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+        .monitorContinuously(Duration.ofMillis(5L))
+        .build();
+
+```
+
+**Streaming mode** 
 
 ```java
-final LogicalType[] fieldTypes =
-  new LogicalType[] {
-  new DoubleType(), new IntType(), new VarCharType()
-  };
 
-final ParquetColumnarRowInputFormat<FileSourceSplit> format =
-  new ParquetColumnarRowInputFormat<>(
-  new Configuration(),
-  RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
-  500,
-  false,
-  true);
-final FileSource<RowData> source =
-  FileSource.forBulkFileFormat(format,  /* Flink Path */)
-  .build();
-final DataStream<RowData> stream =
-  env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+// reads bounded data of records from files at a time
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads unbounded data of records from files by monitoring the Paths
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        .monitorContinuously(Duration.ofMillis(5L))
+        .build();
+
+
 ```
 
-#### Unbounded data example
+From now on, this document will only show you examples for bounded data. You 
can add a call of

Review comment:
       If we only show examples for either bounded or unbounded data, I would 
prefer that the examples show unbounded data and explain how to apply them to 
bounded data, since batch is a special case of streaming from a Flink 
perspective.

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -329,22 +296,94 @@ final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
 ```
 
-#### Unbounded data example
+#### Prerequisite of Parquet files
+
+In order to support reading Avro Reflect records, the parquet file must 
contain specific meta information.
+The Avro schema used for creating the parquet data must contain a `namespace`, 
+which will be used by the program to identify the concrete Java class for the 
reflection process.
+
+The following example shows the User schema used previously. But this time it 
contains a namespace 
+pointing to the location(in this case the package), where the User class for 
the reflection could be found.
+
+```java
+// avro schema with namespace
+final String schema = 
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"namespace\": 
\"org.apache.flink.formats.parquet.avro\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" 
},\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": 
[\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": 
[\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }";
+
+```
+
+Parquet files created with this schema will contain meta information like:
 
-This example, similar to the bounded batch example, uses the same POJO Java 
class `Datum`
-and monitors for the new files every second to read Avro Reflect records from 
Parquet files
-infinitely as new files are added to the directory.
+```text
+creator:        parquet-mr version 1.12.2 (build 
77e30c8093386ec52c3cfa6c34b7ef3321322c94)
+extra:          parquet.avro.schema =
+{"type":"record","name":"User","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
+extra:          writer.model.name = avro
+
+file schema:    org.apache.flink.formats.parquet.avro.User
+--------------------------------------------------------------------------------
+name:           REQUIRED BINARY L:STRING R:0 D:0
+favoriteNumber: OPTIONAL INT32 R:0 D:1
+favoriteColor:  OPTIONAL BINARY L:STRING R:0 D:1
+
+row group 1:    RC:3 TS:143 OFFSET:4
+--------------------------------------------------------------------------------
+name:            BINARY UNCOMPRESSED DO:0 FPO:4 SZ:47/47/1.00 VC:3 
ENC:PLAIN,BIT_PACKED ST:[min: Jack, max: Tom, num_nulls: 0]
+favoriteNumber:  INT32 UNCOMPRESSED DO:0 FPO:51 SZ:41/41/1.00 VC:3 
ENC:RLE,PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
+favoriteColor:   BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 
ENC:RLE,PLAIN,BIT_PACKED ST:[min: green, max: yellow, num_nulls: 0]
+
+```
+
+With the User class defined in the package 
org.apache.flink.formats.parquet.avro:

Review comment:
       ```suggestion
   With the `User` class defined in the package 
`org.apache.flink.formats.parquet.avro`:
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,71 @@ To use the format you need to add the Flink Parquet 
dependency to your project:
        <version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+For reading Avro records, parquet-avro dependency is required additionally:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>${flink.format.parquet.version}</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
 This format is compatible with the new Source that can be used in both batch 
and streaming modes.
 Thus, you can use this format for two kinds of data:
-- Bounded data
-- Unbounded data: monitors a directory for new files that appear 
+- Bounded data: lists all files and reads them all.
+- Unbounded data: monitors a directory for new files that appear.
 
-## Flink RowData
+By default, a File Source is created in the bounded mode, to turn the source 
into the continuous unbounded mode you can call 
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
additionally .
 
-#### Bounded data example
+**Batch mode**

Review comment:
       I think Flink still uses the terms bounded and unbounded data. Batch 
mode refers to the execution mode.

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,71 @@ To use the format you need to add the Flink Parquet 
dependency to your project:
        <version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+For reading Avro records, parquet-avro dependency is required additionally:

Review comment:
       ```suggestion
   To read Avro records, you will need to add the `parquet-avro` dependency:
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,71 @@ To use the format you need to add the Flink Parquet 
dependency to your project:
        <version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+For reading Avro records, parquet-avro dependency is required additionally:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>${flink.format.parquet.version}</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
 This format is compatible with the new Source that can be used in both batch 
and streaming modes.
 Thus, you can use this format for two kinds of data:
-- Bounded data
-- Unbounded data: monitors a directory for new files that appear 
+- Bounded data: lists all files and reads them all.
+- Unbounded data: monitors a directory for new files that appear.
 
-## Flink RowData
+By default, a File Source is created in the bounded mode, to turn the source 
into the continuous unbounded mode you can call 
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
additionally .
 
-#### Bounded data example
+**Batch mode**
 
-In this example, you will create a DataStream containing Parquet records as 
Flink RowDatas. The schema is projected to read only the specified fields 
("f7", "f4" and "f99").  
-Flink will read records in batches of 500 records. The first boolean parameter 
specifies that timestamp columns will be interpreted as UTC.
-The second boolean instructs the application that the projected Parquet fields 
names are case-sensitive.
-There is no watermark strategy defined as records do not contain event 
timestamps.
+```java
+
+// reads bounded data of records from files at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+        
+// reads unbounded data of records from files by monitoring the Paths
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+        .monitorContinuously(Duration.ofMillis(5L))
+        .build();
+
+```
+
+**Streaming mode** 

Review comment:
       ```suggestion
   **Unbounded data** 
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -329,22 +296,94 @@ final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"file-source");
 ```
 
-#### Unbounded data example
+#### Prerequisite of Parquet files
+
+In order to support reading Avro Reflect records, the parquet file must 
contain specific meta information.
+The Avro schema used for creating the parquet data must contain a `namespace`, 
+which will be used by the program to identify the concrete Java class for the 
reflection process.
+
+The following example shows the User schema used previously. But this time it 
contains a namespace 
+pointing to the location(in this case the package), where the User class for 
the reflection could be found.

Review comment:
       ```suggestion
   To support reading Avro reflect records, the Parquet file must contain 
specific metadata.
   The Avro schema used for creating the Parquet data must contain a 
`namespace`, 
   which will be used by the program to identify the concrete Java class for 
the reflection process.
   
   The following example shows the `User` schema used previously, but this 
time it contains a namespace 
   pointing to the location (in this case the package) where the `User` class 
for reflection can be found.
   ```
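
       For readers wondering why the `namespace` matters: a minimal sketch of 
the idea, not Avro's or Flink's actual code. It assumes the lookup amounts to 
joining the namespace and record name into a fully qualified class name and 
loading that class. `NamespaceLookup`, `fullName` and `resolve` are hypothetical 
names, and `java.util.ArrayList` stands in for the `User` class so the snippet 
runs without any dependency.

```java
import java.util.Optional;

public class NamespaceLookup {

    // Joins an Avro-style namespace and record name into a fully qualified name.
    static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    // Tries to load the class with that name; empty if it is not on the classpath.
    static Optional<Class<?>> resolve(String namespace, String name) {
        try {
            Class<?> found = Class.forName(fullName(namespace, name));
            return Optional.<Class<?>>of(found);
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // With a namespace, the record name maps to a loadable class.
        System.out.println(resolve("java.util", "ArrayList").isPresent());
        // Without a namespace, the bare record name is looked up in the default package.
        System.out.println(resolve("", "ArrayList").isPresent());
    }
}
```

       The second call shows the failure mode the docs should warn about: 
without a namespace the bare record name does not identify a class in a 
package.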




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

