Abacn commented on issue #26395:
URL: https://github.com/apache/beam/issues/26395#issuecomment-1546138402

   Update: I tested #26618 with a test pipeline and found no noticeable change 
with the switch on/off. Basically I am generating a steady stream of 100k 
elements per second, each of 1 kB, so it is ~100 MB/s of generated throughput.
   
   Pipeline options provided: `-Dexec.args="--project=*** 
--tempLocation=gs://***/temp --region=us-central1 --runner=DataflowRunner 
--numWorkers=5 --autoscalingAlgorithm=NONE 
--dataflowWorkerJar=/Users/***beam/runners/google-cloud-dataflow-java/worker/build/libs/beam-runners-google-cloud-dataflow-java-legacy-worker-2.48.0-SNAPSHOT.jar
 --experiments=enable_streaming_engine"`
   
   Both jobs were drained after 15 min.
   
   | Metric | prefetch disabled | prefetch enabled (current master) |
   |-----|-----|-----|
   | Elapsed time | 19 min 23 sec | 19 min 10 sec |
   | elements_read | 28,003,335 | 54,323,517 |
   | Total streaming data | 84.76 GB | 167.62 GB |
   | ratio (kB / element) | 3.03 | 3.09 |
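
   For reference, the ratio row is just total streaming bytes divided by elements read. A quick sanity check of that arithmetic (a standalone sketch; it assumes GB here means 10^9 bytes):

   ```java
   public class RatioCheck {
     public static void main(String[] args) {
       // Totals copied from the table above; GB assumed to be 10^9 bytes.
       double disabledBytes = 84.76e9;
       double enabledBytes = 167.62e9;
       long disabledElements = 28_003_335L;
       long enabledElements = 54_323_517L;

       // kB per element = total bytes / elements / 1000.
       double disabledRatio = disabledBytes / disabledElements / 1000;
       double enabledRatio = enabledBytes / enabledElements / 1000;

       System.out.printf("prefetch disabled: %.2f kB/element%n", disabledRatio);
       System.out.printf("prefetch enabled:  %.2f kB/element%n", enabledRatio);
     }
   }
   ```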
   
   Both settings show a similar ratio (~3 kB per element, roughly three times 
the actual data size); however, disabling the prefetch shows a significant 
regression in throughput and causes a surging backlog:
   
   Backlog:
   
   (prefetch disabled) 
   <img width="507" alt="image" 
src="https://github.com/apache/beam/assets/8010435/7393c46c-c9ae-417a-822d-3d69f3c73ee1">
   
   (prefetch enabled)
   
   <img width="515" alt="image" 
src="https://github.com/apache/beam/assets/8010435/6cbc0819-c0d1-42dd-bd55-1196ba6a5fbe">
   
   @nbali Have you been able to test your changed code with a pipeline? To test 
modified java-core code with your pipeline, one can build both sdks-java-core 
and the Dataflow worker jar (for runner v1) with something like
   
   ```
   ./gradlew -Ppublishing  sdks:java:core:jar
   ./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
   ```
   
   and then use the compiled jar as the dependency for java core, and pass the 
shadowed worker jar to Dataflow via the pipeline option shown above.
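
   Put together, the end-to-end workflow looks roughly like the following (project, bucket, and jar path are placeholders, and the Maven exec invocation assumes the pipeline class lives in a Maven project):

   ```shell
   # 1. Build the patched SDK core and the legacy (runner v1) worker jar:
   ./gradlew -Ppublishing sdks:java:core:jar
   ./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar

   # 2. Run the pipeline, pointing Dataflow at the locally built worker jar:
   mvn compile exec:java -Dexec.mainClass=GroupIntoBatchesTest \
     -Dexec.args="--project=my-project \
       --tempLocation=gs://my-bucket/temp \
       --region=us-central1 \
       --runner=DataflowRunner \
       --numWorkers=5 --autoscalingAlgorithm=NONE \
       --dataflowWorkerJar=runners/google-cloud-dataflow-java/worker/build/libs/beam-runners-google-cloud-dataflow-java-legacy-worker-2.48.0-SNAPSHOT.jar \
       --experiments=enable_streaming_engine"
   ```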
   
   -------
   
   test pipeline:
   
   ```java
   import java.util.Objects;
   import java.util.Random;

   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.GenerateSequence;
   import org.apache.beam.sdk.options.PipelineOptions;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.GroupIntoBatches;
   import org.apache.beam.sdk.transforms.MapElements;
   import org.apache.beam.sdk.transforms.ParDo;
   import org.apache.beam.sdk.transforms.SimpleFunction;
   import org.apache.beam.sdk.transforms.windowing.FixedWindows;
   import org.apache.beam.sdk.transforms.windowing.Window;
   import org.apache.beam.sdk.values.KV;
   import org.joda.time.Duration;

   public class GroupIntoBatchesTest {

     public static void main(String[] argv) {
       PipelineOptions options =
           PipelineOptionsFactory.fromArgs(argv).withValidation().as(PipelineOptions.class);
       Pipeline p = Pipeline.create(options);

       // Generate 100k elements/s, window into 10 s fixed windows, attach a
       // 1 kB random payload per element spread across 10 keys, then batch per key.
       p.apply(GenerateSequence.from(0).withRate(100000, Duration.standardSeconds(1)))
           .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
           .apply(MapElements.via(new MapToKVFn(1000)))
           .apply(GroupIntoBatches.ofSize(50000))
           .apply(ParDo.of(new DoFn<KV<Integer, Iterable<byte[]>>, Void>() {
             @ProcessElement
             public void process(ProcessContext ctx) {
               System.out.println("grouped key:" + Objects.requireNonNull(ctx.element()).getKey());
             }
           }));
       p.run();
     }

     /** Maps a sequence number to a KV of (key in [0, 10), random payload of valueSize bytes). */
     static class MapToKVFn extends SimpleFunction<Long, KV<Integer, byte[]>> {

       private transient Random rd;

       private final int valueSize;

       public MapToKVFn(int valueSize) {
         this.valueSize = valueSize;
       }

       @Override
       public KV<Integer, byte[]> apply(Long input) {
         if (rd == null) {
           // Lazily initialized so the transient Random survives serialization.
           rd = new Random();
         }
         byte[] data = new byte[valueSize];
         rd.nextBytes(data);
         return KV.of(Long.hashCode(input) % 10, data);
       }
     }
   }
   ```
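
   As a side note on the pipeline above: `Long.hashCode(input) % 10` fans the sequence out over at most 10 keys (for the small nonnegative values produced here, `Long.hashCode(i)` equals `(int) i`, so the keys land in 0..9). A standalone check of that fan-out, outside Beam:

   ```java
   import java.util.Set;
   import java.util.TreeSet;

   public class KeyFanoutCheck {
     public static void main(String[] args) {
       // Same key derivation as MapToKVFn in the pipeline above.
       Set<Integer> keys = new TreeSet<>();
       for (long i = 0; i < 1000; i++) {
         keys.add(Long.hashCode(i) % 10);
       }
       // For small nonnegative sequence values, Long.hashCode(i) == (int) i,
       // so the pipeline writes to exactly 10 keys.
       System.out.println(keys);  // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
     }
   }
   ```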

