scwhittle commented on code in PR #31106:
URL: https://github.com/apache/beam/pull/31106#discussion_r1607961310
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java:
##########
@@ -52,16 +52,18 @@
 /** This {@link PTransform} manages loads into BigQuery using the Storage API. */
 public class StorageApiLoads<DestinationT, ElementT>
     extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
-  final TupleTag<KV<DestinationT, StorageApiWritePayload>> successfulConvertedRowsTag =
-      new TupleTag<>("successfulRows");
+  final TupleTag<KV<DestinationT, KV<ElementT, StorageApiWritePayload>>>
Review Comment:
Aside from the update compatibility issues, doesn't this increase the amount
of data shuffled, since we are now shuffling both the write payload and the
original element? If so, it seems we might want the previous behavior not just
for older SDKs but also whenever an error function requiring the original
element is not configured.

Do we need to change the output type for successful writes? It seems like
the original element is only being added for the error-handling path.
@reuvenlax
If I understand correctly, the graph is not changing here, just the encoding
of the elements: we're going from StorageApiWritePayload to KV<E,
StorageApiWritePayload>. Would it be possible to have a special coder for
KV<E, StorageApiWritePayload> that decodes a previously encoded
StorageApiWritePayload as KV<null, StorageApiWritePayload>?

It seems like that could be done via a Schema in some way, since
StorageApiWritePayload uses the AutoValue schema coder. I think the Dataflow
backend would note that the schema is compatible in that case and allow the
update to proceed.
Or, as a simpler route, could the element just be added as a nullable field to
StorageApiWritePayload instead of changing to KV<E, StorageApiWritePayload>?

To share code, could we just switch to the new type throughout and have the
element be null if it is not needed or is missing because the value was
encoded by a previous version? Since the new type is a superset of the old, it
seems like the compatibility handling for previous SDKs could be confined to
the boundaries of the implementation (if the above doesn't let you share the
code completely).
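
To make the nullable-field idea concrete, here is a rough sketch in plain Java.
It is not the actual Beam code: Payload, encodedRow, originalElement, and the
errorFnNeedsElement flag are hypothetical names, and the real
StorageApiWritePayload is an AutoValue class with a schema coder, so the real
change would look different. The sketch only shows that one type can cover both
shapes, with the element left null when no error function needs it or when the
value comes from the old shape.

```java
public class NullableElementSketch {

  /** Hypothetical stand-in for StorageApiWritePayload; not Beam's actual class. */
  static final class Payload<ElementT> {
    private final byte[] encodedRow;
    // Null when no error function needs the element, or when the value was
    // produced by code that predates this field.
    private final ElementT originalElement;

    private Payload(byte[] encodedRow, ElementT originalElement) {
      this.encodedRow = encodedRow;
      this.originalElement = originalElement;
    }

    /** Old shape: the converted row only, with no original element. */
    static <ElementT> Payload<ElementT> of(byte[] encodedRow) {
      return new Payload<>(encodedRow, null);
    }

    /** New shape: keep the element only if the error path will use it. */
    static <ElementT> Payload<ElementT> of(
        byte[] encodedRow, ElementT element, boolean errorFnNeedsElement) {
      return new Payload<>(encodedRow, errorFnNeedsElement ? element : null);
    }

    byte[] getEncodedRow() {
      return encodedRow;
    }

    ElementT getOriginalElement() {
      return originalElement;
    }

    boolean hasOriginalElement() {
      return originalElement != null;
    }
  }

  public static void main(String[] args) {
    byte[] row = {1, 2, 3};

    // No error function configured: the element is dropped before the shuffle.
    Payload<String> lean = Payload.of(row, "user-record", false);
    // Error function configured: the element travels with the payload.
    Payload<String> full = Payload.of(row, "user-record", true);
    // Value in the old shape: same type, element simply absent.
    Payload<String> legacy = Payload.of(row);

    System.out.println(lean.hasOriginalElement());
    System.out.println(full.getOriginalElement());
    System.out.println(legacy.hasOriginalElement());
  }
}
```

With something along these lines, downstream code only has to check
hasOriginalElement() on the error path, and the successful-write path never
needs to look at the element at all.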