damondouglas opened a new pull request, #24443:
URL: https://github.com/apache/beam/pull/24443
## Overview
This PR closes #21412 through a `PubsubWriteSchemaTransformProvider`
implementation and closes #24206 through modifications to `PubsubClient` and
related subclasses.
`PubsubWriteSchemaTransformProvider` instantiates a `SchemaTransform` from a
`PubsubWriteSchemaTransformConfiguration` received as a Beam `Row`. The
`SchemaTransform` builds a `PTransform<PCollectionRowTuple,
PCollectionRowTuple>` that uses the `PubsubWriteSchemaTransformConfiguration`
to instantiate a `PubsubIO.Write<PubsubMessage>`. The transform receives a
tagged `Row` input via `PCollectionRowTuple`, converts the `Row`s into
`PubsubMessage`s, and writes them to a Pub/Sub topic.
```mermaid
flowchart TD
provider([PubsubWriteSchemaTransformProvider])
transform["PTransform<PCollectionRowTuple, PCollectionRowTuple>"]
row["Row (PubsubWriteSchemaTransformConfiguration)"]
input["Row (attributes|timestamp|payload|...user fields)"]
pubsub["PubsubIO.Write<PubsubMessage>"]
subgraph "PTransform<PCollectionRowTuple, PCollectionRowTuple>"
input --> PubsubRowToMessage
PubsubRowToMessage --> PubsubMessage
PubsubMessage --> pubsub
end
subgraph PubsubWriteSchemaTransformProvider
row --> provider
provider --> transform
end
```
## Description
The following guide walks the reviewer through the changes in the order of
the flow shown above.
### Validate PCollectionRowTuple input tags
The input tag check passes when the `PCollectionRowTuple` contains exactly one
tagged Row input.
`PubsubWriteSchemaTransformProviderTest::testInvalidTaggedInput` detects
errors in this transform's initial PCollectionRowTuple input.
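The tag check described above can be illustrated with a minimal sketch in plain Java. It models the tagged inputs as a `Map` rather than a real `PCollectionRowTuple`, and the class and method names (`InputTagCheck`, `requireSingleTag`) are hypothetical, not taken from this PR.

```java
import java.util.Map;

// Hypothetical sketch: a Map stands in for the tagged inputs of a PCollectionRowTuple.
class InputTagCheck {
  /** Returns the only tag, or throws if there is not exactly one tagged input. */
  static String requireSingleTag(Map<String, ?> taggedInputs) {
    if (taggedInputs.size() != 1) {
      throw new IllegalArgumentException(
          "expected exactly one tagged Row input, got " + taggedInputs.size());
    }
    return taggedInputs.keySet().iterator().next();
  }
}
```

Failing fast here keeps later steps from having to guess which tagged input to convert.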
### Build PubsubRowToMessage compatible Schema
Before applying the Row input to `PubsubIO.Write`, the transform needs to
convert the `PCollection<Row>` input into a `PCollection<PubsubMessage>`. Since
the transform delegates this work to `PubsubRowToMessage`, it needs a Schema
compatible with `PubsubRowToMessage`. See PR #23897 for details on this
expected Schema. Building the Schema also depends on validating the source Row
input Schema against the `PubsubWriteSchemaTransformConfiguration`.
`PubsubWriteSchemaTransformProviderTest::testBuildTargetSchema` detects
errors in this step.
### Validate Row input Schema against Configuration
The transform allows users to name their own input fields. However, to
comply with the expected `PubsubRowToMessage` Schema (see PR #23897),
the configuration needs to tell the transform what those fields are named.
Validation at this step therefore needs to determine:
1. Does the configuration state the name of the expected attributes,
timestamp, or payload field when such fields do not exist in the input Schema?
2. Are the types of the named input Schema fields compatible with the
expected `PubsubRowToMessage` Schema (see PR #23897)?
Note that validation at this step does not check whether the input Schema
contains both a payload field and user fields. That check is performed by
`PubsubRowToMessage`.
`PubsubWriteSchemaTransformProviderTest::testValidateSourceSchemaAgainstConfiguration`
detects errors in this step.
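The two questions above can be sketched in plain Java. This is not the PR's implementation: the input Schema is modeled as a `Map` from field name to a hypothetical `SketchType` enum, and the allowed types per field are assumptions for illustration only (PR #23897 defines the authoritative expectations).

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SourceSchemaCheck {
  // Hypothetical stand-in for Beam field types; not the real Schema.FieldType API.
  enum SketchType { BYTES, DATETIME, STRING_MAP, ROW, STRING }

  // Assumed type expectations, for illustration only.
  static final Set<SketchType> ATTRIBUTES_TYPES = EnumSet.of(SketchType.STRING_MAP);
  static final Set<SketchType> TIMESTAMP_TYPES = EnumSet.of(SketchType.DATETIME);
  static final Set<SketchType> PAYLOAD_TYPES = EnumSet.of(SketchType.BYTES, SketchType.ROW);

  /** Returns one message per problem; a null field name means "not configured". */
  static List<String> validate(
      Map<String, SketchType> inputSchema,
      String attributesField,
      String timestampField,
      String payloadField) {
    List<String> errors = new ArrayList<>();
    check(inputSchema, attributesField, ATTRIBUTES_TYPES, errors);
    check(inputSchema, timestampField, TIMESTAMP_TYPES, errors);
    check(inputSchema, payloadField, PAYLOAD_TYPES, errors);
    return errors;
  }

  private static void check(
      Map<String, SketchType> inputSchema,
      String name,
      Set<SketchType> allowed,
      List<String> errors) {
    if (name == null) {
      return; // Nothing configured for this field, so nothing to validate.
    }
    SketchType actual = inputSchema.get(name);
    if (actual == null) {
      // Question 1: the configuration names a field the input Schema lacks.
      errors.add("configured field '" + name + "' is not in the input schema");
    } else if (!allowed.contains(actual)) {
      // Question 2: the named field exists but has an incompatible type.
      errors.add("field '" + name + "' has type " + actual + ", expected one of " + allowed);
    }
  }
}
```

Collecting all errors before failing, rather than throwing on the first one, is a design choice of this sketch; it lets a user fix every configuration problem in one pass.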
### Convert Row input to compatible PubsubMessage Row input
Before producing `PubsubMessage`s, the transform needs to convert the original
Row input into a Row that is compatible with `PubsubRowToMessage`.
The transform delegates this work to the `ConvertForRowToMessage` DoFn.
`PubsubWriteSchemaTransformProviderTest::testConvertForRowToMessageTransform`
detects errors in this step.
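As a rough illustration of the kind of conversion this step performs, the sketch below renames user-named fields to the names the downstream converter expects and leaves other fields unchanged. It assumes the conversion is essentially a rename. A `Map` stands in for a Beam `Row`, and `RenameForRowToMessage` along with the target field names used in the usage note are hypothetical; the actual `ConvertForRowToMessage` DoFn may do more.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a Map stands in for a Beam Row.
class RenameForRowToMessage {
  /**
   * Copies the row, renaming each field listed in renames (user name -> expected name)
   * and leaving every other field unchanged. Field order is preserved.
   */
  static Map<String, Object> convert(Map<String, Object> row, Map<String, String> renames) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> field : row.entrySet()) {
      String targetName = renames.getOrDefault(field.getKey(), field.getKey());
      out.put(targetName, field.getValue());
    }
    return out;
  }
}
```

For example, with a rename of `attrs` to a placeholder name such as `expected_attributes`, a row holding `attrs` and `id` comes out holding `expected_attributes` and `id`.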
### Validate Target Schema against Pub/Sub Schema
Pub/Sub optionally lets users configure a schema for a topic (not to be
confused with a Beam Schema). In this context, we need to validate whether
the serialized payload is compatible with the Pub/Sub schema. The
`validateTargetSchemaAgainstPubsubSchema` method performs this check.
Modifying the existing `PubsubClient` and its `PubsubGrpcClient` implementation
provides the dependency needed to query a Pub/Sub topic's schema and convert
it to a Beam Schema. This PR adds test coverage to the existing tests for
`PubsubClient` and its related subclasses. Additionally, this PR adds a new
`PubsubSchemaIT` integration test to detect errors in `PubsubClient`'s ability
to retrieve a topic schema and convert it to a Beam Schema.
### Convert Row input to PubsubMessage
See PR #23897 for a description of `PubsubRowToMessage`.
### Build PubsubIO.Write
Finally, after converting the Row input into `PubsubMessage`s, the transform
applies the resulting PCollection to `PubsubIO.Write<PubsubMessage>`.
`PubsubWriteSchemaTransformProvider::buildPubsubWrite` builds the write and
`PubsubWriteSchemaTransformProviderTest::testBuildPubsubWrite` tests this stage.
`PubsubWriteSchemaTransformProviderIT` begins the work of integration test
coverage. It currently validates only that Row input containing a simple bytes
payload writes to a Pub/Sub topic. Attempts to validate Avro-formatted
serialized Row payloads failed, so future work needs to increase
`PubsubWriteSchemaTransformProviderIT`'s coverage.