je-ik commented on code in PR #38987:
URL: https://github.com/apache/beam/pull/38987#discussion_r3430518805
##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java:
##########
@@ -54,21 +56,36 @@ private enum Kind {
private final Kind kind;
private final @Nullable WindowedValue<T> data;
private final long watermarkMillis;
+ private final int sourcePartition;
+ private final int totalSourcePartitions;
- private KStreamsPayload(Kind kind, @Nullable WindowedValue<T> data, long watermarkMillis) {
+ private KStreamsPayload(
+ Kind kind,
+ @Nullable WindowedValue<T> data,
+ long watermarkMillis,
+ int sourcePartition,
+ int totalSourcePartitions) {
this.kind = kind;
this.data = data;
this.watermarkMillis = watermarkMillis;
+ this.sourcePartition = sourcePartition;
+ this.totalSourcePartitions = totalSourcePartitions;
}
/** Returns a data payload wrapping the given {@link WindowedValue}. */
public static <T> KStreamsPayload<T> data(WindowedValue<T> value) {
- return new KStreamsPayload<>(Kind.DATA, value, 0L);
+ return new KStreamsPayload<>(Kind.DATA, value, 0L, 0, 0);
}
- /** Returns a watermark payload carrying the given event-time milliseconds. */
- public static <T> KStreamsPayload<T> watermark(long watermarkMillis) {
- return new KStreamsPayload<>(Kind.WATERMARK, null, watermarkMillis);
+ /**
+ * Returns a watermark report payload: the event-time milliseconds together with the in-band
+ * coordination fields the downstream stage's {@link WatermarkManager} needs — which source
+ * partition this report is for and how many source partitions feed the stage in total.
+ */
+ public static <T> KStreamsPayload<T> watermark(
+ long watermarkMillis, int sourcePartition, int totalSourcePartitions) {
+ return new KStreamsPayload<>(
+ Kind.WATERMARK, null, watermarkMillis, sourcePartition, totalSourcePartitions);
Review Comment:
I'd use Preconditions.checkArgument() for that.
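
A minimal sketch of what this could look like, assuming the comment refers to validating the two new partition arguments before the payload is constructed. The class `PartitionArgs`, the method `validate`, and the exact constraints are hypothetical; the local `checkArgument` is only a stand-in with the same contract as Guava's `Preconditions.checkArgument(boolean, Object)` so that the snippet compiles on its own.

```java
// Hypothetical, self-contained sketch; not the code in the PR.
final class PartitionArgs {
  private PartitionArgs() {}

  // Stand-in for Guava's Preconditions.checkArgument(boolean, Object):
  // throws IllegalArgumentException when the expression is false.
  static void checkArgument(boolean expression, Object errorMessage) {
    if (!expression) {
      throw new IllegalArgumentException(String.valueOf(errorMessage));
    }
  }

  // Assumed constraints: at least one source partition, and a zero-based
  // partition index that is smaller than the total.
  static void validate(int sourcePartition, int totalSourcePartitions) {
    checkArgument(totalSourcePartitions > 0, "totalSourcePartitions must be positive");
    checkArgument(
        sourcePartition >= 0 && sourcePartition < totalSourcePartitions,
        "sourcePartition must be in [0, totalSourcePartitions)");
  }
}
```

In the real class the call would go to the `Preconditions` class the project already uses rather than to a local helper.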
##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java:
##########
@@ -101,6 +118,29 @@ public long getWatermarkMillis() {
return watermarkMillis;
}
+ /**
+ * Returns the source partition this watermark report is for. Caller must check {@link
+ * #isWatermark()} first; calling this on a data payload throws.
+ */
+ public int getSourcePartition() {
+ if (kind != Kind.WATERMARK) {
Review Comment:
The idiomatic approach would be to extract a `WatermarkPayload` interface from
`KStreamsPayload` and keep the methods that are valid only when
`kind == Kind.WATERMARK` on that interface.
The payload would then be cast like this:
```
KStreamsPayload payload = ...;
if (payload.isWatermark()) {
  WatermarkPayload watermark = payload.asWatermark();
  watermark.getSourcePartition(); // specific methods.
}
```
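
A rough sketch of how the suggested split could be laid out, using simplified stand-in types. Only `WatermarkPayload`, `isWatermark()`, `asWatermark()` and `getSourcePartition()` come from the comment above; `Payload`, `DataPayload`, `WatermarkReport` and the remaining getters are hypothetical names chosen for illustration.

```java
// Hypothetical simplified types; the real KStreamsPayload is generic and
// carries a WindowedValue, which is omitted here.
interface WatermarkPayload {
  long getWatermarkMillis();

  int getSourcePartition();

  int getTotalSourcePartitions();
}

abstract class Payload {
  boolean isWatermark() {
    return false;
  }

  // Only watermark payloads override this; everything else fails fast.
  WatermarkPayload asWatermark() {
    throw new IllegalStateException("not a watermark payload");
  }
}

final class DataPayload extends Payload {
  final String value;

  DataPayload(String value) {
    this.value = value;
  }
}

final class WatermarkReport extends Payload implements WatermarkPayload {
  private final long watermarkMillis;
  private final int sourcePartition;
  private final int totalSourcePartitions;

  WatermarkReport(long watermarkMillis, int sourcePartition, int totalSourcePartitions) {
    this.watermarkMillis = watermarkMillis;
    this.sourcePartition = sourcePartition;
    this.totalSourcePartitions = totalSourcePartitions;
  }

  @Override
  boolean isWatermark() {
    return true;
  }

  @Override
  WatermarkPayload asWatermark() {
    return this;
  }

  @Override
  public long getWatermarkMillis() {
    return watermarkMillis;
  }

  @Override
  public int getSourcePartition() {
    return sourcePartition;
  }

  @Override
  public int getTotalSourcePartitions() {
    return totalSourcePartitions;
  }
}
```

With this shape the watermark-only getters are not reachable from a data payload at all, so the per-getter `kind != Kind.WATERMARK` checks collapse into the single check inside `asWatermark()`.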
##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java:
##########
@@ -87,22 +98,43 @@ class ExecutableStageProcessor
@Override
public void init(ProcessorContext<byte[], KStreamsPayload<?>> context) {
this.context = context;
- ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload);
- this.stageContext = KafkaStreamsExecutableStageContextFactory.getInstance().get(jobInfo);
- this.stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
+ // The SDK harness (stage context + bundle factory) is created lazily on the first data
+ // element, so a stage that only forwards watermarks never spins one up. This mirrors Spark's
+ // SparkExecutableStageFunction, which likewise does not build a bundle factory when there are
+ // no inputs to process.
+ }
+
+ private StageBundleFactory ensureStageBundleFactory() {
+ StageBundleFactory factory = stageBundleFactory;
+ if (factory == null) {
+ ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload);
+ ExecutableStageContext sc =
Review Comment:
Are the local variables `factory` and `sc` necessary? Could we assign to the
target fields directly?
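
For illustration, a generic sketch of lazy initialization that writes straight to the field, assuming the object is only ever used from a single thread. `LazyHolder`, `creator` and `ensure` are hypothetical names, not identifiers from the PR.

```java
import java.util.function.Supplier;

// Hypothetical helper showing field-direct lazy initialization.
// Not thread-safe: assumes single-threaded access.
final class LazyHolder<T> {
  private final Supplier<T> creator;
  private T instance; // null until the first call to ensure()

  LazyHolder(Supplier<T> creator) {
    this.creator = creator;
  }

  T ensure() {
    if (instance == null) {
      // Assign to the field directly; no intermediate local variable.
      instance = creator.get();
    }
    return instance;
  }
}
```

A local copy is usually only needed when the field can be changed concurrently or when a static nullness analysis cannot track the field between the check and the return; whether either applies here is for the author to confirm.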