kennknowles commented on code in PR #37411:
URL: https://github.com/apache/beam/pull/37411#discussion_r2817972711
##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -99,6 +103,8 @@ public abstract static class Read<V> extends PTransform<PBegin, PCollection<V>>
abstract @Nullable Long getStartOffset();
+ abstract @Nullable Integer getNumReaders();
Review Comment:
What is the intended behavior if this is `null`? The user should also be
able to restore the default behavior with `setNumReaders(null)`, so the getter
and setter should match. (I notice this is not the case for start offset, but
that is a bug.)
However, I think you could potentially remove nullability from all of them
by setting defaults in the factory method. This is often the best choice.
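A minimal sketch of the defaults-in-factory approach, assuming the usual AutoValue builder pattern (the `DEFAULT_NUM_READERS`/`DEFAULT_START_OFFSET` constants and exact builder names here are illustrative, not necessarily what this PR has):
```java
// Hypothetical sketch: set defaults when constructing the builder, so the
// accessors never return null and @Nullable can be dropped from them.
public static <V> Read<V> read() {
  return new AutoValue_SparkReceiverIO_Read.Builder<V>()
      .setNumReaders(DEFAULT_NUM_READERS) // e.g. 1, preserving single-reader behavior
      .setStartOffset(DEFAULT_START_OFFSET)
      .build();
}

// The accessor then no longer needs @Nullable:
abstract Integer getNumReaders();
```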
##########
sdks/java/io/sparkreceiver/3/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOTest.java:
##########
@@ -241,4 +241,32 @@ public void testReadFromReceiverIteratorData() {
PAssert.that(actual).containsInAnyOrder(expected);
pipeline.run().waitUntilFinish(Duration.standardSeconds(15));
}
+
+ @Test
+ public void testReadFromCustomReceiverWithParallelism() {
+ CustomReceiverWithOffset.shouldFailInTheMiddle = false;
+ ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
+ new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
+ SparkReceiverIO.Read<String> reader =
+ SparkReceiverIO.<String>read()
+ .withGetOffsetFn(Long::valueOf)
+ .withTimestampFn(Instant::parse)
+ .withPullFrequencySec(PULL_FREQUENCY_SEC)
+ .withStartPollTimeoutSec(START_POLL_TIMEOUT_SEC)
+ .withStartOffset(START_OFFSET)
+ .withSparkReceiverBuilder(receiverBuilder)
+ .withNumReaders(3);
+
+ List<String> expected = new ArrayList<>();
+ // With sharding enabled in CustomReceiverWithOffset, the total records read
+ // across all workers should be exactly the set of 0..RECORDS_COUNT-1, each read exactly once.
+ for (int i = 0; i < CustomReceiverWithOffset.RECORDS_COUNT; i++) {
+ expected.add(String.valueOf(i));
+ }
+ PCollection<String> actual = pipeline.apply(reader).setCoder(StringUtf8Coder.of());
Review Comment:
`setCoder` shouldn't be needed, as we should infer the string coder
automatically. If that isn't happening, let's look into why not.
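For reference, a sketch of what I'd expect the test line to be if inference works, plus a hedged guess at a transform-level fix if it doesn't (the `withCoder` option is hypothetical, not part of this PR):
```java
// Expected: no explicit coder needed in the test.
PCollection<String> actual = pipeline.apply(reader);

// If inference fails for the generic V, one option (a sketch, not this PR's
// code) is for Read.expand() to call setCoder once itself, e.g. from a
// hypothetical withCoder(Coder<V>) option, so callers never have to.
```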
##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java:
##########
@@ -284,6 +290,9 @@ public ProcessContinuation processElement(
}
LOG.debug("Restriction {}", tracker.currentRestriction().toString());
sparkConsumer = new SparkConsumerWithOffset<>(tracker.currentRestriction().getFrom());
+ if (sparkReceiver instanceof HasOffset) {
+ ((HasOffset) sparkReceiver).setShard(element, numReaders);
Review Comment:
I don't know this code, really, but the javadoc of the class suggests that
it _only_ supports receivers that implement `HasOffset`. So this check should
not be necessary, or it should be moved earlier, to DoFn instantiation, I
think.
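A sketch of the moved-earlier check (the constructor shape and the `getSparkReceiverBuilder()` accessor are assumptions on my part; `getSparkReceiverClass()` is the method already used elsewhere in this diff):
```java
// Beam itself would use the vendored guava copy of Preconditions.
import static com.google.common.base.Preconditions.checkArgument;

// Hypothetical: validate once at construction instead of on every element.
ReadFromSparkReceiverWithOffsetDoFn(SparkReceiverIO.Read<V> read) {
  Class<?> receiverClass = read.getSparkReceiverBuilder().getSparkReceiverClass();
  checkArgument(
      HasOffset.class.isAssignableFrom(receiverClass),
      "SparkReceiverIO requires a receiver implementing HasOffset, got %s",
      receiverClass.getName());
  // ... rest of construction unchanged
}
```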
##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -191,10 +209,20 @@ public PCollection<V> expand(PBegin input) {
sparkReceiverBuilder.getSparkReceiverClass().getName()));
} else {
LOG.info("{} started reading", ReadFromSparkReceiverWithOffsetDoFn.class.getSimpleName());
- return input
- .apply(Impulse.create())
- .apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
- // TODO: Split data from SparkReceiver into multiple workers
+ Integer numReadersObj = sparkReceiverRead.getNumReaders();
+ if (numReadersObj == null || numReadersObj == 1) {
+ return input
+ .apply(Create.of(0))
+ .apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
+ } else {
+ int numReaders = numReadersObj;
+ List<Integer> shards =
+ IntStream.range(0, numReaders).boxed().collect(Collectors.toList());
+ return input
+ .apply(Create.of(shards))
+ .apply(Reshuffle.viaRandomKey())
Review Comment:
Typically, use `Redistribute`, which is a new synonym for `Reshuffle` that
more clearly indicates it is _only_ for redistributing data, without the
other things `Reshuffle` implies (some uses of `Reshuffle` are for
checkpointing, etc.).
But I think it is not necessary to do anything here - the runner already
knows that a splittable DoFn has large output per element and saves its
checkpoint state, so redistribution should happen automatically. (Not sure
whether the SparkRunner does this right or not.)
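If you do keep an explicit step, a minimal sketch of the `Redistribute` form of this branch (condensed from the diff; I believe `Redistribute.arbitrarily()` is the drop-in for `Reshuffle.viaRandomKey()` here, but double-check your SDK version has it):
```java
import org.apache.beam.sdk.transforms.Redistribute;

// Same shard fan-out as in the diff, but signaling "redistribute only"
// rather than Reshuffle's broader checkpointing connotations.
return input
    .apply(Create.of(shards))
    .apply(Redistribute.<Integer>arbitrarily())
    .apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
```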