damondouglas opened a new pull request, #24443:
URL: https://github.com/apache/beam/pull/24443

   ## Overview
   
   This PR closes #21412 through a `PubsubWriteSchemaTransformProvider` 
implementation and closes #24206 through modifications to `PubsubClient` and 
related subclasses.
   
   `PubsubWriteSchemaTransformProvider` instantiates a `SchemaTransform` from a 
`PubsubWriteSchemaTransformConfiguration` received as a Beam `Row`.  The 
`SchemaTransform` builds a `PTransform<PCollectionRowTuple, 
PCollectionRowTuple>` that uses the `PubsubWriteSchemaTransformConfiguration` 
to instantiate a `PubsubIO.Write<PubsubMessage>`.  Given a single tagged Row 
input in its `PCollectionRowTuple`, the transform converts each `Row` into a 
`PubsubMessage` and writes it to a Pub/Sub topic.
   
   ```mermaid
   flowchart TD
       
       provider([PubsubWriteSchemaTransformProvider])
       transform["PTransform&lt;PCollectionRowTuple, PCollectionRowTuple&gt;"]
       row["Row (PubsubWriteSchemaTransformConfiguration)"]
       input["Row (attributes|timestamp|payload|...user fields)"]
       pubsub["PubsubIO.Write&lt;PubsubMessage&gt;"]
   
   
       subgraph "PTransform&lt;PCollectionRowTuple, PCollectionRowTuple&gt;"
            input --> PubsubRowToMessage
            PubsubRowToMessage --> PubsubMessage
            PubsubMessage --> pubsub
       end
   
       subgraph PubsubWriteSchemaTransformProvider
           row --> provider
           provider --> transform
       end
   
   ```
   
   ## Description
   
   The following sections guide the reviewer through the change, in the order 
of the flow shown above.
   
   ### Validate PCollectionRowTuple input tags
   
   The input tag check passes only when the `PCollectionRowTuple` input 
contains exactly one tagged Row input.
   
   `PubsubWriteSchemaTransformProviderTest::testInvalidTaggedInput` detects 
errors in this transform's initial PCollectionRowTuple input.
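
   As a rough illustration of the check described above, the sketch below 
rejects any input that does not carry exactly one tag.  It deliberately avoids 
Beam types: a plain `Map` stands in for the tag to Row input association, and 
`InputTagCheck` and `requireSingleTag` are hypothetical names, not the PR's 
actual code.

```java
import java.util.Map;

// Simplified stand-in for the input tag check. A plain Map models the
// tag -> Row input association; this is NOT the Beam API or the PR's code.
final class InputTagCheck {
  private InputTagCheck() {}

  /** Returns the single tag, or throws if the input has zero or several tags. */
  static String requireSingleTag(Map<String, ?> taggedInputs) {
    if (taggedInputs.size() != 1) {
      throw new IllegalArgumentException(
          "expected exactly one tagged Row input but found "
              + taggedInputs.size()
              + ": "
              + taggedInputs.keySet());
    }
    return taggedInputs.keySet().iterator().next();
  }
}
```

   Failing fast here keeps later steps free to assume a single source Schema.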
   
   ### Build PubsubRowToMessage compatible Schema
   
   Prior to applying the Row input to `PubsubIO.Write`, the transform needs to 
convert the `PCollection<Row>` input into `PCollection<PubsubMessage>`.  Since 
the transform delegates this work to `PubsubRowToMessage`, we need a compatible 
Schema.  See PR #23897 for details on this expected Schema.  Building it 
additionally depends on validating the source Row input Schema against the 
`PubsubWriteSchemaTransformConfiguration`.
   
   `PubsubWriteSchemaTransformProviderTest::testBuildTargetSchema` detects 
errors in this step.
   
   ### Validate Row input Schema against Configuration
   
   The transform allows users to name their own input fields.  However, to 
comply with the expected `PubsubRowToMessage` Schema (see PR #23897), 
the configuration needs to tell the transform what those fields are named.  
Therefore, validation at this step needs to determine:
   
   1. Does the configuration state the name of the expected attributes, 
timestamp, or payload field when such fields do not exist in the input Schema?
   2. Are the types of the named input Schema fields compatible with the 
expected `PubsubRowToMessage` Schema (see PR #23897)?
   
   Note that validation at this step ignores whether the input Schema contains 
both a payload field and user fields.  This validation is performed by 
`PubsubRowToMessage`.
   
   
`PubsubWriteSchemaTransformProviderTest::testValidateSourceSchemaAgainstConfiguration`
 detects errors in this step.
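
   The two questions above can be sketched as a small check over field names 
and types.  This is one possible reading, not the PR's implementation: the 
`Type` and `Role` enums, the `Map`-based schema, and the set of types accepted 
for each role are all assumptions made for illustration, and no Beam classes 
are used.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of validating an input schema against a configuration.
// Type, Role, and the Map-based schema are hypothetical stand-ins, not Beam classes.
final class SourceSchemaCheck {
  private SourceSchemaCheck() {}

  /** Hypothetical field types (illustrative only). */
  enum Type { BYTES, ROW, DATETIME, STRING_MAP, STRING, INT64 }

  /** Roles a configured field can play, with the types assumed acceptable for each. */
  enum Role {
    ATTRIBUTES(EnumSet.of(Type.STRING_MAP)),
    TIMESTAMP(EnumSet.of(Type.DATETIME)),
    PAYLOAD(EnumSet.of(Type.BYTES, Type.ROW));

    final Set<Type> allowed;

    Role(Set<Type> allowed) {
      this.allowed = allowed;
    }
  }

  /** Returns one message per configured field that is missing or has an unexpected type. */
  static List<String> validate(Map<String, Type> schema, Map<Role, String> configuredNames) {
    List<String> errors = new ArrayList<>();
    for (Map.Entry<Role, String> entry : configuredNames.entrySet()) {
      Role role = entry.getKey();
      String name = entry.getValue();
      Type actual = schema.get(name);
      if (actual == null) {
        // Question 1: the configuration names a field the input schema lacks.
        errors.add(role + " field '" + name + "' is not in the input schema");
      } else if (!role.allowed.contains(actual)) {
        // Question 2: the named field exists but its type is not compatible.
        errors.add(role + " field '" + name + "' has type " + actual
            + " but expected one of " + role.allowed);
      }
    }
    return errors;
  }
}
```

   Collecting every problem into a list, rather than throwing on the first 
one, lets a user fix a misconfiguration in a single pass.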
   
   ### Convert Row input to compatible PubsubMessage Row input
   
   Prior to transforming into `PubsubMessage`s, the transform needs to convert 
the original Row input into a Row that is compatible with `PubsubRowToMessage`.  
The transform delegates this work to the `ConvertForRowToMessage` DoFn.
   
   
`PubsubWriteSchemaTransformProviderTest::testConvertForRowToMessageTransform` 
detects errors in this step.
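
   One way to picture this conversion is as a field rename from user-chosen 
names to the names the downstream step expects.  The sketch below assumes that 
reading; it models a Row as a plain `Map`, and `RowRenameSketch`, `convert`, 
and the rename table are hypothetical rather than the `ConvertForRowToMessage` 
implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of converting a user-named row into the expected layout.
// A Map stands in for a Beam Row; names here are illustrative assumptions.
final class RowRenameSketch {
  private RowRenameSketch() {}

  /**
   * Copies {@code row}, renaming each field listed in {@code renames}
   * (source name -> target name) and keeping all other fields unchanged.
   */
  static Map<String, Object> convert(Map<String, Object> row, Map<String, String> renames) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> field : row.entrySet()) {
      String target = renames.getOrDefault(field.getKey(), field.getKey());
      if (out.containsKey(target)) {
        // Two source fields must never collapse into one target field.
        throw new IllegalArgumentException("duplicate target field: " + target);
      }
      out.put(target, field.getValue());
    }
    return out;
  }
}
```

   For example, calling `convert` with a rename table that maps a 
user-chosen `body` field to a placeholder target name leaves every other field 
untouched.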
   
   ### Validate Target Schema against Pub/Sub Schema
   
   Pub/Sub optionally lets users configure a schema for a topic (not to be 
confused with a Beam Schema).  In this context, we need to validate whether the 
serialized payload is compatible with the Pub/Sub schema.  The 
`validateTargetSchemaAgainstPubsubSchema` method performs this check.  
Modifications to the existing `PubsubClient` and its `PubsubGrpcClient` 
subclass provide the dependency needed to query a Pub/Sub topic's schema and 
convert it to a Beam Schema.  This PR extends the existing tests for 
`PubsubClient` and its related subclasses.  Additionally, this PR adds a new 
`PubsubSchemaIT` integration test to detect errors in `PubsubClient`'s ability 
to retrieve a topic schema and convert it to a Beam Schema.
   
   ### Convert Row input to PubsubMessage
   
   See PR #23897 for a description of `PubsubRowToMessage`.
   
   ### Build PubsubIO.Write
   
   Finally, after converting the Row input into `PubsubMessage`s, the transform 
applies the resulting PCollection to `PubsubIO.Write<PubsubMessage>`.  
`PubsubWriteSchemaTransformProvider::buildPubsubWrite` performs this step and 
`PubsubWriteSchemaTransformProviderTest::testBuildPubsubWrite` tests it.
   
   `PubsubWriteSchemaTransformProviderIT` begins the work of integration test 
coverage.  It currently validates only that a Row input containing a simple 
bytes payload is written to a Pub/Sub topic.  Attempts to validate 
Avro-formatted serialized Row payloads failed, so future work needs to increase 
`PubsubWriteSchemaTransformProviderIT`'s coverage.
   

