damondouglas opened a new pull request, #23897:
URL: https://github.com/apache/beam/pull/23897

   This PR addresses #21412 by adding a `PubsubRowToMessage` transform, which converts `Row` inputs into `PubsubMessage`s.
   
   ## Validation logic
   
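   Before processing any `Row` input, the transform validates the input `Schema` and throws an error for each invalid combination in the table below. The columns refer to the following fields of the `Row` input and to the configured serializer: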
   
   - `attributes` - refers to the expected `Map<String, String>`
   - `timestamp` - refers to the expected `ReadableDateTime`
   - `payload` - refers to either the expected `FieldType.BYTES` or `TypeName.ROW`
   - `user fields` - refers to any fields present in the `Row` input that are not the `attributes`, `timestamp`, or `payload` fields
   - `PayloadSerializer` - refers to the interface responsible for serializing a `Row` to `byte[]`
   
   _NOTE: `...` in the table below indicates that the value in the cell is irrelevant for the case._
   
   | attributes | timestamp | payload | user fields | PayloadSerializer | reason |
   | --- | --- | --- | --- | --- | --- |
   | not `Map<String, String>` | ... | ... | ... | ... | attributes must have the same type as the attributes of a `PubsubMessage` |
   | ... | not `DATETIME` | ... | ... | ... | the timestamp value is derived from the field's `DATETIME` Schema type |
   | ... | ... | not `BYTES` or `ROW` | ... | ... | only `BYTES` or `ROW` payloads are supported |
   | ... | ... | exists | exists | ... | a payload field and user fields cannot both be provided |
   | ... | ... | `BYTES` | ... | not null | a `PayloadSerializer` must not be provided with a `BYTES` payload |
   | ... | ... | `ROW` | ... | null | a `PayloadSerializer` is required to serialize a `ROW` payload |
   | ... | ... | ... | exists | null | a `PayloadSerializer` is required to serialize a `Row` with user fields |
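The validation table can be read as a set of independent rules. The following is a minimal plain-Java sketch of those rules, not the transform's actual code: it does not use Beam's `Schema` API, and the class `RowValidationSketch`, the `Kind` enum, and the `validate` method are hypothetical names introduced for illustration. It also assumes that `attributes` and `timestamp` are only type-checked when present, and it collects every violation into a list instead of throwing on the first one.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the validation table. The real transform inspects a Beam Schema;
 * here each relevant field is reduced to a Kind so the rules can run in plain Java.
 */
class RowValidationSketch {

  /** Simplified field type; ABSENT means the field is not in the input schema. */
  enum Kind { ABSENT, STRING_MAP, OTHER_MAP, DATETIME, BYTES, ROW, OTHER }

  /** Collects every violated rule; an empty list means the schema is accepted. */
  static List<String> validate(
      Kind attributes, Kind timestamp, Kind payload,
      boolean hasUserFields, boolean hasSerializer) {
    List<String> errors = new ArrayList<>();
    // Assumption: attributes and timestamp are only type-checked when present.
    if (attributes != Kind.ABSENT && attributes != Kind.STRING_MAP) {
      errors.add("attributes must be Map<String, String>");
    }
    if (timestamp != Kind.ABSENT && timestamp != Kind.DATETIME) {
      errors.add("timestamp must be DATETIME");
    }
    if (payload != Kind.ABSENT && payload != Kind.BYTES && payload != Kind.ROW) {
      errors.add("payload must be BYTES or ROW");
    }
    if (payload != Kind.ABSENT && hasUserFields) {
      errors.add("payload and user fields are incompatible");
    }
    if (payload == Kind.BYTES && hasSerializer) {
      errors.add("PayloadSerializer is incompatible with a BYTES payload");
    }
    if (payload == Kind.ROW && !hasSerializer) {
      errors.add("PayloadSerializer is required for a ROW payload");
    }
    if (hasUserFields && !hasSerializer) {
      errors.add("PayloadSerializer is required for user fields");
    }
    return errors;
  }

  public static void main(String[] args) {
    // BYTES payload without a serializer: accepted, prints []
    System.out.println(validate(Kind.STRING_MAP, Kind.DATETIME, Kind.BYTES, false, false));
    // ROW payload without a serializer: rejected with one error
    System.out.println(validate(Kind.ABSENT, Kind.ABSENT, Kind.ROW, false, false));
  }
}
```

Collecting all violations makes it easy to check the table row by row; a caller mirroring the behavior described above would throw as soon as the returned list is non-empty.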
   
   ## Timestamp logic
   
   The following summarizes how the transform populates the stringified timestamp in the resulting `PubsubMessage`'s attributes. The columns refer to the following:
   
   - `attributes` - refers to the expected `Map<String, String>`
   - `timestamp` - refers to the expected `ReadableDateTime`
   - `targetTimestampKey` - refers to the optional configuration parameter
   - `sourceTimestampKey` - refers to the expected timestamp field name of the `Row` input
   
   _NOTE: `...` in the table below indicates that the value in the cell is irrelevant for the case._
   
   | user-provided timestamp in attributes | timestamp | targetTimestampKey | result PubsubMessage attributes key | result PubsubMessage attributes value | reason |
   | --- | --- | --- | --- | --- | --- |
   | exists | ... | ... | sourceTimestampKey | value from `Instant.now()` | only the Row's `timestamp` field is used as input; a timestamp in the user-provided attributes is not used |
   | ... | exists | ... | sourceTimestampKey | value from Row input | the stringified timestamp is added to the `PubsubMessage` attributes with `sourceTimestampKey` as its key |
   | ... | ... | exists | targetTimestampKey | ... | the configured key replaces the default `sourceTimestampKey` |
   | ... | absent | ... | ... | value from `Instant.now()` | when the Row input lacks a timestamp, one is generated from `Instant.now()` |
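The timestamp rules reduce to two choices: which key to write and which value to write. A minimal sketch of those choices follows, under stated assumptions: `java.time.Instant` stands in for Joda's `ReadableDateTime`, `Instant.toString()` (ISO-8601) is an assumed string format, and `TimestampAttributeSketch` and `withTimestamp` are hypothetical names, not the transform's API. The clock is passed in as a `Supplier<Instant>` so the `Instant.now()` fallback can be tested deterministically. The table does not say what happens to an existing attribute under a different key, so this sketch simply leaves it in place.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the timestamp table. java.time.Instant stands in for Joda's
 * ReadableDateTime, and Instant.toString() (ISO-8601) is an assumed string format.
 */
class TimestampAttributeSketch {

  static Map<String, String> withTimestamp(
      Map<String, String> attributes,
      Optional<Instant> rowTimestamp,
      String sourceTimestampKey,
      Optional<String> targetTimestampKey,
      Supplier<Instant> now) {
    // Copy so the caller's attributes map is not mutated.
    Map<String, String> result = new HashMap<>(attributes);
    // A configured targetTimestampKey replaces the default sourceTimestampKey.
    String key = targetTimestampKey.orElse(sourceTimestampKey);
    // Only the Row's timestamp field is used; when it is absent, fall back to "now".
    Instant value = rowTimestamp.orElseGet(now);
    // Overwrites any timestamp the user already put in the attributes under this key.
    result.put(key, value.toString());
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> attrs = new HashMap<>();
    attrs.put("ts", "user-provided");
    // No Row timestamp and no target key: "ts" is overwritten with the current time.
    System.out.println(
        withTimestamp(attrs, Optional.empty(), "ts", Optional.empty(), Instant::now));
    // Row timestamp present and a target key configured: stored under "event_time".
    Instant rowTs = Instant.parse("2022-11-01T12:00:00Z");
    System.out.println(
        withTimestamp(Map.of(), Optional.of(rowTs), "ts", Optional.of("event_time"), Instant::now));
  }
}
```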
   
   ## Payload logic
   
   The following summarizes how the transform populates the `byte[]` payload of the resulting `PubsubMessage`. The columns refer to the following fields of the `Row` input:
   
   - `payload` - refers to either the expected `FieldType.BYTES` or `TypeName.ROW`
   - `user fields` - refers to any fields present in the `Row` input that are not the `attributes`, `timestamp`, or `payload` fields
   
   | payload | user fields | PubsubMessage `byte[]` | reason |
   | --- | --- | --- | --- |
   | `BYTES` | absent | raw `byte[]` value of the payload field | no serialization is needed since the Row input already holds the `byte[]` payload |
   | `ROW` | absent | serialized Row from the payload field | the nested Row is serialized with the `PayloadSerializer` |
   | absent | present | serialized Row of the user fields | the `PayloadSerializer` is applied to the Row that remains after removing the timestamp and attributes fields |
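The payload table is a three-way dispatch on the shape of the `Row` input. The sketch below illustrates that dispatch in plain Java under simplifying assumptions: a `Row` is modeled as `Map<String, Object>`, a nested `ROW` payload as a nested `Map`, and the `PayloadSerializer` as a `Function<Map<String, Object>, byte[]>`. `PayloadSketch`, `payloadBytes`, and the field names `"payload"`, `"attributes"`, and `"timestamp"` are hypothetical and may not match the names the transform actually uses. The `Map.toString()` serializer in `main` is a toy for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
 * Hypothetical sketch of the payload table. A Row is modeled as Map<String, Object>,
 * a nested ROW payload as a nested Map, and the PayloadSerializer as a Function to byte[].
 */
class PayloadSketch {

  static byte[] payloadBytes(
      Map<String, Object> row,
      String payloadField,
      Set<String> attributesAndTimestampFields,
      Function<Map<String, Object>, byte[]> serializer) {
    Object payload = row.get(payloadField);
    if (payload instanceof byte[]) {
      // BYTES payload: already serialized, so pass it through unchanged.
      return (byte[]) payload;
    }
    if (payload instanceof Map) {
      // ROW payload: serialize the nested row.
      @SuppressWarnings("unchecked")
      Map<String, Object> nested = (Map<String, Object>) payload;
      return serializer.apply(nested);
    }
    // No payload field: serialize the user fields that remain after
    // removing the attributes and timestamp fields.
    Map<String, Object> userFields = new LinkedHashMap<>(row);
    userFields.keySet().removeAll(attributesAndTimestampFields);
    return serializer.apply(userFields);
  }

  public static void main(String[] args) {
    // Toy serializer for illustration only: Map.toString() encoded as UTF-8.
    Function<Map<String, Object>, byte[]> toy =
        m -> m.toString().getBytes(StandardCharsets.UTF_8);
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("attributes", Map.of("k", "v"));
    row.put("timestamp", "2022-11-01T00:00:00Z");
    row.put("name", "a");
    row.put("id", 1);
    byte[] out = payloadBytes(row, "payload", Set.of("attributes", "timestamp"), toy);
    System.out.println(new String(out, StandardCharsets.UTF_8)); // {name=a, id=1}
  }
}
```

Because the validation step has already rejected any payload that is neither `BYTES` nor `ROW`, the sketch treats a missing payload field as the only remaining case.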
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
