Abacn commented on code in PR #26267:
URL: https://github.com/apache/beam/pull/26267#discussion_r1166071451


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java:
##########
@@ -247,18 +247,37 @@ private static <T> SourceOperationResponse performSplitTyped(
           serializedSize);
     }
 
+    List<BoundedSource<T>> bundlesBeforeCoalesce = bundles;
     int numBundlesBeforeRebundling = bundles.size();
     // To further reduce size of the response and service-side memory usage, coalesce
     // the sources into numBundlesLimit compressed serialized bundles.
-    if (bundles.size() > numBundlesLimit) {
+    while (serializedSize > apiByteLimit || bundles.size() > numBundlesLimit) {
+      // bundle size constrained by API limit, adds 5% allowance
+      int targetBundleSizeApiLimit = (int) (bundles.size() * apiByteLimit / serializedSize * 0.95);
+      // bundle size constrained by numBundlesLimit
+      int targetBundleSizeBundleLimit = Math.min(numBundlesLimit, bundles.size() - 1);
+      int targetBundleSize = Math.min(targetBundleSizeApiLimit, targetBundleSizeBundleLimit);
+
+      if (targetBundleSize <= 1) {
+        String message =
+            String.format(
+                "Unable to coalesce the sources into compressed serialized bundles to satisfy the "
+                    + "allowable limit when splitting %s. With %d bundles, total serialized size "
+                    + "of %d bytes is still larger than the limit %d. For more information, please "
+                    + "check the corresponding FAQ entry at "
+                    + "https://cloud.google.com/dataflow/pipelines/troubleshooting-your-pipeline",
+                source, bundles.size(), serializedSize, apiByteLimit);

Review Comment:
   This option applies only to BigQueryIO.read, while the bug is generic to the Dataflow
worker. With that option, the read transform expands differently and likely
avoids the problematic code path here (I haven't verified this).
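For reference, here is a minimal standalone sketch of how one iteration of the coalescing loop in the diff picks its target bundle count. It assumes `apiByteLimit` and `serializedSize` are `long` (their declarations are not shown in the hunk); `CoalesceTargetSketch` and `targetBundleCount` are hypothetical names used only for illustration. Under that assumption, the division truncates to an integer before the 0.95 factor is applied.

```java
// Hypothetical sketch of the target computation in the coalescing loop.
// Assumes apiByteLimit and serializedSize are longs; not the worker's actual code.
final class CoalesceTargetSketch {

  /** Number of bundles to coalesce into on one loop iteration. */
  static int targetBundleCount(
      int numBundles, long serializedSize, long apiByteLimit, int numBundlesLimit) {
    // Scale the bundle count by (limit / size); long division truncates first,
    // then the 0.95 factor leaves roughly 5% headroom.
    int targetApiLimit = (int) (numBundles * apiByteLimit / serializedSize * 0.95);
    // Never exceed numBundlesLimit, and always shrink by at least one bundle.
    int targetBundleLimit = Math.min(numBundlesLimit, numBundles - 1);
    return Math.min(targetApiLimit, targetBundleLimit);
  }

  public static void main(String[] args) {
    // 1000 bundles, 30 MB total, 20 MB API limit, 500-bundle cap: the cap wins.
    System.out.println(targetBundleCount(1000, 30_000_000L, 20_000_000L, 500));
    // Same sizes with a 1000-bundle cap: the byte limit wins.
    System.out.println(targetBundleCount(1000, 30_000_000L, 20_000_000L, 1000));
    // 10 bundles, 300 MB total: target drops to <= 1, the diff's error branch.
    System.out.println(targetBundleCount(10, 300_000_000L, 20_000_000L, 500));
  }
}
```

The last case shows why the diff throws when `targetBundleSize <= 1`: once the sources are this far over the byte limit, further coalescing cannot bring the response under it.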



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
