AHeise commented on a change in pull request #17501:
URL: https://github.com/apache/flink/pull/17501#discussion_r770496071
##########
File path:
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/StreamFormat.java
##########
@@ -157,6 +166,88 @@
long splitEnd)
throws IOException;
+ /**
+ * Creates a new reader to read in this format. This method is called when a fresh reader is
+ * created for a split that was assigned from the enumerator. This method may also be called on
+ * recovery from a checkpoint, if the reader never stored an offset in the checkpoint (see
+ * {@link #restoreReader(Configuration, Path, long, long, long)} for details.
+ *
+ * <p>Provide the default implementation, subclasses are therefore not forced to implement it.
+ * Compare to the {@link #createReader(Configuration, FSDataInputStream, long, long)}, This
+ * method put the focus on the {@link Path}. The default implementation adapts information given
+ * by method arguments to {@link FSDataInputStream} and calls {@link
+ * #createReader(Configuration, FSDataInputStream, long, long)}.
+ *
+ * <p>If the format is {@link #isSplittable() splittable}, then the {@code inputStream} is
+ * positioned to the beginning of the file split, otherwise it will be at position zero.
+ */
+ default StreamFormat.Reader<T> createReader(
+ Configuration config, Path filePath, long splitOffset, long splitLength)
+ throws IOException {
Review comment:
I'm still not sure whether these changes provide any benefit to the user. If the framework uses them (I currently can't find any caller), we can just as well move them to the call site. We should only add things to user-facing interfaces if they help the user (and not the framework).
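A minimal sketch of the call-site alternative, assuming the framework only needs to turn a path plus a split offset into a positioned stream before handing it to the existing stream-based factory. `SimpleStreamFormat`, `FirstCharFormat`, `ReaderFactory` and `SplitDemo` are hypothetical, heavily simplified stand-ins built on `java.nio.file` rather than Flink's `Path`/`FSDataInputStream`; the positioning rule (seek to the split start only for splittable formats, otherwise start at position zero) follows the Javadoc quoted in this hunk.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical, simplified stand-in for the user-facing format: it only knows how to
// read from an already-positioned stream and has no Path-based overload.
interface SimpleStreamFormat<T> {
    boolean isSplittable();

    T read(InputStream in) throws IOException;
}

// Hypothetical example format that returns the first character it reads (or null at EOF).
final class FirstCharFormat implements SimpleStreamFormat<Character> {
    private final boolean splittable;

    FirstCharFormat(boolean splittable) {
        this.splittable = splittable;
    }

    @Override
    public boolean isSplittable() {
        return splittable;
    }

    @Override
    public Character read(InputStream in) throws IOException {
        int b = in.read();
        return b < 0 ? null : (char) b;
    }
}

// Hypothetical framework-side helper: the Path-to-stream adaptation lives at the call
// site instead of in a default method on the user-facing interface.
final class ReaderFactory {
    private ReaderFactory() {}

    static <T> T openAndRead(SimpleStreamFormat<T> format, Path file, long splitOffset)
            throws IOException {
        try (InputStream in = Files.newInputStream(file)) {
            if (format.isSplittable()) {
                // Splittable formats start at the beginning of their split.
                skipFully(in, splitOffset);
            }
            // Non-splittable formats are left at position zero.
            return format.read(in);
        }
    }

    private static void skipFully(InputStream in, long n) throws IOException {
        long remaining = n;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped <= 0) {
                // skip() may make no progress; fall back to reading a single byte.
                if (in.read() < 0) {
                    throw new EOFException("Split offset " + n + " is beyond the end of the file");
                }
                skipped = 1;
            }
            remaining -= skipped;
        }
    }
}

final class SplitDemo {
    // Writes "abcdef" to a temp file and returns the first character the format sees.
    static char firstCharOfSplit(boolean splittable, long splitOffset) {
        try {
            Path file = Files.createTempFile("split-demo", ".txt");
            try {
                Files.write(file, "abcdef".getBytes(StandardCharsets.US_ASCII));
                return ReaderFactory.openAndRead(new FirstCharFormat(splittable), file, splitOffset);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a file containing `abcdef`, `SplitDemo.firstCharOfSplit(true, 2)` yields `'c'`, while `firstCharOfSplit(false, 2)` yields `'a'`. Keeping this adapter in framework code leaves the user-facing interface unchanged.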
##########
File path:
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
##########
@@ -46,7 +46,8 @@
*/
Review comment:
Commit message: [FLINK-21406][parquet] Rename Parquet**_Avro_**Writers to AvroParquetWriters
##########
File path:
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
##########
@@ -46,7 +46,8 @@
*/
public static <T extends SpecificRecordBase> AvroParquetRecordFormat<T> forSpecificRecord(
final Class<T> typeClass) {
- return new AvroParquetRecordFormat<>(new AvroTypeInfo<>(typeClass), SpecificData.get());
+ return new AvroParquetRecordFormat<>(
Review comment:
These changes should be in the previous commit.
##########
File path:
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
##########
@@ -50,11 +51,12 @@
private final TypeInformation<E> type;
- private final GenericData dataModel;
+ private final SerializableSupplier<GenericData> dataModelSupplier;
- AvroParquetRecordFormat(TypeInformation<E> type, GenericData dataModel) {
Review comment:
These changes should be in the previous commit.
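The switch from a `GenericData` field to a `SerializableSupplier<GenericData>` is presumably there because Avro's `GenericData` does not implement `Serializable`, while the format object itself has to be serialized to reach the readers. A minimal, self-contained sketch of that pattern follows; `DataModel`, `RecordFormat` and `SupplierDemo` are hypothetical stand-ins (not Flink or Avro classes), and `SerializableSupplier` is redeclared locally with the same shape as Flink's `org.apache.flink.util.function.SerializableSupplier` so the example compiles without Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Same shape as Flink's SerializableSupplier, redeclared so the sketch is self-contained.
interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

// Hypothetical stand-in for Avro's GenericData: deliberately NOT Serializable.
final class DataModel {
    final String name;

    DataModel(String name) {
        this.name = name;
    }
}

// Hypothetical, simplified format that ships a supplier instead of the model itself.
final class RecordFormat implements Serializable {
    private static final long serialVersionUID = 1L;

    private final SerializableSupplier<DataModel> dataModelSupplier;

    RecordFormat(SerializableSupplier<DataModel> dataModelSupplier) {
        this.dataModelSupplier = dataModelSupplier;
    }

    // Materializes the model where it is needed, e.g. when a reader is created.
    DataModel dataModel() {
        return dataModelSupplier.get();
    }
}

final class SupplierDemo {
    // Returns false if Java serialization rejects the object.
    static boolean isSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serializes and deserializes a value, simulating shipping it to another process.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // The lambda targets a Serializable interface, so it survives the round trip.
    static String modelNameAfterRoundTrip() {
        RecordFormat format = new RecordFormat(() -> new DataModel("specific"));
        return roundTrip(format).dataModel().name;
    }
}
```

Here `SupplierDemo.isSerializable(new DataModel("specific"))` returns `false`, while `modelNameAfterRoundTrip()` returns `"specific"`: only the supplier goes through serialization, and the non-serializable model is rebuilt on demand.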
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.