gemini-code-assist[bot] commented on code in PR #38332:
URL: https://github.com/apache/beam/pull/38332#discussion_r3163621377
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
##########
@@ -147,109 +147,117 @@ public PCollectionTuple
expand(PCollection<KV<DestinationT, ElementT>> input) {
.get(patchTableSchemaTag)
.setCoder(KvCoder.of(destinationCoder,
ProtoCoder.of(TableSchema.class)));
result.get(elementsWaitingForSchemaTag).setCoder(KvCoder.of(destinationCoder,
elementCoder));
+ if (!hasSchemaUpdateOptions) {
+ // Don't expand the update graph if it's not needed.
+ return result;
+ } else {
+ final int numShards =
+ input
+ .getPipeline()
+ .getOptions()
+ .as(BigQueryOptions.class)
+ .getSchemaUpgradeBufferingShards();
- final int numShards =
- input
- .getPipeline()
- .getOptions()
- .as(BigQueryOptions.class)
- .getSchemaUpgradeBufferingShards();
+ // Throttle the stream to the patch-table function so that only a single
update per table per
+ // two seconds gets processed (to match quotas). The combiner merges
incremental schemas, so
+ // we
+ // won't miss any updates.
+ PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
+ result
+ .get(patchTableSchemaTag)
+ .apply(
+ "rewindow",
+ Window.<KV<DestinationT, TableSchema>>configure()
+ .triggering(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardSeconds(2))))
+ .discardingFiredPanes())
+ .apply("merge schemas", Combine.fewKeys(new
MergeSchemaCombineFn()))
+ .setCoder(KvCoder.of(destinationCoder,
ProtoCoder.of(TableSchema.class)))
+ .apply(
+ "Patch table schema",
+ ParDo.of(
+ new PatchTableSchemaDoFn<>(operationName, bqServices,
dynamicDestinations)))
+ .setCoder(KvCoder.of(destinationCoder,
NullableCoder.of(elementCoder)))
+ // We need to make sure that all shards of the buffering
transform are notified.
+ .apply(
+ "fanout to all shards",
+ FlatMapElements.via(
+ new SimpleFunction<
+ KV<DestinationT, ElementT>,
+ Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
+ @Override
+ public Iterable<KV<ShardedKey<DestinationT>,
ElementT>> apply(
+ KV<DestinationT, ElementT> elem) {
+ return IntStream.range(0, numShards)
+ .mapToObj(
+ i ->
+ KV.of(
+
StorageApiConvertMessages.AssignShardFn.getShardedKey(
+ elem.getKey(), i, numShards),
+ elem.getValue()))
+ .collect(Collectors.toList());
+ }
+ }))
+ .setCoder(
+ KvCoder.of(ShardedKey.Coder.of(destinationCoder),
NullableCoder.of(elementCoder)))
+ .apply(
+ Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
+ .triggering(DefaultTrigger.of()));
- // Throttle the stream to the patch-table function so that only a single
update per table per
- // two seconds gets processed (to match quotas). The combiner merges
incremental schemas, so we
- // won't miss any updates.
- PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
- result
- .get(patchTableSchemaTag)
- .apply(
- "rewindow",
- Window.<KV<DestinationT, TableSchema>>configure()
- .triggering(
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardSeconds(2))))
- .discardingFiredPanes())
- .apply("merge schemas", Combine.fewKeys(new
MergeSchemaCombineFn()))
- .setCoder(KvCoder.of(destinationCoder,
ProtoCoder.of(TableSchema.class)))
- .apply(
- "Patch table schema",
- ParDo.of(
- new PatchTableSchemaDoFn<>(operationName, bqServices,
dynamicDestinations)))
- .setCoder(KvCoder.of(destinationCoder,
NullableCoder.of(elementCoder)))
- // We need to make sure that all shards of the buffering transform
are notified.
- .apply(
- "fanout to all shards",
- FlatMapElements.via(
- new SimpleFunction<
- KV<DestinationT, ElementT>,
- Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
- @Override
- public Iterable<KV<ShardedKey<DestinationT>, ElementT>>
apply(
- KV<DestinationT, ElementT> elem) {
- return IntStream.range(0, numShards)
- .mapToObj(
- i ->
- KV.of(
-
StorageApiConvertMessages.AssignShardFn.getShardedKey(
- elem.getKey(), i, numShards),
- elem.getValue()))
- .collect(Collectors.toList());
- }
- }))
- .setCoder(
- KvCoder.of(ShardedKey.Coder.of(destinationCoder),
NullableCoder.of(elementCoder)))
- .apply(
- Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
- .triggering(DefaultTrigger.of()));
+ // Any elements that are waiting for a schema update are sent to this
stateful DoFn to be
+ // buffered.
+ // Note: we currently do not provide the DynamicDestinations object
access to the side input
+ // in
+ // this path.
+ // This is because side inputs are not currently available from timer
callbacks. Since side
+ // inputs are generally
+ // used for getSchema and in this case we read the schema from the
table, this is unlikely to
+ // be
+ // a problem.
+ PCollection<KV<ShardedKey<DestinationT>, ElementT>>
shardedWaitingElements =
+ result
+ .get(elementsWaitingForSchemaTag)
+ // TODO: Consider using GroupIntoBatchs.withShardingKey to get
auto sharding here
+ // instead of fixed sharding.
+ .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
+ .setCoder(
+ KvCoder.of(
+ ShardedKey.Coder.of(destinationCoder),
NullableCoder.of(elementCoder)));
- // Any elements that are waiting for a schema update are sent to this
stateful DoFn to be
- // buffered.
- // Note: we currently do not provide the DynamicDestinations object access
to the side input in
- // this path.
- // This is because side inputs are not currently available from timer
callbacks. Since side
- // inputs are generally
- // used for getSchema and in this case we read the schema from the table,
this is unlikely to be
- // a problem.
- PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedWaitingElements
=
- result
- .get(elementsWaitingForSchemaTag)
- // TODO: Consider using GroupIntoBatchs.withShardingKey to get
auto sharding here
- // instead of fixed sharding.
- .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
- .setCoder(
- KvCoder.of(ShardedKey.Coder.of(destinationCoder),
NullableCoder.of(elementCoder)));
+ PCollectionList<KV<ShardedKey<DestinationT>, ElementT>>
waitingElementsList =
+ PCollectionList.of(shardedWaitingElements).and(tablesPatched);
+ PCollectionTuple retryResult =
+ waitingElementsList
+ .apply("Buffered flatten", Flatten.pCollections())
+ .apply(
+ "bufferElements",
+ ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder,
convertMessagesDoFn))
+ .withOutputTags(
+ successfulWritesTag,
+ TupleTagList.of(ImmutableList.of(failedWritesTag,
BAD_RECORD_TAG))));
+ retryResult.get(successfulWritesTag).setCoder(successCoder);
+ retryResult.get(failedWritesTag).setCoder(errorCoder);
+
retryResult.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
- PCollectionList<KV<ShardedKey<DestinationT>, ElementT>>
waitingElementsList =
- PCollectionList.of(shardedWaitingElements).and(tablesPatched);
- PCollectionTuple retryResult =
- waitingElementsList
- .apply("Buffered flatten", Flatten.pCollections())
- .apply(
- "bufferElements",
- ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder,
convertMessagesDoFn))
- .withOutputTags(
- successfulWritesTag,
- TupleTagList.of(ImmutableList.of(failedWritesTag,
BAD_RECORD_TAG))));
- retryResult.get(successfulWritesTag).setCoder(successCoder);
- retryResult.get(failedWritesTag).setCoder(errorCoder);
-
retryResult.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
-
- // Flatten successes and failures from both the regular transform and the
retry transform.
- PCollection<KV<DestinationT, StorageApiWritePayload>> allSuccesses =
- PCollectionList.of(result.get(successfulWritesTag))
- .and(retryResult.get(successfulWritesTag))
- .apply("flattenSuccesses", Flatten.pCollections());
- PCollection<BigQueryStorageApiInsertError> allFailures =
- PCollectionList.of(result.get(failedWritesTag))
- .and(retryResult.get(failedWritesTag))
- .apply("flattenFailures", Flatten.pCollections());
- PCollection<BadRecord> allBadRecords =
- PCollectionList.of(result.get(BAD_RECORD_TAG))
- .and(retryResult.get(BAD_RECORD_TAG))
- .apply("flattenBadRecords", Flatten.pCollections());
- return PCollectionTuple.of(successfulWritesTag, allSuccesses)
- .and(failedWritesTag, allFailures)
- .and(BAD_RECORD_TAG, allBadRecords);
+ // Flatten successes and failures from both the regular transform and
the retry transform.
+ PCollection<KV<DestinationT, StorageApiWritePayload>> allSuccesses =
+ PCollectionList.of(result.get(successfulWritesTag))
+ .and(retryResult.get(successfulWritesTag))
+ .apply("flattenSuccesses", Flatten.pCollections());
+ PCollection<BigQueryStorageApiInsertError> allFailures =
+ PCollectionList.of(result.get(failedWritesTag))
+ .and(retryResult.get(failedWritesTag))
+ .apply("flattenFailures", Flatten.pCollections());
+ PCollection<BadRecord> allBadRecords =
+ PCollectionList.of(result.get(BAD_RECORD_TAG))
+ .and(retryResult.get(BAD_RECORD_TAG))
+ .apply("flattenBadRecords", Flatten.pCollections());
+ return PCollectionTuple.of(successfulWritesTag, allSuccesses)
+ .and(failedWritesTag, allFailures)
+ .and(BAD_RECORD_TAG, allBadRecords);
+ }
Review Comment:

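To improve readability and reduce nesting, rely on the existing guard clause.
The `if (!hasSchemaUpdateOptions)` branch already returns early, so the `else`
wrapper is redundant: keep the `if` block ending in `return result;`, drop the
`else { ... }`, and move the remaining logic to the top level of the method, as
in the suggestion below.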
```java
final int numShards =
input
.getPipeline()
.getOptions()
.as(BigQueryOptions.class)
.getSchemaUpgradeBufferingShards();
// Throttle the stream to the patch-table function so that only a single
update per table per
// two seconds gets processed (to match quotas). The combiner merges
incremental schemas, so
// we
// won't miss any updates.
PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
result
.get(patchTableSchemaTag)
.apply(
"rewindow",
Window.<KV<DestinationT, TableSchema>>configure()
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(2))))
.discardingFiredPanes())
.apply("merge schemas", Combine.fewKeys(new
MergeSchemaCombineFn()))
.setCoder(KvCoder.of(destinationCoder,
ProtoCoder.of(TableSchema.class)))
.apply(
"Patch table schema",
ParDo.of(
new PatchTableSchemaDoFn<>(operationName, bqServices,
dynamicDestinations)))
.setCoder(KvCoder.of(destinationCoder,
NullableCoder.of(elementCoder)))
// We need to make sure that all shards of the buffering
transform are notified.
.apply(
"fanout to all shards",
FlatMapElements.via(
new SimpleFunction<
KV<DestinationT, ElementT>,
Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
@Override
public Iterable<KV<ShardedKey<DestinationT>,
ElementT>> apply(
KV<DestinationT, ElementT> elem) {
return IntStream.range(0, numShards)
.mapToObj(
i ->
KV.of(
StorageApiConvertMessages.AssignShardFn.getShardedKey(
elem.getKey(), i, numShards),
elem.getValue()))
.collect(Collectors.toList());
}
}))
.setCoder(
KvCoder.of(ShardedKey.Coder.of(destinationCoder),
NullableCoder.of(elementCoder)))
.apply(
Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
.triggering(DefaultTrigger.of()));
// Any elements that are waiting for a schema update are sent to this
stateful DoFn to be
// buffered.
// Note: we currently do not provide the DynamicDestinations object
access to the side input
// in
// this path.
// This is because side inputs are not currently available from timer
callbacks. Since side
// inputs are generally
// used for getSchema and in this case we read the schema from the
table, this is unlikely to
// be
// a problem.
PCollection<KV<ShardedKey<DestinationT>, ElementT>>
shardedWaitingElements =
result
.get(elementsWaitingForSchemaTag)
// TODO: Consider using GroupIntoBatchs.withShardingKey to get
auto sharding here
// instead of fixed sharding.
.apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
.setCoder(
KvCoder.of(
ShardedKey.Coder.of(destinationCoder),
NullableCoder.of(elementCoder)));
PCollectionList<KV<ShardedKey<DestinationT>, ElementT>>
waitingElementsList =
PCollectionList.of(shardedWaitingElements).and(tablesPatched);
PCollectionTuple retryResult =
waitingElementsList
.apply("Buffered flatten", Flatten.pCollections())
.apply(
"bufferElements",
ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder,
convertMessagesDoFn))
.withOutputTags(
successfulWritesTag,
TupleTagList.of(ImmutableList.of(failedWritesTag,
BAD_RECORD_TAG))));
retryResult.get(successfulWritesTag).setCoder(successCoder);
retryResult.get(failedWritesTag).setCoder(errorCoder);
retryResult.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
// Flatten successes and failures from both the regular transform and
the retry transform.
PCollection<KV<DestinationT, StorageApiWritePayload>> allSuccesses =
PCollectionList.of(result.get(successfulWritesTag))
.and(retryResult.get(successfulWritesTag))
.apply("flattenSuccesses", Flatten.pCollections());
PCollection<BigQueryStorageApiInsertError> allFailures =
PCollectionList.of(result.get(failedWritesTag))
.and(retryResult.get(failedWritesTag))
.apply("flattenFailures", Flatten.pCollections());
PCollection<BadRecord> allBadRecords =
PCollectionList.of(result.get(BAD_RECORD_TAG))
.and(retryResult.get(BAD_RECORD_TAG))
.apply("flattenBadRecords", Flatten.pCollections());
return PCollectionTuple.of(successfulWritesTag, allSuccesses)
.and(failedWritesTag, allFailures)
.and(BAD_RECORD_TAG, allBadRecords);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]