chamikaramj commented on code in PR #28782:
URL: https://github.com/apache/beam/pull/28782#discussion_r1357763468


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java:
##########
@@ -89,38 +87,43 @@ public SchemaTransform 
from(PubsubReadSchemaTransformConfiguration configuration
           "To read from Pubsub, a subscription name or a topic name must be 
provided. Not both.");
     }
 
-    if ((Strings.isNullOrEmpty(configuration.getSchema())
-            && !Strings.isNullOrEmpty(configuration.getFormat()))
-        || (!Strings.isNullOrEmpty(configuration.getSchema())
-            && Strings.isNullOrEmpty(configuration.getFormat()))) {
-      throw new IllegalArgumentException(
-          "A schema was provided without a data format (or viceversa). Please 
provide "
-              + "both of these parameters to read from Pubsub, or if you would 
like to use the Pubsub schema service,"
-              + " please leave both of these blank.");
+    if (!"RAW".equals(configuration.getFormat())) {
+      if ((Strings.isNullOrEmpty(configuration.getSchema())
+              && !Strings.isNullOrEmpty(configuration.getFormat()))
+          || (!Strings.isNullOrEmpty(configuration.getSchema())
+              && Strings.isNullOrEmpty(configuration.getFormat()))) {
+        throw new IllegalArgumentException(
+            "A schema was provided without a data format (or viceversa). 
Please provide "
+                + "both of these parameters to read from Pubsub, or if you 
would like to use the Pubsub schema service,"
+                + " please leave both of these blank.");
+      }
     }
 
-    Schema beamSchema;
-    SerializableFunction<byte[], Row> valueMapper;
+    Schema payloadSchema;
+    SerializableFunction<byte[], Row> payloadMapper;
 
-    if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
+    String format =
+        configuration.getFormat() == null ? null : 
configuration.getFormat().toUpperCase();
+    if (Objects.equals(format, "RAW")) {

Review Comment:
   Just use `if ("RAW".equals(format))`? (And similarly below.)



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java:
##########
@@ -135,45 +138,103 @@ public SchemaTransform 
from(PubsubReadSchemaTransformConfiguration configuration
   private static class PubsubReadSchemaTransform extends SchemaTransform 
implements Serializable {
     final Schema beamSchema;
     final SerializableFunction<byte[], Row> valueMapper;
-    final @Nullable String topic;
-    final @Nullable String subscription;
+    final PubsubReadSchemaTransformConfiguration configuration;
     @Nullable PubsubTestClientFactory clientFactory;
     @Nullable Clock clock;
 
     PubsubReadSchemaTransform(
-        @Nullable String topic,
-        @Nullable String subscription,
-        Schema beamSchema,
+        PubsubReadSchemaTransformConfiguration configuration,
+        Schema payloadSchema,
         SerializableFunction<byte[], Row> valueMapper) {
-      this.topic = topic;
-      this.subscription = subscription;
-      this.beamSchema = beamSchema;
+      this.configuration = configuration;
+      Schema outputSchema;
+      List<String> attributes = configuration.getAttributes();
+      String attributesMap = configuration.getAttributesMap();
+      if (attributes == null && attributesMap == null) {
+        outputSchema = payloadSchema;
+      } else {
+        Schema.Builder outputSchemaBuilder = Schema.builder();
+        outputSchemaBuilder.addFields(payloadSchema.getFields());
+        if (attributes != null) {
+          for (String attribute : attributes) {
+            outputSchemaBuilder.addStringField(attribute);
+          }
+        }
+        if (attributesMap != null) {
+          outputSchemaBuilder.addMapField(
+              attributesMap, Schema.FieldType.STRING, Schema.FieldType.STRING);
+        }
+        outputSchema = outputSchemaBuilder.build();
+      }
+      this.beamSchema = outputSchema;
       this.valueMapper = valueMapper;
     }
 
     private static class ErrorCounterFn extends DoFn<PubsubMessage, Row> {
-      private Counter pubsubErrorCounter;
+      private final Counter pubsubErrorCounter;
       private Long errorsInBundle = 0L;
-      private SerializableFunction<byte[], Row> valueMapper;
+      private final SerializableFunction<byte[], Row> valueMapper;
+      private final @Nullable List<String> attributes;
+      private final @Nullable String attributesMap;
+      private final Schema outputSchema;
+
+      final boolean useErrorOutput;
 
-      ErrorCounterFn(String name, SerializableFunction<byte[], Row> 
valueMapper) {
+      ErrorCounterFn(
+          String name,
+          SerializableFunction<byte[], Row> valueMapper,
+          @Nullable List<String> attributes,
+          @Nullable String attributesMap,
+          Schema outputSchema,
+          boolean useErrorOutput) {
         this.pubsubErrorCounter = 
Metrics.counter(PubsubReadSchemaTransformProvider.class, name);
         this.valueMapper = valueMapper;
+        this.attributes = attributes;
+        this.attributesMap = attributesMap;
+        this.outputSchema = outputSchema;
+        this.useErrorOutput = useErrorOutput;
       }
 
       @ProcessElement
-      public void process(@DoFn.Element PubsubMessage message, 
MultiOutputReceiver receiver) {
+      public void process(@DoFn.Element PubsubMessage message, 
MultiOutputReceiver receiver)
+          throws Exception {
 
         try {
-          
receiver.get(OUTPUT_TAG).output(valueMapper.apply(message.getPayload()));
+          Row payloadRow = valueMapper.apply(message.getPayload());
+          Row outputRow;
+          if (attributes == null && attributesMap == null) {
+            outputRow = payloadRow;
+          } else {
+            Row.Builder rowBuilder = Row.withSchema(outputSchema);
+            List<@Nullable Object> payloadValues = payloadRow.getValues();
+            if (payloadValues != null) {
+              rowBuilder.addValues(payloadValues);
+            }
+            if (attributes != null) {
+              for (String attribute : attributes) {
+                System.out.println(

Review Comment:
   Please delete the print line.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java:
##########
@@ -66,68 +73,178 @@ public Class<PubsubWriteSchemaTransformConfiguration> 
configurationClass() {
   }
 
   public static class ErrorFn extends DoFn<Row, PubsubMessage> {
-    private SerializableFunction<Row, byte[]> valueMapper;
-    private Schema errorSchema;
+    private final SerializableFunction<Row, byte[]> valueMapper;
+    private final @Nullable Set<String> attributes;
+    private final @Nullable String attributesMap;
+    private final Schema payloadSchema;
+    private final Schema errorSchema;
+    private final boolean useErrorOutput;
 
-    ErrorFn(SerializableFunction<Row, byte[]> valueMapper, Schema errorSchema) 
{
+    ErrorFn(
+        SerializableFunction<Row, byte[]> valueMapper,
+        @Nullable List<String> attributes,
+        @Nullable String attributesMap,
+        Schema payloadSchema,
+        Schema errorSchema,
+        boolean useErrorOutput) {
       this.valueMapper = valueMapper;
+      this.attributes = attributes == null ? null : 
ImmutableSet.copyOf(attributes);
+      this.attributesMap = attributesMap;
+      this.payloadSchema = payloadSchema;
       this.errorSchema = errorSchema;
+      this.useErrorOutput = useErrorOutput;
     }
 
     @ProcessElement
-    public void processElement(@Element Row row, MultiOutputReceiver receiver) 
{
+    public void processElement(@Element Row row, MultiOutputReceiver receiver) 
throws Exception {
       try {
-        receiver.get(OUTPUT_TAG).output(new 
PubsubMessage(valueMapper.apply(row), null));
-      } catch (Exception e) {
+        Row payloadRow;
+        Map<String, String> messageAttributes = null;
+        if (attributes == null && attributesMap == null) {
+          payloadRow = row;
+        } else {
+          Row.Builder payloadRowBuilder = Row.withSchema(payloadSchema);
+          messageAttributes = new HashMap<>();
+          List<Schema.Field> fields = row.getSchema().getFields();
+          for (int ix = 0; ix < fields.size(); ix++) {
+            String name = fields.get(ix).getName();
+            if (attributes != null && attributes.contains(name)) {
+              messageAttributes.put(name, row.getValue(ix));
+            } else if (name.equals(attributesMap)) {
+              Map<String, String> attrs = row.<String, String>getMap(ix);
+              if (attrs != null) {
+                messageAttributes.putAll(attrs);
+              }
+            } else {
+              payloadRowBuilder.addValue(row.getValue(ix));
+            }
+          }
+          payloadRow = payloadRowBuilder.build();
+        }
         receiver
-            .get(ERROR_TAG)
-            .output(Row.withSchema(errorSchema).addValues(e.toString(), 
row).build());
+            .get(OUTPUT_TAG)
+            .output(new PubsubMessage(valueMapper.apply(payloadRow), 
messageAttributes));
+      } catch (Exception e) {
+        if (useErrorOutput) {
+          receiver
+              .get(ERROR_TAG)
+              .output(Row.withSchema(errorSchema).addValues(e.toString(), 
row).build());
+        } else {
+          throw e;
+        }
       }
     }
   }
 
   @Override
   public SchemaTransform from(PubsubWriteSchemaTransformConfiguration 
configuration) {
-    if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
+    if (!VALID_DATA_FORMATS.contains(configuration.getFormat().toUpperCase())) 
{
       throw new IllegalArgumentException(
           String.format(
               "Format %s not supported. Only supported formats are %s",
               configuration.getFormat(), VALID_FORMATS_STR));
     }
-    return new PubsubWriteSchemaTransform(configuration.getTopic(), 
configuration.getFormat());
+    return new PubsubWriteSchemaTransform(configuration);
   }
 
   private static class PubsubWriteSchemaTransform extends SchemaTransform 
implements Serializable {
-    final String topic;
-    final String format;
+    final PubsubWriteSchemaTransformConfiguration configuration;
 
-    PubsubWriteSchemaTransform(String topic, String format) {
-      this.topic = topic;
-      this.format = format;
+    PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration 
configuration) {
+      this.configuration = configuration;
     }
 
     @Override
+    @SuppressWarnings({
+      "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   Can we fix the nullness errors and remove this suppression?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java:
##########
@@ -268,6 +364,7 @@ private static PubsubClient.IncomingMessage 
incomingMessageOf(
 
   private static PubsubTestClient.PubsubTestClientFactory clientFactory(
       List<PubsubClient.IncomingMessage> messages) {
+    System.out.println("messages " + messages);

Review Comment:
   Ditto: please delete this debug print line as well.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java:
##########
@@ -207,26 +271,47 @@ PubsubIO.Read<PubsubMessage> buildPubsubRead() {
         throw new IllegalArgumentException(
             "Both PubsubTestClientFactory and Clock need to be specified for 
testing, but only one is provided");
       }
+      if (!Strings.isNullOrEmpty(configuration.getIdAttribute())) {
+        pubsubRead = 
pubsubRead.withIdAttribute(configuration.getIdAttribute());
+      }
+      if (!Strings.isNullOrEmpty(configuration.getTimestampAttribute())) {
+        pubsubRead = 
pubsubRead.withTimestampAttribute(configuration.getTimestampAttribute());
+      }
       return pubsubRead;
     }
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
       PubsubIO.Read<PubsubMessage> pubsubRead = buildPubsubRead();
+      @SuppressWarnings("nullness")

Review Comment:
   Is this suppression needed? (If so, it can probably be removed by adding a null 
check before this line.)



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java:
##########
@@ -66,68 +73,178 @@ public Class<PubsubWriteSchemaTransformConfiguration> 
configurationClass() {
   }
 
   public static class ErrorFn extends DoFn<Row, PubsubMessage> {
-    private SerializableFunction<Row, byte[]> valueMapper;
-    private Schema errorSchema;
+    private final SerializableFunction<Row, byte[]> valueMapper;
+    private final @Nullable Set<String> attributes;
+    private final @Nullable String attributesMap;
+    private final Schema payloadSchema;
+    private final Schema errorSchema;
+    private final boolean useErrorOutput;
 
-    ErrorFn(SerializableFunction<Row, byte[]> valueMapper, Schema errorSchema) 
{
+    ErrorFn(
+        SerializableFunction<Row, byte[]> valueMapper,
+        @Nullable List<String> attributes,
+        @Nullable String attributesMap,
+        Schema payloadSchema,
+        Schema errorSchema,
+        boolean useErrorOutput) {
       this.valueMapper = valueMapper;
+      this.attributes = attributes == null ? null : 
ImmutableSet.copyOf(attributes);
+      this.attributesMap = attributesMap;
+      this.payloadSchema = payloadSchema;
       this.errorSchema = errorSchema;
+      this.useErrorOutput = useErrorOutput;
     }
 
     @ProcessElement
-    public void processElement(@Element Row row, MultiOutputReceiver receiver) 
{
+    public void processElement(@Element Row row, MultiOutputReceiver receiver) 
throws Exception {
       try {
-        receiver.get(OUTPUT_TAG).output(new 
PubsubMessage(valueMapper.apply(row), null));
-      } catch (Exception e) {
+        Row payloadRow;
+        Map<String, String> messageAttributes = null;
+        if (attributes == null && attributesMap == null) {
+          payloadRow = row;
+        } else {
+          Row.Builder payloadRowBuilder = Row.withSchema(payloadSchema);
+          messageAttributes = new HashMap<>();
+          List<Schema.Field> fields = row.getSchema().getFields();
+          for (int ix = 0; ix < fields.size(); ix++) {
+            String name = fields.get(ix).getName();
+            if (attributes != null && attributes.contains(name)) {
+              messageAttributes.put(name, row.getValue(ix));
+            } else if (name.equals(attributesMap)) {
+              Map<String, String> attrs = row.<String, String>getMap(ix);
+              if (attrs != null) {
+                messageAttributes.putAll(attrs);
+              }
+            } else {
+              payloadRowBuilder.addValue(row.getValue(ix));
+            }
+          }
+          payloadRow = payloadRowBuilder.build();
+        }
         receiver
-            .get(ERROR_TAG)
-            .output(Row.withSchema(errorSchema).addValues(e.toString(), 
row).build());
+            .get(OUTPUT_TAG)
+            .output(new PubsubMessage(valueMapper.apply(payloadRow), 
messageAttributes));
+      } catch (Exception e) {
+        if (useErrorOutput) {
+          receiver
+              .get(ERROR_TAG)
+              .output(Row.withSchema(errorSchema).addValues(e.toString(), 
row).build());
+        } else {
+          throw e;
+        }
       }
     }
   }
 
   @Override
   public SchemaTransform from(PubsubWriteSchemaTransformConfiguration 
configuration) {
-    if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
+    if (!VALID_DATA_FORMATS.contains(configuration.getFormat().toUpperCase())) 
{
       throw new IllegalArgumentException(
           String.format(
               "Format %s not supported. Only supported formats are %s",
               configuration.getFormat(), VALID_FORMATS_STR));
     }
-    return new PubsubWriteSchemaTransform(configuration.getTopic(), 
configuration.getFormat());
+    return new PubsubWriteSchemaTransform(configuration);
   }
 
   private static class PubsubWriteSchemaTransform extends SchemaTransform 
implements Serializable {
-    final String topic;
-    final String format;
+    final PubsubWriteSchemaTransformConfiguration configuration;
 
-    PubsubWriteSchemaTransform(String topic, String format) {
-      this.topic = topic;
-      this.format = format;
+    PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration 
configuration) {
+      this.configuration = configuration;
     }
 
     @Override
+    @SuppressWarnings({
+      "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+    })
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      @SuppressWarnings("nullness")

Review Comment:
   Duplicate suppression? (The enclosing method is already annotated with 
`@SuppressWarnings("nullness")`.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to