Dippatel98 commented on code in PR #27366:
URL: https://github.com/apache/beam/pull/27366#discussion_r1256467983
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java:
##########
@@ -18,300 +18,240 @@
package org.apache.beam.sdk.io.gcp.pubsub;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import com.google.api.client.util.Clock;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/** Tests for {@link PubsubReadSchemaTransformProvider}. */
+/** Tests for {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubReadSchemaTransformProvider}. */
@RunWith(JUnit4.class)
public class PubsubReadSchemaTransformProviderTest {
- private static final Schema SCHEMA =
+ private static final Schema BEAMSCHEMA =
Schema.of(
Schema.Field.of("name", Schema.FieldType.STRING),
Schema.Field.of("number", Schema.FieldType.INT64));
-
+ private static final Schema BEAMSCHEMAWITHERROR =
+ Schema.of(Schema.Field.of("error", Schema.FieldType.STRING));
+ private static final String SCHEMA =
AvroUtils.toAvroSchema(BEAMSCHEMA).toString();
private static final String SUBSCRIPTION =
"projects/project/subscriptions/subscription";
private static final String TOPIC = "projects/project/topics/topic";
- private static final List<TestCase> cases =
- Arrays.asList(
- testCase(
- "no configured topic or subscription",
-
PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA).build())
- .expectInvalidConfiguration(),
- testCase(
- "both topic and subscription configured",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setSubscription(TOPIC)
- .setDataSchema(SCHEMA)
- .build())
- .expectInvalidConfiguration(),
- testCase(
- "invalid format configured",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setDataSchema(SCHEMA)
- .setFormat("invalidformat")
- .build())
- .expectInvalidConfiguration(),
- testCase(
- "configuration with subscription",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setDataSchema(SCHEMA)
- .build())
-
.withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION)),
- testCase(
- "configuration with topic",
- PubsubReadSchemaTransformConfiguration.builder()
- .setTopic(TOPIC)
- .setDataSchema(SCHEMA)
- .build())
-
.withExpectedPubsubRead(PubsubIO.readMessages().fromTopic(TOPIC)),
- testCase(
- "configuration with subscription, timestamp and id attributes",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setTimestampAttribute("timestampAttribute")
- .setIdAttribute("idAttribute")
- .setDataSchema(SCHEMA)
- .build())
- .withExpectedPubsubRead(
- PubsubIO.readMessages()
- .fromSubscription(SUBSCRIPTION)
- .withTimestampAttribute("timestampAttribute")
- .withIdAttribute("idAttribute")),
- testCase(
- "configuration with subscription and dead letter queue",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setDataSchema(SCHEMA)
- .setDeadLetterQueue(TOPIC)
- .build())
-
.withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION))
- .withExpectedDeadLetterQueue(PubsubIO.writeMessages().to(TOPIC)),
- testCase(
- "configuration with subscription, timestamp attribute, and dead letter queue",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setTimestampAttribute("timestampAttribute")
- .setDataSchema(SCHEMA)
- .setDeadLetterQueue(TOPIC)
- .build())
- .withExpectedPubsubRead(
- PubsubIO.readMessages()
- .fromSubscription(SUBSCRIPTION)
- .withTimestampAttribute("timestampAttribute"))
- .withExpectedDeadLetterQueue(
-
PubsubIO.writeMessages().to(TOPIC).withTimestampAttribute("timestampAttribute")));
-
- private static final AutoValueSchema AUTO_VALUE_SCHEMA = new
AutoValueSchema();
- private static final TypeDescriptor<PubsubReadSchemaTransformConfiguration>
TYPE_DESCRIPTOR =
- TypeDescriptor.of(PubsubReadSchemaTransformConfiguration.class);
- private static final
SerializableFunction<PubsubReadSchemaTransformConfiguration, Row>
- ROW_SERIALIZABLE_FUNCTION =
AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
-
private static final List<Row> ROWS =
Arrays.asList(
- Row.withSchema(SCHEMA).withFieldValue("name",
"a").withFieldValue("number", 100L).build(),
- Row.withSchema(SCHEMA).withFieldValue("name",
"b").withFieldValue("number", 200L).build(),
- Row.withSchema(SCHEMA)
+ Row.withSchema(BEAMSCHEMA)
+ .withFieldValue("name", "a")
+ .withFieldValue("number", 100L)
+ .build(),
+ Row.withSchema(BEAMSCHEMA)
+ .withFieldValue("name", "b")
+ .withFieldValue("number", 200L)
+ .build(),
+ Row.withSchema(BEAMSCHEMA)
.withFieldValue("name", "c")
.withFieldValue("number", 300L)
.build());
- private static final Clock CLOCK = (Clock & Serializable) () ->
1656788475425L;
+ private static final List<Row> ROWSWITHERROR =
+ Arrays.asList(
+ Row.withSchema(BEAMSCHEMAWITHERROR).withFieldValue("error",
"a").build(),
+ Row.withSchema(BEAMSCHEMAWITHERROR).withFieldValue("error",
"b").build(),
+ Row.withSchema(BEAMSCHEMAWITHERROR).withFieldValue("error",
"c").build());
+
+ private static final Clock CLOCK = (Clock & Serializable) () ->
1678988970000L;
private static final AvroPayloadSerializerProvider
AVRO_PAYLOAD_SERIALIZER_PROVIDER =
new AvroPayloadSerializerProvider();
private static final PayloadSerializer AVRO_PAYLOAD_SERIALIZER =
- AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(SCHEMA, new HashMap<>());
-
- @Rule public TestPipeline p =
TestPipeline.create().enableAbandonedNodeEnforcement(false);
+ AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(BEAMSCHEMA, new
HashMap<>());
+ private static final PayloadSerializer AVRO_PAYLOAD_SERIALIZER_WITH_ERROR =
+ AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(BEAMSCHEMAWITHERROR, new
HashMap<>());
- @Test
- public void testBuildDeadLetterQueueWrite() {
- for (TestCase testCase : cases) {
- PubsubIO.Write<PubsubMessage> dlq =
- testCase.pubsubReadSchemaTransform().buildDeadLetterQueueWrite();
-
- if (testCase.expectedDeadLetterQueue == null) {
- assertNull(testCase.name, dlq);
- return;
- }
-
- Map<DisplayData.Identifier, DisplayData.Item> actual =
DisplayData.from(dlq).asMap();
- Map<DisplayData.Identifier, DisplayData.Item> expected =
testCase.expectedDeadLetterQueue;
-
- assertEquals(testCase.name, expected, actual);
- }
- }
+ @Rule public transient TestPipeline p = TestPipeline.create();
@Test
- public void testReadAvro() throws IOException {
+ public void testInvalidConfigNoTopicOrSubscription() {
PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
- PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform transform =
- schemaTransformWithClock("avro");
- PubsubTestClient.PubsubTestClientFactory clientFactory =
- clientFactory(incomingAvroMessagesOf(CLOCK.currentTimeMillis()));
- transform.setClientFactory(clientFactory);
- PCollectionRowTuple reads = begin.apply(transform);
-
-
PAssert.that(reads.get(PubsubReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS);
-
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ begin.apply(
+ new PubsubReadSchemaTransformProvider()
+ .from(
+ PubsubReadSchemaTransformConfiguration.builder()
+ .setSchema(SCHEMA)
+ .setFormat("AVRO")
+ .build())));
p.run().waitUntilFinish();
Review Comment:
Done
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java:
##########
@@ -18,300 +18,240 @@
package org.apache.beam.sdk.io.gcp.pubsub;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import com.google.api.client.util.Clock;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/** Tests for {@link PubsubReadSchemaTransformProvider}. */
+/** Tests for {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubReadSchemaTransformProvider}. */
@RunWith(JUnit4.class)
public class PubsubReadSchemaTransformProviderTest {
- private static final Schema SCHEMA =
+ private static final Schema BEAMSCHEMA =
Schema.of(
Schema.Field.of("name", Schema.FieldType.STRING),
Schema.Field.of("number", Schema.FieldType.INT64));
-
+ private static final Schema BEAMSCHEMAWITHERROR =
+ Schema.of(Schema.Field.of("error", Schema.FieldType.STRING));
+ private static final String SCHEMA =
AvroUtils.toAvroSchema(BEAMSCHEMA).toString();
private static final String SUBSCRIPTION =
"projects/project/subscriptions/subscription";
private static final String TOPIC = "projects/project/topics/topic";
- private static final List<TestCase> cases =
- Arrays.asList(
- testCase(
- "no configured topic or subscription",
-
PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA).build())
- .expectInvalidConfiguration(),
- testCase(
- "both topic and subscription configured",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setSubscription(TOPIC)
- .setDataSchema(SCHEMA)
- .build())
- .expectInvalidConfiguration(),
- testCase(
- "invalid format configured",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setDataSchema(SCHEMA)
- .setFormat("invalidformat")
- .build())
- .expectInvalidConfiguration(),
- testCase(
- "configuration with subscription",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setDataSchema(SCHEMA)
- .build())
-
.withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION)),
- testCase(
- "configuration with topic",
- PubsubReadSchemaTransformConfiguration.builder()
- .setTopic(TOPIC)
- .setDataSchema(SCHEMA)
- .build())
-
.withExpectedPubsubRead(PubsubIO.readMessages().fromTopic(TOPIC)),
- testCase(
- "configuration with subscription, timestamp and id attributes",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setTimestampAttribute("timestampAttribute")
- .setIdAttribute("idAttribute")
- .setDataSchema(SCHEMA)
- .build())
- .withExpectedPubsubRead(
- PubsubIO.readMessages()
- .fromSubscription(SUBSCRIPTION)
- .withTimestampAttribute("timestampAttribute")
- .withIdAttribute("idAttribute")),
- testCase(
- "configuration with subscription and dead letter queue",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setDataSchema(SCHEMA)
- .setDeadLetterQueue(TOPIC)
- .build())
-
.withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION))
- .withExpectedDeadLetterQueue(PubsubIO.writeMessages().to(TOPIC)),
- testCase(
- "configuration with subscription, timestamp attribute, and dead letter queue",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setTimestampAttribute("timestampAttribute")
- .setDataSchema(SCHEMA)
- .setDeadLetterQueue(TOPIC)
- .build())
- .withExpectedPubsubRead(
- PubsubIO.readMessages()
- .fromSubscription(SUBSCRIPTION)
- .withTimestampAttribute("timestampAttribute"))
- .withExpectedDeadLetterQueue(
-
PubsubIO.writeMessages().to(TOPIC).withTimestampAttribute("timestampAttribute")));
-
- private static final AutoValueSchema AUTO_VALUE_SCHEMA = new
AutoValueSchema();
- private static final TypeDescriptor<PubsubReadSchemaTransformConfiguration>
TYPE_DESCRIPTOR =
- TypeDescriptor.of(PubsubReadSchemaTransformConfiguration.class);
- private static final
SerializableFunction<PubsubReadSchemaTransformConfiguration, Row>
- ROW_SERIALIZABLE_FUNCTION =
AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
-
private static final List<Row> ROWS =
Arrays.asList(
- Row.withSchema(SCHEMA).withFieldValue("name",
"a").withFieldValue("number", 100L).build(),
- Row.withSchema(SCHEMA).withFieldValue("name",
"b").withFieldValue("number", 200L).build(),
- Row.withSchema(SCHEMA)
+ Row.withSchema(BEAMSCHEMA)
+ .withFieldValue("name", "a")
+ .withFieldValue("number", 100L)
+ .build(),
+ Row.withSchema(BEAMSCHEMA)
+ .withFieldValue("name", "b")
+ .withFieldValue("number", 200L)
+ .build(),
+ Row.withSchema(BEAMSCHEMA)
.withFieldValue("name", "c")
.withFieldValue("number", 300L)
.build());
- private static final Clock CLOCK = (Clock & Serializable) () ->
1656788475425L;
+ private static final List<Row> ROWSWITHERROR =
+ Arrays.asList(
+ Row.withSchema(BEAMSCHEMAWITHERROR).withFieldValue("error",
"a").build(),
+ Row.withSchema(BEAMSCHEMAWITHERROR).withFieldValue("error",
"b").build(),
+ Row.withSchema(BEAMSCHEMAWITHERROR).withFieldValue("error",
"c").build());
Review Comment:
Done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java:
##########
@@ -76,360 +50,113 @@
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
* repository.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Internal
+// @SuppressWarnings({
+// "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+// })
@AutoService(SchemaTransformProvider.class)
public class PubsubWriteSchemaTransformProvider
extends
TypedSchemaTransformProvider<PubsubWriteSchemaTransformConfiguration> {
- private static final String IDENTIFIER =
"beam:schematransform:org.apache.beam:pubsub_write:v1";
- static final String INPUT_TAG = "input";
- static final String ERROR_TAG = "error";
- /** Returns the expected class of the configuration. */
- @Override
- protected Class<PubsubWriteSchemaTransformConfiguration>
configurationClass() {
- return PubsubWriteSchemaTransformConfiguration.class;
- }
+ public static final TupleTag<PubsubMessage> OUTPUT_TAG = new
TupleTag<PubsubMessage>() {};
+ public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
- /** Returns the expected {@link SchemaTransform} of the configuration. */
- @Override
- public SchemaTransform from(PubsubWriteSchemaTransformConfiguration
configuration) {
- return new PubsubWriteSchemaTransform(configuration);
- }
-
- /** Implementation of the {@link SchemaTransformProvider} identifier method.
*/
- @Override
- public String identifier() {
- return IDENTIFIER;
- }
-
- /**
- * Implementation of the {@link TypedSchemaTransformProvider}
inputCollectionNames method. Since a
- * single input is expected, this returns a list with a single name.
- */
- @Override
- public List<String> inputCollectionNames() {
- return Collections.singletonList(INPUT_TAG);
- }
+ public static final String VALID_FORMATS_STR = "AVRO,JSON";
+ public static final Set<String> VALID_DATA_FORMATS =
+ Sets.newHashSet(VALID_FORMATS_STR.split(","));
- /**
- * Implementation of the {@link TypedSchemaTransformProvider}
outputCollectionNames method. The
- * only expected output is the {@link #ERROR_TAG}.
- */
@Override
- public List<String> outputCollectionNames() {
- return Collections.singletonList(ERROR_TAG);
+ public Class<PubsubWriteSchemaTransformConfiguration> configurationClass() {
+ return PubsubWriteSchemaTransformConfiguration.class;
}
- /**
- * An implementation of {@link SchemaTransform} for Pub/Sub writes
configured using {@link
- * PubsubWriteSchemaTransformConfiguration}.
- */
- static class PubsubWriteSchemaTransform extends SchemaTransform {
-
- private final PubsubWriteSchemaTransformConfiguration configuration;
+ public static class ErrorFn extends DoFn<Row, PubsubMessage> {
+ private SerializableFunction<Row, byte[]> valueMapper;
+ private Schema errorSchema;
- private PubsubClient.PubsubClientFactory pubsubClientFactory;
-
- PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration
configuration) {
- this.configuration = configuration;
+ ErrorFn(SerializableFunction<Row, byte[]> valueMapper, Schema errorSchema)
{
+ this.valueMapper = valueMapper;
+ this.errorSchema = errorSchema;
}
- PubsubWriteSchemaTransform
withPubsubClientFactory(PubsubClient.PubsubClientFactory factory) {
- this.pubsubClientFactory = factory;
- return this;
- }
-
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- if (input.getAll().size() != 1 || !input.has(INPUT_TAG)) {
- throw new IllegalArgumentException(
- String.format(
- "%s %s input is expected to contain a single %s tagged PCollection<Row>",
- input.getClass().getSimpleName(), getClass().getSimpleName(),
INPUT_TAG));
- }
-
- PCollection<Row> rows = input.get(INPUT_TAG);
- if (rows.getSchema().getFieldCount() == 0) {
- throw new IllegalArgumentException(String.format("empty Schema for
%s", INPUT_TAG));
- }
-
- Schema targetSchema = buildTargetSchema(rows.getSchema());
-
- rows =
- rows.apply(
- ConvertForRowToMessage.class.getSimpleName(),
- convertForRowToMessage(targetSchema))
- .setRowSchema(targetSchema);
-
- Schema schema = rows.getSchema();
-
- Schema serializableSchema =
- removeFields(schema, DEFAULT_ATTRIBUTES_KEY_NAME,
DEFAULT_EVENT_TIMESTAMP_KEY_NAME);
- FieldMatcher payloadRowMatcher =
FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, TypeName.ROW);
- if (payloadRowMatcher.match(serializableSchema)) {
- serializableSchema =
-
serializableSchema.getField(DEFAULT_PAYLOAD_KEY_NAME).getType().getRowSchema();
- }
-
- validateTargetSchemaAgainstPubsubSchema(serializableSchema,
input.getPipeline().getOptions());
-
- PCollectionTuple pct =
- rows.apply(
- PubsubRowToMessage.class.getSimpleName(),
- buildPubsubRowToMessage(serializableSchema));
-
- PCollection<PubsubMessage> messages = pct.get(OUTPUT);
- messages.apply(PubsubIO.Write.class.getSimpleName(), buildPubsubWrite());
- return PCollectionRowTuple.of(ERROR_TAG, pct.get(ERROR));
- }
-
- PayloadSerializer getPayloadSerializer(Schema schema) {
- if (configuration.getFormat() == null) {
- return null;
- }
- String format = configuration.getFormat();
- Set<String> availableFormats =
- Providers.loadProviders(PayloadSerializerProvider.class).keySet();
- if (!availableFormats.contains(format)) {
- String availableFormatsString = String.join(",", availableFormats);
- throw new IllegalArgumentException(
- String.format(
- "%s is not among the valid formats: [%s]", format,
availableFormatsString));
- }
- return PayloadSerializers.getSerializer(configuration.getFormat(),
schema, ImmutableMap.of());
- }
-
- PubsubRowToMessage buildPubsubRowToMessage(Schema schema) {
- PubsubRowToMessage.Builder builder =
-
PubsubRowToMessage.builder().setPayloadSerializer(getPayloadSerializer(schema));
-
- if (configuration.getTarget() != null) {
- builder =
- builder.setTargetTimestampAttributeName(
- configuration.getTarget().getTimestampAttributeKey());
- }
-
- return builder.build();
- }
-
- PubsubIO.Write<PubsubMessage> buildPubsubWrite() {
- PubsubIO.Write<PubsubMessage> write =
PubsubIO.writeMessages().to(configuration.getTopic());
-
- if (configuration.getIdAttribute() != null) {
- write = write.withIdAttribute(configuration.getIdAttribute());
- }
-
- if (pubsubClientFactory != null) {
- write = write.withClientFactory(pubsubClientFactory);
+ @ProcessElement
+ public void processElement(@Element Row row, MultiOutputReceiver receiver)
{
+ try {
+ receiver.get(OUTPUT_TAG).output(new
PubsubMessage(valueMapper.apply(row), null));
+ } catch (Exception e) {
+ System.out.println("Error while parsing the element" + e.toString());
+ receiver
+ .get(ERROR_TAG)
+ .output(Row.withSchema(errorSchema).addValues(e.toString(),
row).build());
}
-
- return write;
}
+ }
- void validateSourceSchemaAgainstConfiguration(Schema sourceSchema) {
- if (sourceSchema.getFieldCount() == 0) {
- throw new IllegalArgumentException(String.format("empty Schema for
%s", INPUT_TAG));
- }
-
- if (configuration.getSource() == null) {
- return;
- }
-
- SourceConfiguration source = configuration.getSource();
-
- if (source.getAttributesFieldName() != null) {
- String fieldName = source.getAttributesFieldName();
- FieldType fieldType = ATTRIBUTES_FIELD_TYPE;
- FieldMatcher fieldMatcher = FieldMatcher.of(fieldName, fieldType);
- checkArgument(
- fieldMatcher.match(sourceSchema),
- String.format("schema missing field: %s for type %s: ", fieldName,
fieldType));
- }
-
- if (source.getTimestampFieldName() != null) {
- String fieldName = source.getTimestampFieldName();
- FieldType fieldType = EVENT_TIMESTAMP_FIELD_TYPE;
- FieldMatcher fieldMatcher = FieldMatcher.of(fieldName, fieldType);
- checkArgument(
- fieldMatcher.match(sourceSchema),
- String.format("schema missing field: %s for type: %s", fieldName,
fieldType));
- }
-
- if (source.getPayloadFieldName() == null) {
- return;
- }
-
- String fieldName = source.getPayloadFieldName();
- FieldMatcher bytesFieldMatcher = FieldMatcher.of(fieldName,
PAYLOAD_BYTES_TYPE_NAME);
- FieldMatcher rowFieldMatcher = FieldMatcher.of(fieldName,
PAYLOAD_ROW_TYPE_NAME);
- SchemaReflection schemaReflection = SchemaReflection.of(sourceSchema);
- checkArgument(
- schemaReflection.matchesAny(bytesFieldMatcher, rowFieldMatcher),
+ @Override
+ public SchemaTransform from(PubsubWriteSchemaTransformConfiguration
configuration) {
+ if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
+ throw new IllegalArgumentException(
String.format(
- "schema missing field: %s for types %s or %s",
- fieldName, PAYLOAD_BYTES_TYPE_NAME, PAYLOAD_ROW_TYPE_NAME));
-
- String[] fieldsToExclude =
- Stream.of(
- source.getAttributesFieldName(),
- source.getTimestampFieldName(),
- source.getPayloadFieldName())
- .filter(Objects::nonNull)
- .toArray(String[]::new);
-
- Schema userFieldsSchema = removeFields(sourceSchema, fieldsToExclude);
-
- if (userFieldsSchema.getFieldCount() > 0) {
- throw new IllegalArgumentException(
- String.format("user fields incompatible with %s field",
source.getPayloadFieldName()));
- }
- }
-
- void validateTargetSchemaAgainstPubsubSchema(Schema targetSchema,
PipelineOptions options) {
- checkArgument(options != null);
-
- try (PubsubClient pubsubClient =
getPubsubClient(options.as(PubsubOptions.class))) {
- PubsubClient.TopicPath topicPath =
PubsubClient.topicPathFromPath(configuration.getTopic());
- PubsubClient.SchemaPath schemaPath =
pubsubClient.getSchemaPath(topicPath);
- if (schemaPath == null ||
schemaPath.equals(SchemaPath.DELETED_SCHEMA)) {
- return;
- }
- Schema expectedSchema = pubsubClient.getSchema(schemaPath);
- checkState(
- targetSchema.equals(expectedSchema),
- String.format(
- "input schema mismatch with expected schema at path: %s\ninput schema: %s\nPub/Sub schema: %s",
- schemaPath, targetSchema, expectedSchema));
- } catch (IOException e) {
- throw new IllegalStateException(e.getMessage());
- }
- }
-
- Schema buildTargetSchema(Schema sourceSchema) {
- validateSourceSchemaAgainstConfiguration(sourceSchema);
- FieldType payloadFieldType = null;
-
- List<String> fieldsToRemove = new ArrayList<>();
-
- if (configuration.getSource() != null) {
- SourceConfiguration source = configuration.getSource();
-
- if (source.getAttributesFieldName() != null) {
- fieldsToRemove.add(source.getAttributesFieldName());
- }
-
- if (source.getTimestampFieldName() != null) {
- fieldsToRemove.add(source.getTimestampFieldName());
- }
-
- if (source.getPayloadFieldName() != null) {
- String fieldName = source.getPayloadFieldName();
- Field field = sourceSchema.getField(fieldName);
- payloadFieldType = field.getType();
- fieldsToRemove.add(fieldName);
- }
- }
-
- Schema targetSchema =
- PubsubRowToMessage.builder()
- .build()
- .inputSchemaFactory(payloadFieldType)
- .buildSchema(sourceSchema.getFields().toArray(new Field[0]));
-
- return removeFields(targetSchema, fieldsToRemove.toArray(new String[0]));
+ "Format %s not supported. Only supported formats are %s",
+ configuration.getFormat(), VALID_FORMATS_STR));
}
+ return new PubsubWriteSchemaTransform(configuration.getTopic(),
configuration.getFormat());
+ }
- private PubsubClient.PubsubClientFactory getPubsubClientFactory() {
- if (pubsubClientFactory != null) {
- return pubsubClientFactory;
- }
- return PubsubGrpcClient.FACTORY;
- }
+ private static class PubsubWriteSchemaTransform extends SchemaTransform
implements Serializable {
+ final String topic;
+ final String format;
- private PubsubClient getPubsubClient(PubsubOptions options) throws
IOException {
- return getPubsubClientFactory()
- .newClient(
- configuration.getTarget().getTimestampAttributeKey(),
- configuration.getIdAttribute(),
- options);
+ PubsubWriteSchemaTransform(String topic, String format) {
+ this.topic = topic;
+ this.format = format;
}
- ParDo.SingleOutput<Row, Row> convertForRowToMessage(Schema targetSchema) {
- return convertForRowToMessage(targetSchema, null);
+ public @UnknownKeyFor @NonNull @Initialized PTransform<
+ @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
+ @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
+ buildTransform() {
+ return this;
}
Review Comment:
Done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java:
##########
@@ -76,360 +50,113 @@
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
* repository.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Internal
+// @SuppressWarnings({
+// "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+// })
@AutoService(SchemaTransformProvider.class)
public class PubsubWriteSchemaTransformProvider
extends
TypedSchemaTransformProvider<PubsubWriteSchemaTransformConfiguration> {
- private static final String IDENTIFIER =
"beam:schematransform:org.apache.beam:pubsub_write:v1";
- static final String INPUT_TAG = "input";
- static final String ERROR_TAG = "error";
- /** Returns the expected class of the configuration. */
- @Override
- protected Class<PubsubWriteSchemaTransformConfiguration>
configurationClass() {
- return PubsubWriteSchemaTransformConfiguration.class;
- }
+ public static final TupleTag<PubsubMessage> OUTPUT_TAG = new
TupleTag<PubsubMessage>() {};
+ public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
- /** Returns the expected {@link SchemaTransform} of the configuration. */
- @Override
- public SchemaTransform from(PubsubWriteSchemaTransformConfiguration
configuration) {
- return new PubsubWriteSchemaTransform(configuration);
- }
-
- /** Implementation of the {@link SchemaTransformProvider} identifier method.
*/
- @Override
- public String identifier() {
- return IDENTIFIER;
- }
-
- /**
- * Implementation of the {@link TypedSchemaTransformProvider}
inputCollectionNames method. Since a
- * single input is expected, this returns a list with a single name.
- */
- @Override
- public List<String> inputCollectionNames() {
- return Collections.singletonList(INPUT_TAG);
- }
+ public static final String VALID_FORMATS_STR = "AVRO,JSON";
+ public static final Set<String> VALID_DATA_FORMATS =
+ Sets.newHashSet(VALID_FORMATS_STR.split(","));
- /**
- * Implementation of the {@link TypedSchemaTransformProvider}
outputCollectionNames method. The
- * only expected output is the {@link #ERROR_TAG}.
- */
@Override
- public List<String> outputCollectionNames() {
- return Collections.singletonList(ERROR_TAG);
+ public Class<PubsubWriteSchemaTransformConfiguration> configurationClass() {
+ return PubsubWriteSchemaTransformConfiguration.class;
}
- /**
- * An implementation of {@link SchemaTransform} for Pub/Sub writes
configured using {@link
- * PubsubWriteSchemaTransformConfiguration}.
- */
- static class PubsubWriteSchemaTransform extends SchemaTransform {
-
- private final PubsubWriteSchemaTransformConfiguration configuration;
+ public static class ErrorFn extends DoFn<Row, PubsubMessage> {
+ private SerializableFunction<Row, byte[]> valueMapper;
+ private Schema errorSchema;
- private PubsubClient.PubsubClientFactory pubsubClientFactory;
-
- PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration
configuration) {
- this.configuration = configuration;
+ ErrorFn(SerializableFunction<Row, byte[]> valueMapper, Schema errorSchema)
{
+ this.valueMapper = valueMapper;
+ this.errorSchema = errorSchema;
}
- PubsubWriteSchemaTransform
withPubsubClientFactory(PubsubClient.PubsubClientFactory factory) {
- this.pubsubClientFactory = factory;
- return this;
- }
-
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- if (input.getAll().size() != 1 || !input.has(INPUT_TAG)) {
- throw new IllegalArgumentException(
- String.format(
- "%s %s input is expected to contain a single %s tagged
PCollection<Row>",
- input.getClass().getSimpleName(), getClass().getSimpleName(),
INPUT_TAG));
- }
-
- PCollection<Row> rows = input.get(INPUT_TAG);
- if (rows.getSchema().getFieldCount() == 0) {
- throw new IllegalArgumentException(String.format("empty Schema for
%s", INPUT_TAG));
- }
-
- Schema targetSchema = buildTargetSchema(rows.getSchema());
-
- rows =
- rows.apply(
- ConvertForRowToMessage.class.getSimpleName(),
- convertForRowToMessage(targetSchema))
- .setRowSchema(targetSchema);
-
- Schema schema = rows.getSchema();
-
- Schema serializableSchema =
- removeFields(schema, DEFAULT_ATTRIBUTES_KEY_NAME,
DEFAULT_EVENT_TIMESTAMP_KEY_NAME);
- FieldMatcher payloadRowMatcher =
FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, TypeName.ROW);
- if (payloadRowMatcher.match(serializableSchema)) {
- serializableSchema =
-
serializableSchema.getField(DEFAULT_PAYLOAD_KEY_NAME).getType().getRowSchema();
- }
-
- validateTargetSchemaAgainstPubsubSchema(serializableSchema,
input.getPipeline().getOptions());
-
- PCollectionTuple pct =
- rows.apply(
- PubsubRowToMessage.class.getSimpleName(),
- buildPubsubRowToMessage(serializableSchema));
-
- PCollection<PubsubMessage> messages = pct.get(OUTPUT);
- messages.apply(PubsubIO.Write.class.getSimpleName(), buildPubsubWrite());
- return PCollectionRowTuple.of(ERROR_TAG, pct.get(ERROR));
- }
-
- PayloadSerializer getPayloadSerializer(Schema schema) {
- if (configuration.getFormat() == null) {
- return null;
- }
- String format = configuration.getFormat();
- Set<String> availableFormats =
- Providers.loadProviders(PayloadSerializerProvider.class).keySet();
- if (!availableFormats.contains(format)) {
- String availableFormatsString = String.join(",", availableFormats);
- throw new IllegalArgumentException(
- String.format(
- "%s is not among the valid formats: [%s]", format,
availableFormatsString));
- }
- return PayloadSerializers.getSerializer(configuration.getFormat(),
schema, ImmutableMap.of());
- }
-
- PubsubRowToMessage buildPubsubRowToMessage(Schema schema) {
- PubsubRowToMessage.Builder builder =
-
PubsubRowToMessage.builder().setPayloadSerializer(getPayloadSerializer(schema));
-
- if (configuration.getTarget() != null) {
- builder =
- builder.setTargetTimestampAttributeName(
- configuration.getTarget().getTimestampAttributeKey());
- }
-
- return builder.build();
- }
-
- PubsubIO.Write<PubsubMessage> buildPubsubWrite() {
- PubsubIO.Write<PubsubMessage> write =
PubsubIO.writeMessages().to(configuration.getTopic());
-
- if (configuration.getIdAttribute() != null) {
- write = write.withIdAttribute(configuration.getIdAttribute());
- }
-
- if (pubsubClientFactory != null) {
- write = write.withClientFactory(pubsubClientFactory);
+ @ProcessElement
+ public void processElement(@Element Row row, MultiOutputReceiver receiver)
{
+ try {
+ receiver.get(OUTPUT_TAG).output(new
PubsubMessage(valueMapper.apply(row), null));
+ } catch (Exception e) {
+ System.out.println("Error while parsing the element" + e.toString());
Review Comment:
Done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java:
##########
@@ -43,196 +62,211 @@
* provide no backwards compatibility guarantees, and it should not be
implemented outside the Beam
* repository.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Internal
@AutoService(SchemaTransformProvider.class)
public class PubsubReadSchemaTransformProvider
extends
TypedSchemaTransformProvider<PubsubReadSchemaTransformConfiguration> {
- static final String OUTPUT_TAG = "OUTPUT";
- /** Returns the expected class of the configuration. */
- @Override
- protected Class<PubsubReadSchemaTransformConfiguration> configurationClass()
{
- return PubsubReadSchemaTransformConfiguration.class;
- }
+ public static final String VALID_FORMATS_STR = "AVRO,JSON";
+ public static final Set<String> VALID_DATA_FORMATS =
+ Sets.newHashSet(VALID_FORMATS_STR.split(","));
- /** Returns the expected {@link SchemaTransform} of the configuration. */
- @Override
- protected SchemaTransform from(PubsubReadSchemaTransformConfiguration
configuration) {
- PubsubMessageToRow toRowTransform =
-
PubsubSchemaTransformMessageToRowFactory.from(configuration).buildMessageToRow();
- return new PubsubReadSchemaTransform(configuration, toRowTransform);
- }
-
- /** Implementation of the {@link TypedSchemaTransformProvider} identifier
method. */
- @Override
- public String identifier() {
- return "beam:schematransform:org.apache.beam:pubsub_read:v1";
- }
+ public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
+ public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
+ public static final Schema ERROR_SCHEMA =
+
Schema.builder().addStringField("error").addNullableByteArrayField("row").build();
- /**
- * Implementation of the {@link TypedSchemaTransformProvider}
inputCollectionNames method. Since
- * no input is expected, this returns an empty list.
- */
@Override
- public List<String> inputCollectionNames() {
- return Collections.emptyList();
+ public Class<PubsubReadSchemaTransformConfiguration> configurationClass() {
+ return PubsubReadSchemaTransformConfiguration.class;
}
- /**
- * Implementation of the {@link TypedSchemaTransformProvider}
outputCollectionNames method. Since
- * a single output is expected, this returns a list with a single name.
- */
@Override
- public List<String> outputCollectionNames() {
- return Collections.singletonList(OUTPUT_TAG);
- }
-
- /**
- * An implementation of {@link SchemaTransform} for Pub/Sub reads configured
using {@link
- * PubsubReadSchemaTransformConfiguration}.
- */
- static class PubsubReadSchemaTransform extends SchemaTransform {
-
- private final PubsubReadSchemaTransformConfiguration configuration;
- private final PubsubMessageToRow pubsubMessageToRow;
-
- private PubsubClient.PubsubClientFactory clientFactory;
-
- private Clock clock;
-
- private PubsubReadSchemaTransform(
- PubsubReadSchemaTransformConfiguration configuration,
- PubsubMessageToRow pubsubMessageToRow) {
- this.configuration = configuration;
- this.pubsubMessageToRow = pubsubMessageToRow;
+ public SchemaTransform from(PubsubReadSchemaTransformConfiguration
configuration) {
+ if (configuration.getSubscription() == null && configuration.getTopic() ==
null) {
+ throw new IllegalArgumentException(
+ "To read from Pubsub, a subscription name or a topic name must be
provided");
}
- /**
- * Sets the {@link PubsubClient.PubsubClientFactory}.
- *
- * <p>Used for testing.
- */
- void setClientFactory(PubsubClient.PubsubClientFactory value) {
- this.clientFactory = value;
+ if (configuration.getSubscription() != null && configuration.getTopic() !=
null) {
+ throw new IllegalArgumentException(
+ "To read from Pubsub, a subscription name or a topic name must be
provided. Not both.");
}
- /**
- * Sets the {@link Clock}.
- *
- * <p>Used for testing.
- */
- void setClock(Clock clock) {
- this.clock = clock;
+ if ((Strings.isNullOrEmpty(configuration.getSchema())
+ && !Strings.isNullOrEmpty(configuration.getFormat()))
+ || (!Strings.isNullOrEmpty(configuration.getSchema())
+ && Strings.isNullOrEmpty(configuration.getFormat()))) {
+ throw new IllegalArgumentException(
+ "A schema was provided without a data format (or viceversa). Please
provide "
+ + "both of these parameters to read from Pubsub, or if you would
like to use the Pubsub schema service,"
+ + " please leave both of these blank.");
}
- /** Validates the {@link PubsubReadSchemaTransformConfiguration}. */
- @Override
- public void validate(@Nullable PipelineOptions options) {
- if (configuration.getSubscription() == null && configuration.getTopic()
== null) {
- throw new IllegalArgumentException(
+ Schema beamSchema;
+ SerializableFunction<byte[], Row> valueMapper;
+ if (configuration.getSchema() == null && configuration.getFormat() ==
null) {
+ try {
+ KV<Schema, SerializableFunction<byte[], Row>> schemaFunctionPair =
+ PubsubUtils.getTopicInfo(configuration.getTopic(),
configuration.getSubscription());
+ beamSchema = schemaFunctionPair.getKey();
+ valueMapper = schemaFunctionPair.getValue();
+ } catch (IOException e) {
+ throw new RuntimeException(
String.format(
- "%s needs to set either the topic or the subscription",
- PubsubReadSchemaTransformConfiguration.class));
+ "Unable to retrieve schema information for topic %s or
subscription %s",
+ configuration.getTopic(), configuration.getSubscription()),
+ e);
}
-
- if (configuration.getSubscription() != null && configuration.getTopic()
!= null) {
+ } else {
+ if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
throw new IllegalArgumentException(
String.format(
- "%s should not set both the topic or the subscription",
- PubsubReadSchemaTransformConfiguration.class));
- }
-
- try {
- PayloadSerializers.getSerializer(
- configuration.getFormat(), configuration.getDataSchema(), new
HashMap<>());
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(
- String.format(
- "Invalid %s, no serializer provider exists for format `%s`",
- PubsubReadSchemaTransformConfiguration.class,
configuration.getFormat()));
+ "Format %s not supported. Only supported formats are %s",
+ configuration.getFormat(), VALID_FORMATS_STR));
}
+ beamSchema =
+ Objects.equals(configuration.getFormat(), "JSON")
+ ? JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema())
+ : AvroUtils.toBeamSchema(
+ new
org.apache.avro.Schema.Parser().parse(configuration.getSchema()));
+ valueMapper =
+ Objects.equals(configuration.getFormat(), "JSON")
+ ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+ : AvroUtils.getAvroBytesToRowFunction(beamSchema);
}
- /** Reads from Pub/Sub according to {@link
PubsubReadSchemaTransformConfiguration}. */
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- if (!input.getAll().isEmpty()) {
- throw new IllegalArgumentException(
- String.format(
- "%s %s input is expected to be empty",
- input.getClass().getSimpleName(), getClass().getSimpleName()));
- }
+ PubsubReadSchemaTransform transform =
+ new PubsubReadSchemaTransform(
+ configuration.getTopic(), configuration.getSubscription(),
beamSchema, valueMapper);
- PCollectionTuple rowsWithDlq =
- input
- .getPipeline()
- .apply("ReadFromPubsub", buildPubsubRead())
- .apply("PubsubMessageToRow", pubsubMessageToRow);
+ if (configuration.getClientFactory() != null) {
+ transform.setClientFactory(configuration.getClientFactory());
+ }
+ if (configuration.getClock() != null) {
+ transform.setClock(configuration.getClock());
+ }
- writeToDeadLetterQueue(rowsWithDlq);
+ return transform;
+ }
- return PCollectionRowTuple.of(OUTPUT_TAG, rowsWithDlq.get(MAIN_TAG));
+ private static class PubsubReadSchemaTransform extends SchemaTransform
implements Serializable {
+ final Schema beamSchema;
+ final SerializableFunction<byte[], Row> valueMapper;
+ final @Nullable String topic;
+ final @Nullable String subscription;
+ @Nullable PubsubTestClientFactory clientFactory;
+ @Nullable Clock clock;
+
+ PubsubReadSchemaTransform(
+ @Nullable String topic,
+ @Nullable String subscription,
+ Schema beamSchema,
+ SerializableFunction<byte[], Row> valueMapper) {
+ this.topic = topic;
+ this.subscription = subscription;
+ this.beamSchema = beamSchema;
+ this.valueMapper = valueMapper;
}
- private void writeToDeadLetterQueue(PCollectionTuple rowsWithDlq) {
- PubsubIO.Write<PubsubMessage> deadLetterQueue =
buildDeadLetterQueueWrite();
- if (deadLetterQueue == null) {
- return;
- }
- rowsWithDlq.get(DLQ_TAG).apply("WriteToDeadLetterQueue",
deadLetterQueue);
- }
+ private static class ErrorCounterFn extends DoFn<PubsubMessage, Row> {
+ private Counter pubsubErrorCounter;
+ private Long errorsInBundle = 0L;
+ private SerializableFunction<byte[], Row> valueMapper;
- /**
- * Builds {@link PubsubIO.Write} dead letter queue from {@link
- * PubsubReadSchemaTransformConfiguration}.
- */
- PubsubIO.Write<PubsubMessage> buildDeadLetterQueueWrite() {
- if (configuration.getDeadLetterQueue() == null) {
- return null;
+ ErrorCounterFn(String name, SerializableFunction<byte[], Row>
valueMapper) {
+ this.pubsubErrorCounter =
Metrics.counter(PubsubReadSchemaTransformProvider.class, name);
+ this.valueMapper = valueMapper;
}
- PubsubIO.Write<PubsubMessage> writeDlq =
- PubsubIO.writeMessages().to(configuration.getDeadLetterQueue());
+ @ProcessElement
+ public void process(@DoFn.Element PubsubMessage message,
MultiOutputReceiver receiver) {
+
+ try {
+
receiver.get(OUTPUT_TAG).output(valueMapper.apply(message.getPayload()));
+ } catch (Exception e) {
+ errorsInBundle += 1;
+ System.out.println("Error while parsing the element" + e.toString());
+ receiver
+ .get(ERROR_TAG)
+ .output(
+ Row.withSchema(ERROR_SCHEMA)
+ .addValues(e.toString(), message.getPayload())
+ .build());
+ }
+ }
- if (configuration.getTimestampAttribute() != null) {
- writeDlq =
writeDlq.withTimestampAttribute(configuration.getTimestampAttribute());
+ @FinishBundle
+ public void finish(FinishBundleContext c) {
+ pubsubErrorCounter.inc(errorsInBundle);
+ errorsInBundle = 0L;
}
+ }
- return writeDlq;
+ void setClientFactory(@Nullable PubsubTestClientFactory factory) {
+ this.clientFactory = factory;
}
- /** Builds {@link PubsubIO.Read} from a {@link
PubsubReadSchemaTransformConfiguration}. */
- PubsubIO.Read<PubsubMessage> buildPubsubRead() {
- PubsubIO.Read<PubsubMessage> read =
PubsubIO.readMessagesWithAttributes();
+ void setClock(@Nullable Clock clock) {
+ this.clock = clock;
+ }
- if (configuration.getSubscription() != null) {
- read = read.fromSubscription(configuration.getSubscription());
+ @SuppressWarnings("nullness")
+ PubsubIO.Read<PubsubMessage> buildPubsubRead() {
+ PubsubIO.Read<PubsubMessage> pubsubRead = PubsubIO.readMessages();
+ if (!Strings.isNullOrEmpty(topic)) {
+ pubsubRead = pubsubRead.fromTopic(topic);
+ } else {
+ pubsubRead = pubsubRead.fromSubscription(subscription);
}
-
- if (configuration.getTopic() != null) {
- read = read.fromTopic(configuration.getTopic());
+ if (clientFactory != null && clock != null) {
+ pubsubRead = pubsubRead.withClientFactory(clientFactory);
+ pubsubRead = clientFactory.setClock(pubsubRead, clock);
+ } else if (clientFactory != null || clock != null) {
+ throw new IllegalArgumentException(
+ "Both PubsubTestClientFactory and Clock need to be specified for
testing, but only one is provided");
}
+ return pubsubRead;
+ }
- if (configuration.getTimestampAttribute() != null) {
- read =
read.withTimestampAttribute(configuration.getTimestampAttribute());
- }
+ // @Override
+ public PTransform<PCollectionRowTuple, PCollectionRowTuple>
buildTransform() {
+ return this;
+ }
Review Comment:
Done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java:
##########
@@ -43,196 +62,211 @@
* provide no backwards compatibility guarantees, and it should not be
implemented outside the Beam
* repository.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Internal
@AutoService(SchemaTransformProvider.class)
public class PubsubReadSchemaTransformProvider
extends
TypedSchemaTransformProvider<PubsubReadSchemaTransformConfiguration> {
- static final String OUTPUT_TAG = "OUTPUT";
- /** Returns the expected class of the configuration. */
- @Override
- protected Class<PubsubReadSchemaTransformConfiguration> configurationClass()
{
- return PubsubReadSchemaTransformConfiguration.class;
- }
+ public static final String VALID_FORMATS_STR = "AVRO,JSON";
+ public static final Set<String> VALID_DATA_FORMATS =
+ Sets.newHashSet(VALID_FORMATS_STR.split(","));
- /** Returns the expected {@link SchemaTransform} of the configuration. */
- @Override
- protected SchemaTransform from(PubsubReadSchemaTransformConfiguration
configuration) {
- PubsubMessageToRow toRowTransform =
-
PubsubSchemaTransformMessageToRowFactory.from(configuration).buildMessageToRow();
- return new PubsubReadSchemaTransform(configuration, toRowTransform);
- }
-
- /** Implementation of the {@link TypedSchemaTransformProvider} identifier
method. */
- @Override
- public String identifier() {
- return "beam:schematransform:org.apache.beam:pubsub_read:v1";
- }
+ public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
+ public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
+ public static final Schema ERROR_SCHEMA =
+
Schema.builder().addStringField("error").addNullableByteArrayField("row").build();
- /**
- * Implementation of the {@link TypedSchemaTransformProvider}
inputCollectionNames method. Since
- * no input is expected, this returns an empty list.
- */
@Override
- public List<String> inputCollectionNames() {
- return Collections.emptyList();
+ public Class<PubsubReadSchemaTransformConfiguration> configurationClass() {
+ return PubsubReadSchemaTransformConfiguration.class;
}
- /**
- * Implementation of the {@link TypedSchemaTransformProvider}
outputCollectionNames method. Since
- * a single output is expected, this returns a list with a single name.
- */
@Override
- public List<String> outputCollectionNames() {
- return Collections.singletonList(OUTPUT_TAG);
- }
-
- /**
- * An implementation of {@link SchemaTransform} for Pub/Sub reads configured
using {@link
- * PubsubReadSchemaTransformConfiguration}.
- */
- static class PubsubReadSchemaTransform extends SchemaTransform {
-
- private final PubsubReadSchemaTransformConfiguration configuration;
- private final PubsubMessageToRow pubsubMessageToRow;
-
- private PubsubClient.PubsubClientFactory clientFactory;
-
- private Clock clock;
-
- private PubsubReadSchemaTransform(
- PubsubReadSchemaTransformConfiguration configuration,
- PubsubMessageToRow pubsubMessageToRow) {
- this.configuration = configuration;
- this.pubsubMessageToRow = pubsubMessageToRow;
+ public SchemaTransform from(PubsubReadSchemaTransformConfiguration
configuration) {
+ if (configuration.getSubscription() == null && configuration.getTopic() ==
null) {
+ throw new IllegalArgumentException(
+ "To read from Pubsub, a subscription name or a topic name must be
provided");
}
- /**
- * Sets the {@link PubsubClient.PubsubClientFactory}.
- *
- * <p>Used for testing.
- */
- void setClientFactory(PubsubClient.PubsubClientFactory value) {
- this.clientFactory = value;
+ if (configuration.getSubscription() != null && configuration.getTopic() !=
null) {
+ throw new IllegalArgumentException(
+ "To read from Pubsub, a subscription name or a topic name must be
provided. Not both.");
}
- /**
- * Sets the {@link Clock}.
- *
- * <p>Used for testing.
- */
- void setClock(Clock clock) {
- this.clock = clock;
+ if ((Strings.isNullOrEmpty(configuration.getSchema())
+ && !Strings.isNullOrEmpty(configuration.getFormat()))
+ || (!Strings.isNullOrEmpty(configuration.getSchema())
+ && Strings.isNullOrEmpty(configuration.getFormat()))) {
+ throw new IllegalArgumentException(
+ "A schema was provided without a data format (or viceversa). Please
provide "
+ + "both of these parameters to read from Pubsub, or if you would
like to use the Pubsub schema service,"
+ + " please leave both of these blank.");
}
- /** Validates the {@link PubsubReadSchemaTransformConfiguration}. */
- @Override
- public void validate(@Nullable PipelineOptions options) {
- if (configuration.getSubscription() == null && configuration.getTopic()
== null) {
- throw new IllegalArgumentException(
+ Schema beamSchema;
+ SerializableFunction<byte[], Row> valueMapper;
+ if (configuration.getSchema() == null && configuration.getFormat() ==
null) {
+ try {
+ KV<Schema, SerializableFunction<byte[], Row>> schemaFunctionPair =
+ PubsubUtils.getTopicInfo(configuration.getTopic(),
configuration.getSubscription());
+ beamSchema = schemaFunctionPair.getKey();
+ valueMapper = schemaFunctionPair.getValue();
+ } catch (IOException e) {
+ throw new RuntimeException(
String.format(
- "%s needs to set either the topic or the subscription",
- PubsubReadSchemaTransformConfiguration.class));
+ "Unable to retrieve schema information for topic %s or
subscription %s",
+ configuration.getTopic(), configuration.getSubscription()),
+ e);
}
-
- if (configuration.getSubscription() != null && configuration.getTopic()
!= null) {
+ } else {
+ if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
throw new IllegalArgumentException(
String.format(
- "%s should not set both the topic or the subscription",
- PubsubReadSchemaTransformConfiguration.class));
- }
-
- try {
- PayloadSerializers.getSerializer(
- configuration.getFormat(), configuration.getDataSchema(), new
HashMap<>());
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(
- String.format(
- "Invalid %s, no serializer provider exists for format `%s`",
- PubsubReadSchemaTransformConfiguration.class,
configuration.getFormat()));
+ "Format %s not supported. Only supported formats are %s",
+ configuration.getFormat(), VALID_FORMATS_STR));
}
+ beamSchema =
+ Objects.equals(configuration.getFormat(), "JSON")
+ ? JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema())
+ : AvroUtils.toBeamSchema(
+ new
org.apache.avro.Schema.Parser().parse(configuration.getSchema()));
+ valueMapper =
+ Objects.equals(configuration.getFormat(), "JSON")
+ ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+ : AvroUtils.getAvroBytesToRowFunction(beamSchema);
}
- /** Reads from Pub/Sub according to {@link
PubsubReadSchemaTransformConfiguration}. */
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- if (!input.getAll().isEmpty()) {
- throw new IllegalArgumentException(
- String.format(
- "%s %s input is expected to be empty",
- input.getClass().getSimpleName(), getClass().getSimpleName()));
- }
+ PubsubReadSchemaTransform transform =
+ new PubsubReadSchemaTransform(
+ configuration.getTopic(), configuration.getSubscription(),
beamSchema, valueMapper);
- PCollectionTuple rowsWithDlq =
- input
- .getPipeline()
- .apply("ReadFromPubsub", buildPubsubRead())
- .apply("PubsubMessageToRow", pubsubMessageToRow);
+ if (configuration.getClientFactory() != null) {
+ transform.setClientFactory(configuration.getClientFactory());
+ }
+ if (configuration.getClock() != null) {
+ transform.setClock(configuration.getClock());
+ }
- writeToDeadLetterQueue(rowsWithDlq);
+ return transform;
+ }
- return PCollectionRowTuple.of(OUTPUT_TAG, rowsWithDlq.get(MAIN_TAG));
+ private static class PubsubReadSchemaTransform extends SchemaTransform
implements Serializable {
+ final Schema beamSchema;
+ final SerializableFunction<byte[], Row> valueMapper;
+ final @Nullable String topic;
+ final @Nullable String subscription;
+ @Nullable PubsubTestClientFactory clientFactory;
+ @Nullable Clock clock;
+
+ PubsubReadSchemaTransform(
+ @Nullable String topic,
+ @Nullable String subscription,
+ Schema beamSchema,
+ SerializableFunction<byte[], Row> valueMapper) {
+ this.topic = topic;
+ this.subscription = subscription;
+ this.beamSchema = beamSchema;
+ this.valueMapper = valueMapper;
}
- private void writeToDeadLetterQueue(PCollectionTuple rowsWithDlq) {
- PubsubIO.Write<PubsubMessage> deadLetterQueue =
buildDeadLetterQueueWrite();
- if (deadLetterQueue == null) {
- return;
- }
- rowsWithDlq.get(DLQ_TAG).apply("WriteToDeadLetterQueue",
deadLetterQueue);
- }
+ private static class ErrorCounterFn extends DoFn<PubsubMessage, Row> {
+ private Counter pubsubErrorCounter;
+ private Long errorsInBundle = 0L;
+ private SerializableFunction<byte[], Row> valueMapper;
- /**
- * Builds {@link PubsubIO.Write} dead letter queue from {@link
- * PubsubReadSchemaTransformConfiguration}.
- */
- PubsubIO.Write<PubsubMessage> buildDeadLetterQueueWrite() {
- if (configuration.getDeadLetterQueue() == null) {
- return null;
+ ErrorCounterFn(String name, SerializableFunction<byte[], Row>
valueMapper) {
+ this.pubsubErrorCounter =
Metrics.counter(PubsubReadSchemaTransformProvider.class, name);
+ this.valueMapper = valueMapper;
}
- PubsubIO.Write<PubsubMessage> writeDlq =
- PubsubIO.writeMessages().to(configuration.getDeadLetterQueue());
+ @ProcessElement
+ public void process(@DoFn.Element PubsubMessage message,
MultiOutputReceiver receiver) {
+
+ try {
+
receiver.get(OUTPUT_TAG).output(valueMapper.apply(message.getPayload()));
+ } catch (Exception e) {
+ errorsInBundle += 1;
+ System.out.println("Error while parsing the element" + e.toString());
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org