johnjcasey commented on code in PR #30081:
URL: https://github.com/apache/beam/pull/30081#discussion_r1491364569
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -680,62 +697,92 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>>
writeDynamicallyShardedFil
// parallelize properly. We also ensure that the files are written if a
threshold number of
// records are ready. Dynamic sharding is achieved via the
withShardedKey() option provided by
// GroupIntoBatches.
- return input
- .apply(
- GroupIntoBatches.<DestinationT,
ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
- .withByteSize(byteSize)
- .withMaxBufferingDuration(maxBufferingDuration)
- .withShardedKey())
- .setCoder(
- KvCoder.of(
- org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
- IterableCoder.of(elementCoder)))
- .apply(
- "StripShardId",
- MapElements.via(
- new SimpleFunction<
- KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>,
Iterable<ElementT>>,
- KV<DestinationT, Iterable<ElementT>>>() {
- @Override
- public KV<DestinationT, Iterable<ElementT>> apply(
- KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>,
Iterable<ElementT>>
- input) {
- return KV.of(input.getKey().getKey(), input.getValue());
- }
- }))
- .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
- .apply(
- "WriteGroupedRecords",
- ParDo.of(
- new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
- tempFilePrefix, maxFileSize, rowWriterFactory))
- .withSideInputs(tempFilePrefix))
+ TupleTag<Result<DestinationT>> successfulResultsTag = new TupleTag<>();
+ PCollectionTuple writeResults =
+ input
+ .apply(
+ GroupIntoBatches.<DestinationT,
ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+ .withByteSize(byteSize)
+ .withMaxBufferingDuration(maxBufferingDuration)
+ .withShardedKey())
+ .setCoder(
+ KvCoder.of(
+
org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
+ IterableCoder.of(elementCoder)))
+ .apply(
+ "StripShardId",
+ MapElements.via(
+ new SimpleFunction<
+ KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>,
Iterable<ElementT>>,
+ KV<DestinationT, Iterable<ElementT>>>() {
+ @Override
+ public KV<DestinationT, Iterable<ElementT>> apply(
+
KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>
+ input) {
+ return KV.of(input.getKey().getKey(),
input.getValue());
+ }
+ }))
+ .setCoder(KvCoder.of(destinationCoder,
IterableCoder.of(elementCoder)))
+ .apply(
+ "WriteGroupedRecords",
+ ParDo.of(
+ new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
+ tempFilePrefix,
+ maxFileSize,
+ rowWriterFactory,
+ badRecordRouter,
+ successfulResultsTag,
+ elementCoder))
+ .withSideInputs(tempFilePrefix)
+ .withOutputTags(successfulResultsTag,
TupleTagList.of(BAD_RECORD_TAG)));
+ badRecordErrorHandler.addErrorCollection(
+
writeResults.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));
+
+ return writeResults
+ .get(successfulResultsTag)
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
}
private PCollection<Result<DestinationT>> writeShardedRecords(
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords,
PCollectionView<String> tempFilePrefix) {
- return shardedRecords
- .apply("GroupByDestination", GroupByKey.create())
- .apply(
- "StripShardId",
- MapElements.via(
- new SimpleFunction<
- KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
- KV<DestinationT, Iterable<ElementT>>>() {
- @Override
- public KV<DestinationT, Iterable<ElementT>> apply(
- KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
- return KV.of(input.getKey().getKey(), input.getValue());
- }
- }))
- .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
- .apply(
- "WriteGroupedRecords",
- ParDo.of(
- new WriteGroupedRecordsToFiles<>(tempFilePrefix,
maxFileSize, rowWriterFactory))
- .withSideInputs(tempFilePrefix))
+ TupleTag<Result<DestinationT>> successfulResultsTag = new TupleTag<>();
+ PCollectionTuple writeResults =
+ shardedRecords
+ .apply("GroupByDestination", GroupByKey.create())
+ .apply(
+ "StripShardId",
+ MapElements.via(
+ new SimpleFunction<
+ KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
+ KV<DestinationT, Iterable<ElementT>>>() {
+ @Override
+ public KV<DestinationT, Iterable<ElementT>> apply(
+ KV<ShardedKey<DestinationT>, Iterable<ElementT>>
input) {
+ return KV.of(input.getKey().getKey(),
input.getValue());
+ }
+ }))
+ .setCoder(KvCoder.of(destinationCoder,
IterableCoder.of(elementCoder)))
+ .apply(
+ "WriteGroupedRecords",
+ ParDo.of(
+ new WriteGroupedRecordsToFiles<>(
+ tempFilePrefix,
+ maxFileSize,
+ rowWriterFactory,
+ badRecordRouter,
+ successfulResultsTag,
+ elementCoder))
+ .withSideInputs(tempFilePrefix)
+ .withOutputTags(successfulResultsTag,
TupleTagList.of(BAD_RECORD_TAG)));
+
+ badRecordErrorHandler.addErrorCollection(
+ writeResults
+ .get(BAD_RECORD_TAG)
+ .setCoder(BadRecord.getCoder(shardedRecords.getPipeline())));
Review Comment:
   A missing coder would fail during pipeline construction, and that shouldn't
happen here because BadRecord has a default coder registered.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]