gemini-code-assist[bot] commented on code in PR #38783:
URL: https://github.com/apache/beam/pull/38783#discussion_r3347579029


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java:
##########
@@ -379,12 +383,32 @@ public WriteResult expandUntriggered(
    PCollection<KV<DestinationT, StorageApiWritePayload>> successfulConvertedRows =
         convertMessagesResult.get(successfulConvertedRowsTag);
 
-    if (numShards > 0) {
+    if (numShards > 0 && input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
       successfulConvertedRows =
           successfulConvertedRows.apply(
               "ResdistibuteNumShards",
               Redistribute.<KV<DestinationT, StorageApiWritePayload>>arbitrarily()
                   .withNumBuckets(numShards));
+    } else if (numShards > 0 && input.isBounded() == PCollection.IsBounded.BOUNDED) {
+      successfulConvertedRows =
+          successfulConvertedRows
+              .apply(
+                  "Add shard",
+                  WithKeys.of(
+                      (SerializableFunction<KV<DestinationT, StorageApiWritePayload>, Integer>)
+                          kv ->
+                              Math.floorMod(
+                                  Hashing.murmur3_32_fixed()
+                                          .hashString(
+                                              dynamicDestinations
+                                                  .getTable(kv.getKey())
+                                                  .getShortTableUrn(),
+                                              StandardCharsets.UTF_8)
+                                          .asInt()
+                                      ^ ThreadLocalRandom.current().nextInt(numShards),
+                                  numShards)))

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Using `WithKeys.of` with a plain `SerializableFunction` that calls `dynamicDestinations.getTable(...)` will fail at runtime if the `DynamicDestinations` implementation relies on side inputs. In Apache Beam, `DynamicDestinations` reads side inputs via `sideInput(...)`, which requires a side-input accessor to be set on the instance from the current `DoFn.ProcessContext` (via `dynamicDestinations.setSideInputAccessorFromProcessContext(c)` inside a `DoFn`). A `WithKeys` function has no process context, so the accessor is never set.
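
   To illustrate the failure mode, here is a toy model in plain Java. It is not Beam's `DynamicDestinations`. `ToyDestinations`, `SideInputAccessor` and `setAccessor` are hypothetical names that roughly model the transient side-input accessor, which has to be set from inside a `DoFn` before `getTable` can read side inputs. A key function like the one passed to `WithKeys.of` never sets the accessor, so the call throws. A `DoFn`-style caller that sets it first succeeds.

   ```java
import java.util.function.Function;

public class SideInputAccessorSketch {
  /** Hypothetical stand-in for a side-input lookup that is only valid inside a running DoFn. */
  interface SideInputAccessor {
    String sideInput(String viewName);
  }

  /** Toy model of a DynamicDestinations-style object whose accessor is injected per element. */
  static class ToyDestinations {
    private SideInputAccessor accessor; // null until a DoFn-style caller injects it

    void setAccessor(SideInputAccessor accessor) {
      this.accessor = accessor;
    }

    String getTable(String destination) {
      if (accessor == null) {
        throw new IllegalStateException("side input accessor not set; call from inside a DoFn");
      }
      // e.g. a side input that maps logical destinations to a dataset name
      return accessor.sideInput("datasetMap") + "." + destination;
    }
  }

  public static void main(String[] args) {
    ToyDestinations dests = new ToyDestinations();

    // Like WithKeys.of(fn): the function calls getTable, but nothing ever sets the accessor.
    Function<String, String> keyFn = dests::getTable;
    try {
      keyFn.apply("orders");
    } catch (IllegalStateException e) {
      System.out.println("WithKeys-style call failed: " + e.getMessage());
    }

    // Like a DoFn's processElement: set the accessor from the current context first.
    dests.setAccessor(view -> "analytics");
    System.out.println("DoFn-style call: " + dests.getTable("orders")); // analytics.orders
  }
}
   ```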
   
   To support side inputs properly, we should use a custom `DoFn` inside a 
`ParDo` instead of `WithKeys.of`, and pass the side inputs using 
`.withSideInputs(dynamicDestinations.getSideInputs())`.
   
   ```java
                     "Add shard",
                     ParDo.of(
                             new org.apache.beam.sdk.transforms.DoFn<
                                 KV<DestinationT, StorageApiWritePayload>,
                                 KV<Integer, KV<DestinationT, StorageApiWritePayload>>>() {
                               @ProcessElement
                               public void processElement(ProcessContext c) {
                                 // Let DynamicDestinations read this element's side inputs
                                 // before getTable() is called.
                                 dynamicDestinations.setSideInputAccessorFromProcessContext(c);
                                 KV<DestinationT, StorageApiWritePayload> element = c.element();
                                 TableDestination tableDestination =
                                     dynamicDestinations.getTable(element.getKey());
                                 String tableUrn =
                                     tableDestination != null
                                         ? tableDestination.getShortTableUrn()
                                         : "";
                                 // Per-table hash, salted with a random value in [0, numShards).
                                 int hash =
                                     Hashing.murmur3_32_fixed()
                                         .hashString(tableUrn, StandardCharsets.UTF_8)
                                         .asInt();
                                 // floorMod keeps the shard non-negative even for a negative hash.
                                 int shard =
                                     Math.floorMod(
                                         hash ^ ThreadLocalRandom.current().nextInt(numShards),
                                         numShards);
                                 c.output(KV.of(shard, element));
                               }
                             })
                         // Side inputs must be declared on the ParDo for the accessor to read them.
                         .withSideInputs(dynamicDestinations.getSideInputs()))
   ```
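
   The shard arithmetic in the suggestion can be checked in isolation. The following is a standalone sketch, not Beam code. `ShardSketch` and `assignShard` are hypothetical names, and `String.hashCode()` is swapped in for Guava's `murmur3_32_fixed` so that the example has no dependencies. It shows that `Math.floorMod` keeps the result in `[0, numShards)` even when the salted hash is negative, which `%` would not guarantee.

   ```java
import java.util.concurrent.ThreadLocalRandom;

public class ShardSketch {
  /**
   * Mirrors the suggested DoFn's arithmetic: XOR a per-table hash with a random value in
   * [0, numShards), then map the result into [0, numShards) with Math.floorMod.
   * String.hashCode() stands in for murmur3_32_fixed to keep the sketch dependency-free.
   */
  static int assignShard(String tableUrn, int numShards) {
    int hash = tableUrn.hashCode();
    int salted = hash ^ ThreadLocalRandom.current().nextInt(numShards);
    // Unlike %, floorMod never returns a negative value for a positive divisor.
    return Math.floorMod(salted, numShards);
  }

  public static void main(String[] args) {
    int numShards = 4;
    String[] urns = {"example-table-a", "example-table-b"};
    for (String urn : urns) {
      for (int i = 0; i < 5; i++) {
        System.out.println(urn + " -> shard " + assignShard(urn, numShards));
      }
    }
    // Contrast with %: a negative dividend yields a negative remainder.
    System.out.println((-7 % 4) + " vs " + Math.floorMod(-7, 4)); // -3 vs 1
  }
}
   ```

   In this sketch the random salt is drawn per element, so rows for one table are not pinned to a single shard.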



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
