Copilot commented on code in PR #37411:
URL: https://github.com/apache/beam/pull/37411#discussion_r2724597611


##########
sdks/java/io/sparkreceiver/3/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOTest.java:
##########
@@ -241,4 +241,31 @@ public void testReadFromReceiverIteratorData() {
     PAssert.that(actual).containsInAnyOrder(expected);
     pipeline.run().waitUntilFinish(Duration.standardSeconds(15));
   }
+
+  @Test
+  public void testReadFromCustomReceiverWithParallelism() {
+    CustomReceiverWithOffset.shouldFailInTheMiddle = false;
+    ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
+    SparkReceiverIO.Read<String> reader =
+        SparkReceiverIO.<String>read()
+            .withGetOffsetFn(Long::valueOf)
+            .withTimestampFn(Instant::parse)
+            .withPullFrequencySec(PULL_FREQUENCY_SEC)
+            .withStartPollTimeoutSec(START_POLL_TIMEOUT_SEC)
+            .withStartOffset(START_OFFSET)
+            .withSparkReceiverBuilder(receiverBuilder)
+            .withNumReaders(3);
+
+    List<String> expected = new ArrayList<>();
+    for (int j = 0; j < 3; j++) {
+      for (int i = 0; i < CustomReceiverWithOffset.RECORDS_COUNT; i++) {
+        expected.add(String.valueOf(i));
+      }

Review Comment:
   The test expects 60 records (3 readers * 20 records each), which means it's 
testing for duplicate data rather than proper parallel reading. This test 
validates the buggy behavior where each reader independently reads all data 
from the source. The test should instead verify that 20 unique records are read 
once, distributed across the 3 readers without duplication.
   ```suggestion
       for (int i = 0; i < CustomReceiverWithOffset.RECORDS_COUNT; i++) {
         expected.add(String.valueOf(i));
   ```
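
To make the two expectations concrete, here is a minimal sketch in plain Java. `ExpectedRecords` and its method names are hypothetical and not part of the PR; `recordsCount` stands in for `CustomReceiverWithOffset.RECORDS_COUNT`, assumed to be 20 as in the comment above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of SparkReceiverIOTest. */
final class ExpectedRecords {
  private ExpectedRecords() {}

  /** Each offset in [0, recordsCount) exactly once: the expectation for a partitioned read. */
  static List<String> readOnce(int recordsCount) {
    List<String> expected = new ArrayList<>(recordsCount);
    for (int i = 0; i < recordsCount; i++) {
      expected.add(String.valueOf(i));
    }
    return expected;
  }

  /** Each offset repeated once per reader: what the test in the diff asserts. */
  static List<String> repeatedPerReader(int recordsCount, int numReaders) {
    List<String> expected = new ArrayList<>(recordsCount * numReaders);
    for (int reader = 0; reader < numReaders; reader++) {
      expected.addAll(readOnce(recordsCount));
    }
    return expected;
  }
}
```

With 20 records and 3 readers, `repeatedPerReader` yields 60 entries containing only 20 distinct values, while `readOnce` yields the 20 values a non-duplicating read should produce.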



##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java:
##########
@@ -265,7 +267,7 @@ public void stop() {
 
   @ProcessElement
   public ProcessContinuation processElement(
-      @Element byte[] element,
+      @Element Integer element,

Review Comment:
   The `element` parameter (representing the shard ID) is never used in the 
processElement method. This means that when multiple readers are configured 
(via withNumReaders), each DoFn instance will independently create a 
SparkReceiver starting from the same startOffset, resulting in duplicate data 
being read. For example, with 3 readers, the same 20 records will be read 3 
times, producing 60 total records with duplicates.
   
   This defeats the purpose of parallel reading for scalability. The shard ID 
should be used to either:
   1. Partition the offset range among readers (e.g., reader 0 reads offsets 
0-6, reader 1 reads 7-13, reader 2 reads 14-19), or
   2. Use the shard ID to configure the Spark Receiver to read from different 
partitions/sources
   
   Without this coordination, the feature creates duplicate data rather than 
distributing work.
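
Option 1 could be sketched as follows. This is only an illustration under stated assumptions: `OffsetShards` and `split` are hypothetical names, not part of SparkReceiverIO, and the sketch assumes the end offset is known up front, which an unbounded receiver may not provide.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of SparkReceiverIO. */
final class OffsetShards {
  private OffsetShards() {}

  /**
   * Splits [startInclusive, endExclusive) into numShards contiguous ranges.
   * Each entry is {fromInclusive, toExclusive}; the index in the list is the shard ID.
   * Earlier shards absorb the remainder, so range sizes differ by at most one.
   */
  static List<long[]> split(long startInclusive, long endExclusive, int numShards) {
    if (numShards <= 0 || endExclusive < startInclusive) {
      throw new IllegalArgumentException("Invalid range or shard count");
    }
    long total = endExclusive - startInclusive;
    long base = total / numShards;
    long remainder = total % numShards;

    List<long[]> ranges = new ArrayList<>(numShards);
    long from = startInclusive;
    for (int shard = 0; shard < numShards; shard++) {
      long size = base + (shard < remainder ? 1 : 0);
      ranges.add(new long[] {from, from + size});
      from += size;
    }
    return ranges;
  }
}
```

For 20 records and 3 readers, `OffsetShards.split(0, 20, 3)` returns `[0, 7)`, `[7, 14)` and `[14, 20)`, which matches the 0-6, 7-13, 14-19 split above. Inside the DoFn, the `element` shard ID would select the range for that reader.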



##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -191,10 +211,29 @@ public PCollection<V> expand(PBegin input) {
                 sparkReceiverBuilder.getSparkReceiverClass().getName()));
       } else {
         LOG.info("{} started reading", ReadFromSparkReceiverWithOffsetDoFn.class.getSimpleName());
-        return input
-            .apply(Impulse.create())
-            .apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
-        // TODO: Split data from SparkReceiver into multiple workers
+        Integer numReadersObj = sparkReceiverRead.getNumReaders();
+        if (numReadersObj == null || numReadersObj == 1) {
+          return input
+              .apply(Impulse.create())
+              .apply(
+                  MapElements.into(TypeDescriptors.integers())
+                      .via(
+                          new SerializableFunction<byte[], Integer>() {
+                            @Override
+                            public Integer apply(byte[] input) {
+                              return 0;
+                            }
+                          }))

Review Comment:
   The backward-compatibility path adds unnecessary complexity: it takes the 
Impulse output (byte[]) and maps it to a constant Integer shard ID of 0 via 
MapElements with an anonymous SerializableFunction. Using Create.of(0) 
directly would produce the same single element and stay consistent with the 
multi-reader path, without the extra transformation step.
   ```suggestion
                 .apply(Create.of(0))
   ```



##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -151,12 +162,21 @@ public Read<V> withStartPollTimeoutSec(Long startPollTimeoutSec) {
       return toBuilder().setStartPollTimeoutSec(startPollTimeoutSec).build();
     }
 

Review Comment:
   The Javadoc comment for withStartOffset was removed, creating inconsistency 
with the other configuration methods (withPullFrequencySec, 
withStartPollTimeoutSec, etc.) which all have Javadoc comments. The comment 
should be restored to maintain documentation consistency across the API.
   ```suggestion
   
       /** Inclusive start offset from which the reading should be started. */
   ```



##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -151,12 +162,21 @@ public Read<V> withStartPollTimeoutSec(Long startPollTimeoutSec) {
       return toBuilder().setStartPollTimeoutSec(startPollTimeoutSec).build();
     }
 
-    /** Inclusive start offset from which the reading should be started. */
     public Read<V> withStartOffset(Long startOffset) {
       checkArgument(startOffset != null, "Start offset can not be null");
       return toBuilder().setStartOffset(startOffset).build();
     }
 
+    /**
+     * A number of workers to read from Spark {@link Receiver}.
+     *
+     * <p>If this value is not set, or set to 1, the reading will be performed on a single worker.

Review Comment:
   The documentation for withNumReaders claims it will allow reading from 
multiple workers, but it doesn't explain that each reader will independently 
read all data, resulting in duplicates. The documentation should clearly state 
the expected behavior - whether it's intended to duplicate data for redundancy 
or to distribute work without duplication. Currently, the behavior doesn't 
match the stated goal of addressing a "scalability bottleneck" since 
duplicating data doesn't improve scalability.
   ```suggestion
        * Configures how many independent workers (readers) will read from the same Spark
        * {@link Receiver}.
        *
        * <p>Each configured reader connects to the underlying source independently and will
        * typically observe the full stream of data. As a result, records may be duplicated
        * across readers; this option does <b>not</b> shard or partition the input among workers.
        *
        * <p>This setting is intended for use cases where redundant consumption of the same data
        * is acceptable (for example, to increase robustness when dealing with flaky sources),
        * and should not be used as a mechanism for load-balancing or avoiding scalability
        * bottlenecks via input partitioning. If you require a single logical read without
        * duplicates, leave {@code numReaders} at its default of {@code 1} and apply your own
        * partitioning or deduplication to the resulting {@link PCollection}.
        *
        * <p>If this value is not set, or set to {@code 1}, the reading will be performed on a
        * single worker.
   ```


