Abacn commented on code in PR #34063:
URL: https://github.com/apache/beam/pull/34063#discussion_r1985378494
##########
sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java:
##########
@@ -746,6 +780,31 @@ public void processElement(
tracker.currentRestriction().getFrom(),
tracker.currentRestriction().getTo());
Configuration conf = getConfWithModelClass();
+
+ if (useProtoReader) {
+ // Use ProtoParquetReader to read protobuf data.
+ // Derive a Hadoop Path from the file metadata. Adjust as needed.
+ Path path = new Path(file.getMetadata().resourceId().toString());
+
+ // Get the configuration and set the property using the literal.
+ Configuration conf2 = getConfWithModelClass();
+ conf2.set("parquet.proto.ignore.unknown.fields", "TRUE");
Review Comment:
What is the purpose of "ignore.unknown.fields" here? In general, we should
not set unsafe configuration options for users without their being aware of it.
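
One way to keep this explicit is to make the option opt-in. The sketch
below is only an illustration: `ProtoReadOptions` and its methods are
hypothetical names (not existing Beam or Parquet API), and a plain `Map`
stands in for the Hadoop `Configuration`. Only the property key is taken
from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical options holder; not part of Beam or Parquet. */
final class ProtoReadOptions {
  // Property key copied from the diff under review.
  static final String IGNORE_UNKNOWN_FIELDS = "parquet.proto.ignore.unknown.fields";

  private final boolean ignoreUnknownFields;

  private ProtoReadOptions(boolean ignoreUnknownFields) {
    this.ignoreUnknownFields = ignoreUnknownFields;
  }

  /** Safe default: the option is left unset unless the user asks for it. */
  static ProtoReadOptions defaults() {
    return new ProtoReadOptions(false);
  }

  /** The user opts in explicitly, so the behavior is never a surprise. */
  ProtoReadOptions withIgnoreUnknownFields(boolean value) {
    return new ProtoReadOptions(value);
  }

  /** Properties to copy into the reader configuration (a Map stands in for it here). */
  Map<String, String> toProperties() {
    Map<String, String> props = new HashMap<>();
    if (ignoreUnknownFields) {
      props.put(IGNORE_UNKNOWN_FIELDS, "true");
    }
    return props;
  }
}
```

With something like this, the default read path sets nothing, and only a
caller that opted in gets the property written into the configuration.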
##########
sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java:
##########
@@ -616,6 +621,9 @@ public abstract static class ReadFiles
abstract Builder toBuilder();
+ // New: flag to indicate using ProtoParquetReader for protobuf data.
+ abstract boolean getUseProtoReader();
Review Comment:
Consider using an enum-typed parameter (AVRO / PROTO) that defaults to AVRO?
This allows flexibility for future additions.
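
A rough sketch of what I mean, assuming made-up names throughout:
`RecordFormat`, `ReadFilesSpec`, and `withRecordFormat` are hypothetical
and only stand in for the real AutoValue class and its builder.

```java
import java.util.Objects;

/** Hypothetical enum naming the record formats the reader could support. */
enum RecordFormat {
  AVRO,
  PROTO
}

/** Hypothetical stand-in for the ReadFiles configuration; not Beam's actual class. */
final class ReadFilesSpec {
  private final RecordFormat format;

  private ReadFilesSpec(RecordFormat format) {
    this.format = Objects.requireNonNull(format, "format");
  }

  /** Existing callers keep getting AVRO without changing anything. */
  static ReadFilesSpec create() {
    return new ReadFilesSpec(RecordFormat.AVRO);
  }

  /** Returns a copy with a different format, in the style of a "with" method. */
  ReadFilesSpec withRecordFormat(RecordFormat newFormat) {
    return new ReadFilesSpec(newFormat);
  }

  RecordFormat getRecordFormat() {
    return format;
  }
}
```

Adding a third format later would then mean one new enum constant rather
than a second boolean flag.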
##########
sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java:
##########
@@ -746,6 +780,31 @@ public void processElement(
tracker.currentRestriction().getFrom(),
tracker.currentRestriction().getTo());
Configuration conf = getConfWithModelClass();
+
+ if (useProtoReader) {
+ // Use ProtoParquetReader to read protobuf data.
+ // Derive a Hadoop Path from the file metadata. Adjust as needed.
+ Path path = new Path(file.getMetadata().resourceId().toString());
+
+ // Get the configuration and set the property using the literal.
+ Configuration conf2 = getConfWithModelClass();
+ conf2.set("parquet.proto.ignore.unknown.fields", "TRUE");
+
+ // Use the builder overload that takes a ReadSupport and a Path.
+ try (ParquetReader<GenericRecord> reader =
+ ProtoParquetReader.<GenericRecord>builder(new ProtoReadSupport(), path).build()) {
+ GenericRecord message;
+ while ((message = reader.read()) != null) {
+ // Cast through Object so that parseFn (which expects GenericRecord)
+ // can accept the DynamicMessage.
+ outputReceiver.output(parseFn.apply((GenericRecord) (Object) message));
+ }
+ }
+
+ return; // exit after using the proto path
Review Comment:
For readability, I would recommend structuring the code like this, e.g.
getParquetFileReader() {
switch (FORMAT) {
case AVRO: return getParquetAvroReader();
case PROTO: return getParquetProtoReader();
}
}
instead of branching in place.
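
A slightly fuller sketch of the same idea follows. It is only an
illustration: `RecordReader` is a hypothetical one-method interface
standing in for `ParquetReader`, and the two private helpers return
placeholders rather than real Avro or Proto readers.

```java
/** Hypothetical minimal reader interface standing in for ParquetReader. */
interface RecordReader {
  String describe();
}

/** Hypothetical factory that keeps the format dispatch in one place. */
final class ReaderFactory {
  enum Format {
    AVRO,
    PROTO
  }

  /** Single dispatch point, so processElement does not branch inline. */
  static RecordReader getParquetFileReader(Format format) {
    switch (format) {
      case AVRO:
        return getParquetAvroReader();
      case PROTO:
        return getParquetProtoReader();
      default:
        throw new IllegalArgumentException("Unsupported format: " + format);
    }
  }

  // Placeholder: the real helper would build the Avro-backed reader.
  private static RecordReader getParquetAvroReader() {
    return () -> "avro";
  }

  // Placeholder: the real helper would build the Proto-backed reader.
  private static RecordReader getParquetProtoReader() {
    return () -> "proto";
  }

  private ReaderFactory() {}
}
```

The read loop would then be written once against the returned reader,
with no early `return` needed for the proto path.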