ATHARVA262005 commented on code in PR #37411:
URL: https://github.com/apache/beam/pull/37411#discussion_r2724606860
##########
sdks/java/io/sparkreceiver/3/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -191,10 +211,29 @@ public PCollection<V> expand(PBegin input) {
sparkReceiverBuilder.getSparkReceiverClass().getName()));
} else {
LOG.info("{} started reading", ReadFromSparkReceiverWithOffsetDoFn.class.getSimpleName());
- return input
- .apply(Impulse.create())
-         .apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
- // TODO: Split data from SparkReceiver into multiple workers
+ Integer numReadersObj = sparkReceiverRead.getNumReaders();
+ if (numReadersObj == null || numReadersObj == 1) {
+ return input
+ .apply(Impulse.create())
+ .apply(
+ MapElements.into(TypeDescriptors.integers())
+ .via(
+ new SerializableFunction<byte[], Integer>() {
+ @Override
+ public Integer apply(byte[] input) {
+ return 0;
+ }
+ }))
Review Comment:
   Fixed. I removed the `Impulse` + `MapElements` chain and replaced it with
   `Create.of(0)` as suggested. `Create.of(0)` emits the same single seed
   element directly, which is cleaner and keeps the single-reader path
   consistent with the multi-reader path.
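
   For reference, a minimal sketch of what the simplified single-reader path
   looks like after the change (identifiers such as `sparkReceiverRead` and
   `ReadFromSparkReceiverWithOffsetDoFn` come from the PR; the surrounding
   class and Beam pipeline wiring are assumed, so this is illustrative
   rather than the exact committed code):

   ```java
   // Single-reader path: seed the pipeline with one element.
   // Create.of(0) produces a single-element PCollection<Integer>,
   // replacing the Impulse + MapElements chain that mapped the
   // impulse's byte[] to the constant 0.
   return input
       .apply(Create.of(0))
       .apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
   ```

   `Create.of` infers the coder from the element (here `VarIntCoder` for the
   boxed `Integer`), so no explicit `TypeDescriptors` mapping is needed.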
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]