robertwb commented on code in PR #28782:
URL: https://github.com/apache/beam/pull/28782#discussion_r1358478454
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java:
##########
@@ -66,68 +73,178 @@ public Class<PubsubWriteSchemaTransformConfiguration>
configurationClass() {
}
public static class ErrorFn extends DoFn<Row, PubsubMessage> {
- private SerializableFunction<Row, byte[]> valueMapper;
- private Schema errorSchema;
+ private final SerializableFunction<Row, byte[]> valueMapper;
+ private final @Nullable Set<String> attributes;
+ private final @Nullable String attributesMap;
+ private final Schema payloadSchema;
+ private final Schema errorSchema;
+ private final boolean useErrorOutput;
- ErrorFn(SerializableFunction<Row, byte[]> valueMapper, Schema errorSchema)
{
+ ErrorFn(
+ SerializableFunction<Row, byte[]> valueMapper,
+ @Nullable List<String> attributes,
+ @Nullable String attributesMap,
+ Schema payloadSchema,
+ Schema errorSchema,
+ boolean useErrorOutput) {
this.valueMapper = valueMapper;
+ this.attributes = attributes == null ? null :
ImmutableSet.copyOf(attributes);
+ this.attributesMap = attributesMap;
+ this.payloadSchema = payloadSchema;
this.errorSchema = errorSchema;
+ this.useErrorOutput = useErrorOutput;
}
@ProcessElement
- public void processElement(@Element Row row, MultiOutputReceiver receiver)
{
+ public void processElement(@Element Row row, MultiOutputReceiver receiver)
throws Exception {
try {
- receiver.get(OUTPUT_TAG).output(new
PubsubMessage(valueMapper.apply(row), null));
- } catch (Exception e) {
+ Row payloadRow;
+ Map<String, String> messageAttributes = null;
+ if (attributes == null && attributesMap == null) {
+ payloadRow = row;
+ } else {
+ Row.Builder payloadRowBuilder = Row.withSchema(payloadSchema);
+ messageAttributes = new HashMap<>();
+ List<Schema.Field> fields = row.getSchema().getFields();
+ for (int ix = 0; ix < fields.size(); ix++) {
+ String name = fields.get(ix).getName();
+ if (attributes != null && attributes.contains(name)) {
+ messageAttributes.put(name, row.getValue(ix));
+ } else if (name.equals(attributesMap)) {
+ Map<String, String> attrs = row.<String, String>getMap(ix);
+ if (attrs != null) {
+ messageAttributes.putAll(attrs);
+ }
+ } else {
+ payloadRowBuilder.addValue(row.getValue(ix));
+ }
+ }
+ payloadRow = payloadRowBuilder.build();
+ }
receiver
- .get(ERROR_TAG)
- .output(Row.withSchema(errorSchema).addValues(e.toString(),
row).build());
+ .get(OUTPUT_TAG)
+ .output(new PubsubMessage(valueMapper.apply(payloadRow),
messageAttributes));
+ } catch (Exception e) {
+ if (useErrorOutput) {
+ receiver
+ .get(ERROR_TAG)
+ .output(Row.withSchema(errorSchema).addValues(e.toString(),
row).build());
+ } else {
+ throw e;
+ }
}
}
}
@Override
public SchemaTransform from(PubsubWriteSchemaTransformConfiguration
configuration) {
- if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
+ if (!VALID_DATA_FORMATS.contains(configuration.getFormat().toUpperCase()))
{
throw new IllegalArgumentException(
String.format(
"Format %s not supported. Only supported formats are %s",
configuration.getFormat(), VALID_FORMATS_STR));
}
- return new PubsubWriteSchemaTransform(configuration.getTopic(),
configuration.getFormat());
+ return new PubsubWriteSchemaTransform(configuration);
}
private static class PubsubWriteSchemaTransform extends SchemaTransform
implements Serializable {
- final String topic;
- final String format;
+ final PubsubWriteSchemaTransformConfiguration configuration;
- PubsubWriteSchemaTransform(String topic, String format) {
- this.topic = topic;
- this.format = format;
+ PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration
configuration) {
+ this.configuration = configuration;
}
@Override
+ @SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
Review Comment:
Unfortunately not, as all the row.getXxx() methods may return null (but are only
used in contexts where the values should not be null). I did try to keep the
nullness suppression local rather than decorating the whole class, though.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]