chamikaramj commented on code in PR #28782:
URL: https://github.com/apache/beam/pull/28782#discussion_r1357763468
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java:
##########
@@ -89,38 +87,43 @@ public SchemaTransform
from(PubsubReadSchemaTransformConfiguration configuration
"To read from Pubsub, a subscription name or a topic name must be
provided. Not both.");
}
- if ((Strings.isNullOrEmpty(configuration.getSchema())
- && !Strings.isNullOrEmpty(configuration.getFormat()))
- || (!Strings.isNullOrEmpty(configuration.getSchema())
- && Strings.isNullOrEmpty(configuration.getFormat()))) {
- throw new IllegalArgumentException(
- "A schema was provided without a data format (or viceversa). Please
provide "
- + "both of these parameters to read from Pubsub, or if you would
like to use the Pubsub schema service,"
- + " please leave both of these blank.");
+ if (!"RAW".equals(configuration.getFormat())) {
+ if ((Strings.isNullOrEmpty(configuration.getSchema())
+ && !Strings.isNullOrEmpty(configuration.getFormat()))
+ || (!Strings.isNullOrEmpty(configuration.getSchema())
+ && Strings.isNullOrEmpty(configuration.getFormat()))) {
+ throw new IllegalArgumentException(
+ "A schema was provided without a data format (or viceversa).
Please provide "
+ + "both of these parameters to read from Pubsub, or if you
would like to use the Pubsub schema service,"
+ + " please leave both of these blank.");
+ }
}
- Schema beamSchema;
- SerializableFunction<byte[], Row> valueMapper;
+ Schema payloadSchema;
+ SerializableFunction<byte[], Row> payloadMapper;
- if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
+ String format =
+ configuration.getFormat() == null ? null :
configuration.getFormat().toUpperCase();
+ if (Objects.equals(format, "RAW")) {
Review Comment:
Why not just use `if ("RAW".equals(format))`? (The same applies to the similar checks below.)
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java:
##########
@@ -135,45 +138,103 @@ public SchemaTransform
from(PubsubReadSchemaTransformConfiguration configuration
private static class PubsubReadSchemaTransform extends SchemaTransform
implements Serializable {
final Schema beamSchema;
final SerializableFunction<byte[], Row> valueMapper;
- final @Nullable String topic;
- final @Nullable String subscription;
+ final PubsubReadSchemaTransformConfiguration configuration;
@Nullable PubsubTestClientFactory clientFactory;
@Nullable Clock clock;
PubsubReadSchemaTransform(
- @Nullable String topic,
- @Nullable String subscription,
- Schema beamSchema,
+ PubsubReadSchemaTransformConfiguration configuration,
+ Schema payloadSchema,
SerializableFunction<byte[], Row> valueMapper) {
- this.topic = topic;
- this.subscription = subscription;
- this.beamSchema = beamSchema;
+ this.configuration = configuration;
+ Schema outputSchema;
+ List<String> attributes = configuration.getAttributes();
+ String attributesMap = configuration.getAttributesMap();
+ if (attributes == null && attributesMap == null) {
+ outputSchema = payloadSchema;
+ } else {
+ Schema.Builder outputSchemaBuilder = Schema.builder();
+ outputSchemaBuilder.addFields(payloadSchema.getFields());
+ if (attributes != null) {
+ for (String attribute : attributes) {
+ outputSchemaBuilder.addStringField(attribute);
+ }
+ }
+ if (attributesMap != null) {
+ outputSchemaBuilder.addMapField(
+ attributesMap, Schema.FieldType.STRING, Schema.FieldType.STRING);
+ }
+ outputSchema = outputSchemaBuilder.build();
+ }
+ this.beamSchema = outputSchema;
this.valueMapper = valueMapper;
}
private static class ErrorCounterFn extends DoFn<PubsubMessage, Row> {
- private Counter pubsubErrorCounter;
+ private final Counter pubsubErrorCounter;
private Long errorsInBundle = 0L;
- private SerializableFunction<byte[], Row> valueMapper;
+ private final SerializableFunction<byte[], Row> valueMapper;
+ private final @Nullable List<String> attributes;
+ private final @Nullable String attributesMap;
+ private final Schema outputSchema;
+
+ final boolean useErrorOutput;
- ErrorCounterFn(String name, SerializableFunction<byte[], Row>
valueMapper) {
+ ErrorCounterFn(
+ String name,
+ SerializableFunction<byte[], Row> valueMapper,
+ @Nullable List<String> attributes,
+ @Nullable String attributesMap,
+ Schema outputSchema,
+ boolean useErrorOutput) {
this.pubsubErrorCounter =
Metrics.counter(PubsubReadSchemaTransformProvider.class, name);
this.valueMapper = valueMapper;
+ this.attributes = attributes;
+ this.attributesMap = attributesMap;
+ this.outputSchema = outputSchema;
+ this.useErrorOutput = useErrorOutput;
}
@ProcessElement
- public void process(@DoFn.Element PubsubMessage message,
MultiOutputReceiver receiver) {
+ public void process(@DoFn.Element PubsubMessage message,
MultiOutputReceiver receiver)
+ throws Exception {
try {
-
receiver.get(OUTPUT_TAG).output(valueMapper.apply(message.getPayload()));
+ Row payloadRow = valueMapper.apply(message.getPayload());
+ Row outputRow;
+ if (attributes == null && attributesMap == null) {
+ outputRow = payloadRow;
+ } else {
+ Row.Builder rowBuilder = Row.withSchema(outputSchema);
+ List<@Nullable Object> payloadValues = payloadRow.getValues();
+ if (payloadValues != null) {
+ rowBuilder.addValues(payloadValues);
+ }
+ if (attributes != null) {
+ for (String attribute : attributes) {
+ System.out.println(
Review Comment:
Please delete this `System.out.println` debug statement.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java:
##########
@@ -66,68 +73,178 @@ public Class<PubsubWriteSchemaTransformConfiguration>
configurationClass() {
}
public static class ErrorFn extends DoFn<Row, PubsubMessage> {
- private SerializableFunction<Row, byte[]> valueMapper;
- private Schema errorSchema;
+ private final SerializableFunction<Row, byte[]> valueMapper;
+ private final @Nullable Set<String> attributes;
+ private final @Nullable String attributesMap;
+ private final Schema payloadSchema;
+ private final Schema errorSchema;
+ private final boolean useErrorOutput;
- ErrorFn(SerializableFunction<Row, byte[]> valueMapper, Schema errorSchema)
{
+ ErrorFn(
+ SerializableFunction<Row, byte[]> valueMapper,
+ @Nullable List<String> attributes,
+ @Nullable String attributesMap,
+ Schema payloadSchema,
+ Schema errorSchema,
+ boolean useErrorOutput) {
this.valueMapper = valueMapper;
+ this.attributes = attributes == null ? null :
ImmutableSet.copyOf(attributes);
+ this.attributesMap = attributesMap;
+ this.payloadSchema = payloadSchema;
this.errorSchema = errorSchema;
+ this.useErrorOutput = useErrorOutput;
}
@ProcessElement
- public void processElement(@Element Row row, MultiOutputReceiver receiver)
{
+ public void processElement(@Element Row row, MultiOutputReceiver receiver)
throws Exception {
try {
- receiver.get(OUTPUT_TAG).output(new
PubsubMessage(valueMapper.apply(row), null));
- } catch (Exception e) {
+ Row payloadRow;
+ Map<String, String> messageAttributes = null;
+ if (attributes == null && attributesMap == null) {
+ payloadRow = row;
+ } else {
+ Row.Builder payloadRowBuilder = Row.withSchema(payloadSchema);
+ messageAttributes = new HashMap<>();
+ List<Schema.Field> fields = row.getSchema().getFields();
+ for (int ix = 0; ix < fields.size(); ix++) {
+ String name = fields.get(ix).getName();
+ if (attributes != null && attributes.contains(name)) {
+ messageAttributes.put(name, row.getValue(ix));
+ } else if (name.equals(attributesMap)) {
+ Map<String, String> attrs = row.<String, String>getMap(ix);
+ if (attrs != null) {
+ messageAttributes.putAll(attrs);
+ }
+ } else {
+ payloadRowBuilder.addValue(row.getValue(ix));
+ }
+ }
+ payloadRow = payloadRowBuilder.build();
+ }
receiver
- .get(ERROR_TAG)
- .output(Row.withSchema(errorSchema).addValues(e.toString(),
row).build());
+ .get(OUTPUT_TAG)
+ .output(new PubsubMessage(valueMapper.apply(payloadRow),
messageAttributes));
+ } catch (Exception e) {
+ if (useErrorOutput) {
+ receiver
+ .get(ERROR_TAG)
+ .output(Row.withSchema(errorSchema).addValues(e.toString(),
row).build());
+ } else {
+ throw e;
+ }
}
}
}
@Override
public SchemaTransform from(PubsubWriteSchemaTransformConfiguration
configuration) {
- if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
+ if (!VALID_DATA_FORMATS.contains(configuration.getFormat().toUpperCase()))
{
throw new IllegalArgumentException(
String.format(
"Format %s not supported. Only supported formats are %s",
configuration.getFormat(), VALID_FORMATS_STR));
}
- return new PubsubWriteSchemaTransform(configuration.getTopic(),
configuration.getFormat());
+ return new PubsubWriteSchemaTransform(configuration);
}
private static class PubsubWriteSchemaTransform extends SchemaTransform
implements Serializable {
- final String topic;
- final String format;
+ final PubsubWriteSchemaTransformConfiguration configuration;
- PubsubWriteSchemaTransform(String topic, String format) {
- this.topic = topic;
- this.format = format;
+ PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration
configuration) {
+ this.configuration = configuration;
}
@Override
+ @SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
Review Comment:
Can we fix the nullness errors and remove this suppression?
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java:
##########
@@ -268,6 +364,7 @@ private static PubsubClient.IncomingMessage
incomingMessageOf(
private static PubsubTestClient.PubsubTestClientFactory clientFactory(
List<PubsubClient.IncomingMessage> messages) {
+ System.out.println("messages " + messages);
Review Comment:
Ditto: please delete this `System.out.println` debug statement.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java:
##########
@@ -207,26 +271,47 @@ PubsubIO.Read<PubsubMessage> buildPubsubRead() {
throw new IllegalArgumentException(
"Both PubsubTestClientFactory and Clock need to be specified for
testing, but only one is provided");
}
+ if (!Strings.isNullOrEmpty(configuration.getIdAttribute())) {
+ pubsubRead =
pubsubRead.withIdAttribute(configuration.getIdAttribute());
+ }
+ if (!Strings.isNullOrEmpty(configuration.getTimestampAttribute())) {
+ pubsubRead =
pubsubRead.withTimestampAttribute(configuration.getTimestampAttribute());
+ }
return pubsubRead;
}
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
PubsubIO.Read<PubsubMessage> pubsubRead = buildPubsubRead();
+ @SuppressWarnings("nullness")
Review Comment:
Is this suppression needed? (If so, it can probably be removed by adding a null
check before this line.)
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java:
##########
@@ -66,68 +73,178 @@ public Class<PubsubWriteSchemaTransformConfiguration>
configurationClass() {
}
public static class ErrorFn extends DoFn<Row, PubsubMessage> {
- private SerializableFunction<Row, byte[]> valueMapper;
- private Schema errorSchema;
+ private final SerializableFunction<Row, byte[]> valueMapper;
+ private final @Nullable Set<String> attributes;
+ private final @Nullable String attributesMap;
+ private final Schema payloadSchema;
+ private final Schema errorSchema;
+ private final boolean useErrorOutput;
- ErrorFn(SerializableFunction<Row, byte[]> valueMapper, Schema errorSchema)
{
+ ErrorFn(
+ SerializableFunction<Row, byte[]> valueMapper,
+ @Nullable List<String> attributes,
+ @Nullable String attributesMap,
+ Schema payloadSchema,
+ Schema errorSchema,
+ boolean useErrorOutput) {
this.valueMapper = valueMapper;
+ this.attributes = attributes == null ? null :
ImmutableSet.copyOf(attributes);
+ this.attributesMap = attributesMap;
+ this.payloadSchema = payloadSchema;
this.errorSchema = errorSchema;
+ this.useErrorOutput = useErrorOutput;
}
@ProcessElement
- public void processElement(@Element Row row, MultiOutputReceiver receiver)
{
+ public void processElement(@Element Row row, MultiOutputReceiver receiver)
throws Exception {
try {
- receiver.get(OUTPUT_TAG).output(new
PubsubMessage(valueMapper.apply(row), null));
- } catch (Exception e) {
+ Row payloadRow;
+ Map<String, String> messageAttributes = null;
+ if (attributes == null && attributesMap == null) {
+ payloadRow = row;
+ } else {
+ Row.Builder payloadRowBuilder = Row.withSchema(payloadSchema);
+ messageAttributes = new HashMap<>();
+ List<Schema.Field> fields = row.getSchema().getFields();
+ for (int ix = 0; ix < fields.size(); ix++) {
+ String name = fields.get(ix).getName();
+ if (attributes != null && attributes.contains(name)) {
+ messageAttributes.put(name, row.getValue(ix));
+ } else if (name.equals(attributesMap)) {
+ Map<String, String> attrs = row.<String, String>getMap(ix);
+ if (attrs != null) {
+ messageAttributes.putAll(attrs);
+ }
+ } else {
+ payloadRowBuilder.addValue(row.getValue(ix));
+ }
+ }
+ payloadRow = payloadRowBuilder.build();
+ }
receiver
- .get(ERROR_TAG)
- .output(Row.withSchema(errorSchema).addValues(e.toString(),
row).build());
+ .get(OUTPUT_TAG)
+ .output(new PubsubMessage(valueMapper.apply(payloadRow),
messageAttributes));
+ } catch (Exception e) {
+ if (useErrorOutput) {
+ receiver
+ .get(ERROR_TAG)
+ .output(Row.withSchema(errorSchema).addValues(e.toString(),
row).build());
+ } else {
+ throw e;
+ }
}
}
}
@Override
public SchemaTransform from(PubsubWriteSchemaTransformConfiguration
configuration) {
- if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
+ if (!VALID_DATA_FORMATS.contains(configuration.getFormat().toUpperCase()))
{
throw new IllegalArgumentException(
String.format(
"Format %s not supported. Only supported formats are %s",
configuration.getFormat(), VALID_FORMATS_STR));
}
- return new PubsubWriteSchemaTransform(configuration.getTopic(),
configuration.getFormat());
+ return new PubsubWriteSchemaTransform(configuration);
}
private static class PubsubWriteSchemaTransform extends SchemaTransform
implements Serializable {
- final String topic;
- final String format;
+ final PubsubWriteSchemaTransformConfiguration configuration;
- PubsubWriteSchemaTransform(String topic, String format) {
- this.topic = topic;
- this.format = format;
+ PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration
configuration) {
+ this.configuration = configuration;
}
@Override
+ @SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+ })
public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ @SuppressWarnings("nullness")
Review Comment:
Is this a duplicate suppression? `expand` is already annotated with `@SuppressWarnings({"nullness"})` just above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]