Polber commented on code in PR #30129:
URL: https://github.com/apache/beam/pull/30129#discussion_r1469999801


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java:
##########
@@ -211,6 +212,24 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
                 "The input schema must have exactly one field of type byte.");
           }
           toBytesFn = getRowToRawBytesFunction(inputSchema.getField(0).getName());
+        } else if (configuration.getFormat().equals("PROTO")) {
+          String descriptorPath = configuration.getFileDescriptorPath();
+          String schema = configuration.getSchema();
+          String messageName = configuration.getMessageName();
+          if (messageName == null) {
+            throw new IllegalArgumentException("Expecting messageName to be non-null.");
+          }

Review Comment:
   Same here



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java:
##########
@@ -471,4 +534,31 @@ public void testPubsubLiteErrorFnWithDedupingSuccess() {
 
     p.run().waitUntilFinish();
   }
+
+  @Test
+  public void testPubSubLiteErrorFnReadProto() {

Review Comment:
   I think either an error row should be run through the pipeline to check the 
error message count, or there should be a separate failure test case.
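
A minimal sketch of the shape such a failure check could take, written in plain Java rather than as a Beam pipeline test. The class `ErrorRoutingSketch`, its fields, and the integer decoder are hypothetical stand-ins for the error-handling function and the proto decoder; only the idea of feeding one good and one malformed payload and counting the results comes from the comment above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the error-handling function under test:
// payloads that decode go to `rows`, payloads that fail go to `errors`.
final class ErrorRoutingSketch {
  final List<Integer> rows = new ArrayList<>();
  final List<String> errors = new ArrayList<>();

  // Assumed decoder for illustration only: parses the payload as a UTF-8
  // decimal integer. The real test would decode a proto message instead.
  void process(byte[] payload) {
    String text = new String(payload, StandardCharsets.UTF_8);
    try {
      rows.add(Integer.parseInt(text));
    } catch (NumberFormatException e) {
      errors.add("Failed to decode payload: " + text);
    }
  }
}
```

A test built this way would process one valid and one malformed payload, then assert that `rows` and `errors` each contain exactly one element.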



##########
sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java:
##########


Review Comment:
   Many of these methods read from a file (GCS or local), which the 
`FileSystems` library already handles. For example (copied from a similar 
[method](https://github.com/Polber/beam/blob/f5f969bd6e50486068f3f73d323150fa377a9ffd/examples/java/src/main/java/org/apache/beam/examples/complete/kafkatopubsub/kafka/consumer/SslConsumerFactoryFn.java#L121)):
   
   ```
   private static byte[] getFileAsBytes(String fileDescriptorPath) {
     try (ReadableByteChannel readerChannel =
         FileSystems.open(FileSystems.matchSingleFileSpec(fileDescriptorPath).resourceId())) {
       // `compression` is assumed to be a Compression value defined elsewhere
       // (it is not declared in this snippet).
       ReadableByteChannel decompressedChannel = compression.readDecompressed(readerChannel);
       try (InputStream stream = Channels.newInputStream(decompressedChannel)) {
         return StreamUtils.getBytesWithoutClosing(stream);
       }
     } catch (IOException e) {
       throw new RuntimeException("Error when finding: " + fileDescriptorPath, e);
     }
   }
   ```



##########
sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java:
##########


Review Comment:
   Using `FileSystems` would probably also remove the need for the 
`com.google.cloud:google-cloud-storage` dependency in `build.gradle`.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java:
##########
@@ -214,6 +215,26 @@ public void finish(FinishBundleContext c) {
       beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
       valueMapper = getRawBytesToRowFunction(beamSchema);
 
+    } else if (format != null && format.equals("PROTO")) {
+      String fileDescriptorPath = configuration.getFileDescriptorPath();
+      String messageName = configuration.getMessageName();
+
+      if (messageName == null) {
+        throw new IllegalArgumentException(
+            "To read from PubSubLite in PROTO format, messageName must be provided.");
+      }

Review Comment:
   nit: it might make sense to add a `validate()` method to the Configuration 
class to hold checks like this (the other checks, such as the one for 
`inputSchema` below, could move there too).
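
A minimal sketch of what such a `validate()` method could look like. The class `PubsubLiteReadConfigSketch` is a hypothetical, simplified stand-in and not Beam's actual configuration class; only the `format` and `messageName` fields and the error message are taken from the diff above.

```java
// Hypothetical, simplified stand-in for the SchemaTransform configuration
// class; the real class has more fields and is built differently.
final class PubsubLiteReadConfigSketch {
  private final String format;
  private final String messageName;

  PubsubLiteReadConfigSketch(String format, String messageName) {
    this.format = format;
    this.messageName = messageName;
  }

  // Collects the argument checks in one place so that expand() can call
  // validate() once instead of checking each field inline.
  void validate() {
    if ("PROTO".equals(format) && messageName == null) {
      throw new IllegalArgumentException(
          "To read from PubSubLite in PROTO format, messageName must be provided.");
    }
  }
}
```

With this shape, the transform would call `configuration.validate()` at the start of `expand()`, and further checks could be added to the same method.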



##########
sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtilsTest.java:
##########
@@ -144,4 +172,23 @@ public void testRowToProtoSchemaFunction() {
     Assert.assertNotNull(
         ProtoByteUtils.getRowToProtoBytesFromSchema(PROTO_STRING_SCHEMA, "MyMessage").apply(row));
   }
+
+  @Test
+  public void testRowToProtoSchemaWithPackageFunction() {
+    Row row =
+        Row.withSchema(SCHEMA)
+            .withFieldValue("id", 1234)
+            .withFieldValue("name", "Doe")
+            .withFieldValue("active", false)
+            .withFieldValue("address.city", "seattle")
+            .withFieldValue("address.street", "fake street")
+            .withFieldValue("address.zip_code", "TO-1234")
+            .withFieldValue("address.state", "wa")
+            .build();
+
+    Assert.assertNotNull(
+        ProtoByteUtils.getRowToProtoBytesFromSchema(
+                PROTO_STRING_PACKAGE_SCHEMA, "com.test.proto.MyMessage")
+            .apply(row));

Review Comment:
   I think it would be beneficial to have at least one test case for 
`getRowToProtoBytesFromSchema` that validates the output proto after invoking 
`.apply()` (whether this test or one of the existing ones).


