Copilot commented on code in PR #37411:
URL: https://github.com/apache/beam/pull/37411#discussion_r2724597611
##########
sdks/java/io/sparkreceiver/3/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOTest.java:
##########
@@ -241,4 +241,31 @@ public void testReadFromReceiverIteratorData() {
PAssert.that(actual).containsInAnyOrder(expected);
pipeline.run().waitUntilFinish(Duration.standardSeconds(15));
}
+
+ @Test
+ public void testReadFromCustomReceiverWithParallelism() {
+ CustomReceiverWithOffset.shouldFailInTheMiddle = false;
+ ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
+ new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
+ SparkReceiverIO.Read<String> reader =
+ SparkReceiverIO.<String>read()
+ .withGetOffsetFn(Long::valueOf)
+ .withTimestampFn(Instant::parse)
+ .withPullFrequencySec(PULL_FREQUENCY_SEC)
+ .withStartPollTimeoutSec(START_POLL_TIMEOUT_SEC)
+ .withStartOffset(START_OFFSET)
+ .withSparkReceiverBuilder(receiverBuilder)
+ .withNumReaders(3);
+
+ List<String> expected = new ArrayList<>();
+ for (int j = 0; j < 3; j++) {
+ for (int i = 0; i < CustomReceiverWithOffset.RECORDS_COUNT; i++) {
+ expected.add(String.valueOf(i));
+ }
Review Comment:
The test expects 60 records (3 readers * 20 records each), which means it's
testing for duplicate data rather than proper parallel reading. This test
validates the buggy behavior where each reader independently reads all data
from the source. The test should instead verify that 20 unique records are read
once, distributed across the 3 readers without duplication.
```suggestion
for (int i = 0; i < CustomReceiverWithOffset.RECORDS_COUNT; i++) {
expected.add(String.valueOf(i));
```
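Beyond fixing `expected`, the test could assert the total element count explicitly; `containsInAnyOrder` is already a multiset check, so a count assertion mainly makes a duplicate-read regression fail with a clearer message. A minimal sketch, assuming the test's existing `pipeline` and `reader` variables and an import of `org.apache.beam.sdk.transforms.Count`:
```java
// Sketch only: count-based guard against duplicate reads across readers.
PCollection<String> actual = pipeline.apply(reader);
// 3 readers re-reading the same 20 records would yield 60 here and fail fast.
PAssert.thatSingleton(actual.apply(Count.globally()))
    .isEqualTo((long) CustomReceiverWithOffset.RECORDS_COUNT);
PAssert.that(actual).containsInAnyOrder(expected);
pipeline.run().waitUntilFinish(Duration.standardSeconds(15));
```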
##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java:
##########
@@ -265,7 +267,7 @@ public void stop() {
@ProcessElement
public ProcessContinuation processElement(
- @Element byte[] element,
+ @Element Integer element,
Review Comment:
The `element` parameter (representing the shard ID) is never used in the
processElement method. This means that when multiple readers are configured
(via withNumReaders), each DoFn instance will independently create a
SparkReceiver starting from the same startOffset, resulting in duplicate data
being read. For example, with 3 readers, the same 20 records will be read 3
times, producing 60 total records with duplicates.
This defeats the purpose of parallel reading for scalability. The shard ID
should be used to either:
1. Partition the offset range among readers (e.g., reader 0 reads offsets
0-6, reader 1 reads 7-13, reader 2 reads 14-19; see the sketch after this
list), or
2. Configure the Spark Receiver to read from different partitions/sources.
Without this coordination, the feature creates duplicate data rather than
distributing work.
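A minimal sketch of option 1, assuming the DoFn can see an end offset and the reader count (the helper name `shardRange` and both of those inputs are illustrative, not part of this PR):
```java
// Illustrative only: split [startOffset, endOffset) into numReaders
// near-equal half-open sub-ranges, keyed by the shard ID from @Element.
static long[] shardRange(long startOffset, long endOffset, int numReaders, int shardId) {
  long total = endOffset - startOffset;
  long base = total / numReaders;       // minimum records per reader
  long remainder = total % numReaders;  // first `remainder` readers take one extra
  long begin = startOffset + shardId * base + Math.min(shardId, remainder);
  long end = begin + base + (shardId < remainder ? 1 : 0);
  return new long[] {begin, end};       // reader processes offsets [begin, end)
}
```
With `startOffset = 0`, `endOffset = 20`, and 3 readers, this yields [0, 7), [7, 14), and [14, 20), matching the split described above.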
##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -191,10 +211,29 @@ public PCollection<V> expand(PBegin input) {
sparkReceiverBuilder.getSparkReceiverClass().getName()));
} else {
LOG.info("{} started reading", ReadFromSparkReceiverWithOffsetDoFn.class.getSimpleName());
- return input
- .apply(Impulse.create())
- .apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
- // TODO: Split data from SparkReceiver into multiple workers
+ Integer numReadersObj = sparkReceiverRead.getNumReaders();
+ if (numReadersObj == null || numReadersObj == 1) {
+ return input
+ .apply(Impulse.create())
+ .apply(
+ MapElements.into(TypeDescriptors.integers())
+ .via(
+ new SerializableFunction<byte[], Integer>() {
+ @Override
+ public Integer apply(byte[] input) {
+ return 0;
+ }
+ }))
Review Comment:
The backward compatibility path introduces unnecessary complexity by
converting the Impulse output (byte[]) to Integer via MapElements. This
conversion is not needed: rather than Impulse.create() followed by a
MapElements that discards the byte[] and emits a constant shard ID of 0, you
could use Create.of(0) directly, which is also consistent with the
multi-reader path. The current approach adds an extra transformation step
without clear benefit.
```suggestion
.apply(Create.of(0))
```
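With that suggestion applied, the single-reader branch could collapse to something like the sketch below (assuming `Create` from `org.apache.beam.sdk.transforms` is imported):
```java
// Sketch: emit the lone shard ID directly instead of mapping Impulse's
// byte[] output to a constant Integer.
if (numReadersObj == null || numReadersObj == 1) {
  return input
      .apply(Create.of(0))
      .apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
}
```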
##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -151,12 +162,21 @@ public Read<V> withStartPollTimeoutSec(Long startPollTimeoutSec) {
return toBuilder().setStartPollTimeoutSec(startPollTimeoutSec).build();
}
Review Comment:
The Javadoc comment for withStartOffset was removed, creating an
inconsistency with the other configuration methods (withPullFrequencySec,
withStartPollTimeoutSec, etc.), which all have Javadoc comments. The comment
should be restored to maintain documentation consistency across the API.
```suggestion
/** Inclusive start offset from which the reading should be started. */
```
##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -151,12 +162,21 @@ public Read<V> withStartPollTimeoutSec(Long startPollTimeoutSec) {
return toBuilder().setStartPollTimeoutSec(startPollTimeoutSec).build();
}
- /** Inclusive start offset from which the reading should be started. */
public Read<V> withStartOffset(Long startOffset) {
checkArgument(startOffset != null, "Start offset can not be null");
return toBuilder().setStartOffset(startOffset).build();
}
+ /**
+ * A number of workers to read from Spark {@link Receiver}.
+ *
+ * <p>If this value is not set, or set to 1, the reading will be performed on a single worker.
Review Comment:
The documentation for withNumReaders claims it will allow reading from
multiple workers, but it doesn't explain that each reader will independently
read all data, resulting in duplicates. The documentation should clearly state
the expected behavior: whether it's intended to duplicate data for redundancy
or to distribute work without duplication. Currently, the behavior doesn't
match the stated goal of addressing a "scalability bottleneck", since
duplicating data doesn't improve scalability.
```suggestion
* Configures how many independent workers (readers) will read from the same Spark
* {@link Receiver}.
*
* <p>Each configured reader connects to the underlying source independently and will
* typically observe the full stream of data. As a result, records may be duplicated
* across readers; this option does <b>not</b> shard or partition the input among workers.
*
* <p>This setting is intended for use cases where redundant consumption of the same data
* is acceptable (for example, to increase robustness when dealing with flaky sources),
* and should not be used as a mechanism for load-balancing or avoiding scalability
* bottlenecks via input partitioning. If you require a single logical read without
* duplicates, leave {@code numReaders} at its default of {@code 1} and apply your own
* partitioning or deduplication to the resulting {@link PCollection}.
*
* <p>If this value is not set, or set to {@code 1}, the reading will be performed on a
* single worker.
```
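A short usage example alongside the Javadoc would also make the redundancy semantics concrete; a sketch based on the test setup in this PR:
```java
// Each of the 3 readers independently observes the full stream, so the
// resulting PCollection may contain every record up to 3 times.
SparkReceiverIO.Read<String> reader =
    SparkReceiverIO.<String>read()
        .withSparkReceiverBuilder(receiverBuilder)
        .withGetOffsetFn(Long::valueOf)
        .withTimestampFn(Instant::parse)
        .withStartOffset(START_OFFSET)
        .withNumReaders(3);
```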