Abacn commented on code in PR #26267:
URL: https://github.com/apache/beam/pull/26267#discussion_r1170319513
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java:
##########
@@ -247,18 +247,37 @@ private static <T> SourceOperationResponse performSplitTyped(
serializedSize);
}
+ List<BoundedSource<T>> bundlesBeforeCoalesce = bundles;
int numBundlesBeforeRebundling = bundles.size();
// To further reduce size of the response and service-side memory usage, coalesce
// the sources into numBundlesLimit compressed serialized bundles.
- if (bundles.size() > numBundlesLimit) {
+ while (serializedSize > apiByteLimit || bundles.size() > numBundlesLimit) {
+ // bundle size constrained by API limit, adds 5% allowance
+ int targetBundleSizeApiLimit = (int) (bundles.size() * apiByteLimit / serializedSize * 0.95);
+ // bundle size constrained by numBundlesLimit
+ int targetBundleSizeBundleLimit = Math.min(numBundlesLimit, bundles.size() - 1);
+ int targetBundleSize = Math.min(targetBundleSizeApiLimit, targetBundleSizeBundleLimit);
+
+ if (targetBundleSize <= 1) {
+ String message =
+ String.format(
+ "Unable to coalesce the sources into compressed serialized bundles to satisfy the "
+ + "allowable limit when splitting %s. With %d bundles, total serialized size "
+ + "of %d bytes is still larger than the limit %d. For more information, please "
+ + "check the corresponding FAQ entry at "
+ + "https://cloud.google.com/dataflow/pipelines/troubleshooting-your-pipeline",
+ source, bundles.size(), serializedSize, apiByteLimit);
Review Comment:
Hi @chamikaramj, is this code path used only by Dataflow Runner v1? If so, we
could also document the recommendation to use Runner v2 in
https://cloud.google.com/dataflow/docs/guides/common-errors#boundedsource-objects-splitintobundles
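
For readers following the hunk above, here is a minimal sketch of the target computation from the new `while` loop, pulled out into a standalone method. The class name `CoalesceTargetSketch`, the method name `targetBundleCount`, and the parameter types (`long` for the two byte counts) are assumptions made for illustration; they are not taken from `WorkerCustomSources.java`. Under those assumed types, the division `numBundles * apiByteLimit / serializedSize` is integer division and truncates before the 0.95 factor is applied.

```java
// Hypothetical standalone sketch; not the actual Beam implementation.
final class CoalesceTargetSketch {
  private CoalesceTargetSketch() {}

  /**
   * Mirrors the arithmetic in the diff: returns the number of bundles to
   * coalesce into on the next pass. A result <= 1 corresponds to the branch
   * in the diff that builds the "Unable to coalesce" message.
   *
   * Assumption: serializedSize and apiByteLimit are long byte counts.
   */
  static int targetBundleCount(
      int numBundles, long serializedSize, long apiByteLimit, int numBundlesLimit) {
    // Scale the current bundle count by (limit / size), then keep 5% headroom.
    // With long operands the division truncates before multiplying by 0.95.
    int byApiLimit = (int) (numBundles * apiByteLimit / serializedSize * 0.95);
    // Never exceed numBundlesLimit, and always shrink by at least one bundle
    // so that repeated passes make progress.
    int byBundleLimit = Math.min(numBundlesLimit, numBundles - 1);
    return Math.min(byApiLimit, byBundleLimit);
  }
}
```

For example, with 50 bundles serializing to 40,000,000 bytes against an assumed 20,000,000-byte limit and a bundle limit of 100, the API-size term is `(int) (25 * 0.95)`, which is 23, and that is smaller than the bundle-count term of 49, so the next pass would target 23 bundles.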