Abacn commented on issue #26395: URL: https://github.com/apache/beam/issues/26395#issuecomment-1546138402
Update: I tested #26618 with a test pipeline and found no noticeable change with the switch on/off. Basically I am generating a steady stream of 100k elements per second, each of 1 kB, so the raw throughput is 100 MB/s. Pipeline options provided:

```
-Dexec.args="--project=*** --tempLocation=gs://***/temp --region=us-central1 --runner=DataflowRunner --numWorkers=5 --autoscalingAlgorithm=NONE --dataflowWorkerJar=/Users/***beam/runners/google-cloud-dataflow-java/worker/build/libs/beam-runners-google-cloud-dataflow-java-legacy-worker-2.48.0-SNAPSHOT.jar --experiments=enable_streaming_engine"
```

Both jobs were drained after 15 min.

| Metric | prefetch disabled | prefetch enabled (current master) |
|-----|-----|-----|
| Elapsed time | 19 min 23 sec | 19 min 10 sec |
| elements_read | 28,003,335 | 54,323,517 |
| Total streaming data | 84.76 GB | 167.62 GB |
| ratio (kB / element) | 3.03 | 3.09 |

Both settings have a similar ratio (~3 kB per element, which is 3 times the actual data size); however, disabling the prefetch shows a significant regression in throughput and causes a surging backlog:

Backlog (prefetch disabled):

<img width="507" alt="image" src="https://github.com/apache/beam/assets/8010435/7393c46c-c9ae-417a-822d-3d69f3c73ee1">

Backlog (prefetch enabled):

<img width="515" alt="image" src="https://github.com/apache/beam/assets/8010435/6cbc0819-c0d1-42dd-bd55-1196ba6a5fbe">

@nbali Have you been able to test your changed code with a pipeline? To test modified java-core code with your pipeline, one can build both the sdks-java-core jar and the Dataflow worker jar (for runner v1) with something like

```
./gradlew -Ppublishing sdks:java:core:jar
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
```

then use the compiled jar as the dependency for java core, and pass the shadow worker jar to Dataflow via the `--dataflowWorkerJar` pipeline option shown above.
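For reference, the throughput and the kB/element ratios in the table are straightforward arithmetic on the numbers above; a standalone sanity check (the class name is mine, not part of the pipeline):

```java
public class MetricsCheck {
  public static void main(String[] args) {
    // Generator: 100k elements/s at 1 kB each => 100 MB/s of raw data.
    long bytesPerSecond = 100_000L * 1_000L;
    System.out.println("raw throughput MB/s: " + bytesPerSecond / 1_000_000L); // 100

    // Table values from the two runs (Dataflow reports decimal GB).
    double disabledGb = 84.76, enabledGb = 167.62;
    long disabledElems = 28_003_335L, enabledElems = 54_323_517L;

    // kB per element = total streaming bytes / elements_read / 1000.
    System.out.printf("prefetch disabled: %.2f kB/element%n", disabledGb * 1e9 / disabledElems / 1e3);
    System.out.printf("prefetch enabled:  %.2f kB/element%n", enabledGb * 1e9 / enabledElems / 1e3);
  }
}
```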
-------

Test pipeline:

```java
import java.util.Objects;
import java.util.Random;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class GroupIntoBatchesTest {

  public static void main(String[] argv) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(argv).withValidation().as(PipelineOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply(GenerateSequence.from(0).withRate(100000, Duration.standardSeconds(1)))
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
        .apply(MapElements.via(new MapToKVFn(1000)))
        .apply(GroupIntoBatches.ofSize(50000))
        .apply(
            ParDo.of(
                new DoFn<KV<Integer, Iterable<byte[]>>, Void>() {
                  @ProcessElement
                  public void process(ProcessContext ctx) {
                    System.out.println(
                        "grouped key:" + Objects.requireNonNull(ctx.element()).getKey());
                  }
                }));
    p.run();
  }

  /** Maps each sequence number to a KV with one of 10 keys and a random payload of valueSize bytes. */
  static class MapToKVFn extends SimpleFunction<Long, KV<Integer, byte[]>> {
    private transient Random rd;
    private final int valueSize;

    public MapToKVFn(int valueSize) {
      this.valueSize = valueSize;
    }

    @Override
    public KV<Integer, byte[]> apply(Long input) {
      if (rd == null) {
        rd = new Random();
      }
      byte[] data = new byte[valueSize];
      rd.nextBytes(data);
      return KV.of(Long.hashCode(input) % 10, data);
    }
  }
}
```
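One side note on the keying in `MapToKVFn`: for the nonnegative longs produced by `GenerateSequence.from(0)`, `Long.hashCode(input) % 10` stays in 0–9 as long as values fit in an `int`, but Java's `%` preserves the sign of the dividend, so a negative hash would widen the key space; `Math.floorMod` avoids that. A standalone illustration (values chosen by me for demonstration):

```java
public class KeyDistributionCheck {
  public static void main(String[] args) {
    // For small nonnegative longs, Long.hashCode(v) == (int) v, so v % 10 is in [0, 9].
    System.out.println(Long.hashCode(12345L) % 10);                 // 5

    // 2^31 hashes to Integer.MIN_VALUE, and % keeps the sign of the dividend.
    System.out.println(Long.hashCode(2147483648L) % 10);            // -8

    // Math.floorMod always yields a result in [0, 9].
    System.out.println(Math.floorMod(Long.hashCode(2147483648L), 10)); // 2
  }
}
```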
