gemini-code-assist[bot] commented on code in PR #38689: URL: https://github.com/apache/beam/pull/38689#discussion_r3302152786
########## runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.jobmanagement.v1.JobApi; +import org.apache.beam.runners.jobsubmission.PortablePipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.kafka.streams.KafkaStreams; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Result of executing a portable pipeline as a {@link KafkaStreams} application. + * + * <p>Translates the underlying {@link KafkaStreams.State} into Beam's {@link + * org.apache.beam.sdk.PipelineResult.State} and forwards {@link #cancel()} / {@link + * #waitUntilFinish()} to the {@code KafkaStreams} instance. 
+ */ +class KafkaStreamsPortablePipelineResult implements PortablePipelineResult { + + private static final Logger LOG = + LoggerFactory.getLogger(KafkaStreamsPortablePipelineResult.class); + + private final KafkaStreams kafkaStreams; + private final CountDownLatch terminated = new CountDownLatch(1); + + KafkaStreamsPortablePipelineResult(KafkaStreams kafkaStreams) { + this.kafkaStreams = kafkaStreams; + kafkaStreams.setStateListener( + (newState, oldState) -> { + if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) { + terminated.countDown(); + } + }); + } Review Comment: If the `KafkaStreams` instance transitions to a terminal state (`NOT_RUNNING` or `ERROR`) before the state listener is registered, the `terminated` latch will never be counted down, which can cause `waitUntilFinish()` to block indefinitely. We should check the current state of `kafkaStreams` during construction and count down the latch if it is already in a terminal state. Note also that `KafkaStreams#setStateListener` throws an `IllegalStateException` unless the instance is still in the `CREATED` state. Because `KafkaStreamsPipelineRunner.run` calls `kafkaStreams.start()` before constructing this result, the listener must be registered before `start()` is called, for example by constructing the result first or by moving the `start()` call into it. ```java KafkaStreamsPortablePipelineResult(KafkaStreams kafkaStreams) { this.kafkaStreams = kafkaStreams; kafkaStreams.setStateListener( (newState, oldState) -> { if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) { terminated.countDown(); } }); KafkaStreams.State currentState = kafkaStreams.state(); if (currentState == KafkaStreams.State.NOT_RUNNING || currentState == KafkaStreams.State.ERROR) { terminated.countDown(); } } ``` ########## runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import java.time.Duration; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Kafka Streams {@link Processor} implementing Beam's {@code Impulse} transform. + * + * <p>Emits exactly one {@link WindowedValue} carrying an empty {@code byte[]} payload in the {@link + * org.apache.beam.sdk.transforms.windowing.GlobalWindow}, with timestamp {@link + * BoundedWindow#TIMESTAMP_MIN_VALUE}. The emission happens once per task and is persisted in a + * state store keyed by the transform id so that task restarts do not re-emit. + * + * <p>The trigger comes from a wall-clock punctuator scheduled on {@link #init} — this lets the + * processor fire even when the dedicated bootstrap source topic is empty, which is the expected + * production state. + * + * <p><b>Watermark advancement to {@code TIMESTAMP_MAX_VALUE}</b> (design doc §4.1) is intentionally + * <em>not</em> performed here. 
Kafka Streams has no native Beam watermark; the output PCollection's + * watermark moves through the (future) runner-side watermark manager rather than through the {@link + * Record} timestamp. The forwarded Kafka Streams record carries a non-negative record timestamp + * ({@code 0L}) because KS rejects negative record timestamps; the Beam event-time lives inside the + * {@link WindowedValue}. + */ +class ImpulseProcessor implements Processor<byte[], byte[], byte[], WindowedValue<byte[]>> { + + private static final Logger LOG = LoggerFactory.getLogger(ImpulseProcessor.class); + + /** Sole entry in the state store; the value tracks whether this processor has already emitted. */ + static final String FIRED_KEY = "fired"; + + /** How soon after {@link #init} the punctuator first fires. */ + private static final Duration PUNCTUATION_DELAY = Duration.ofMillis(50); + + private final String stateStoreName; + private final String transformId; + + private @Nullable ProcessorContext<byte[], WindowedValue<byte[]>> context; + private @Nullable KeyValueStore<String, Boolean> firedStore; + + ImpulseProcessor(String stateStoreName, String transformId) { + this.stateStoreName = stateStoreName; + this.transformId = transformId; + } + + @Override + public void init(ProcessorContext<byte[], WindowedValue<byte[]>> context) { + this.context = context; + this.firedStore = context.getStateStore(stateStoreName); + context.schedule(PUNCTUATION_DELAY, PunctuationType.WALL_CLOCK_TIME, ts -> maybeFire()); + } Review Comment:  The punctuator scheduled in `init` runs periodically forever. To avoid unnecessary CPU overhead and state store lookups after the impulse has already fired, we should capture the `Cancellable` returned by `context.schedule` and cancel it once the impulse is successfully emitted or if we detect it has already fired. 
```suggestion private @Nullable ProcessorContext<byte[], WindowedValue<byte[]>> context; private @Nullable KeyValueStore<String, Boolean> firedStore; private @Nullable org.apache.kafka.streams.processor.Cancellable cancellable; ImpulseProcessor(String stateStoreName, String transformId) { this.stateStoreName = stateStoreName; this.transformId = transformId; } @Override public void init(ProcessorContext<byte[], WindowedValue<byte[]>> context) { this.context = context; this.firedStore = context.getStateStore(stateStoreName); this.cancellable = context.schedule(PUNCTUATION_DELAY, PunctuationType.WALL_CLOCK_TIME, ts -> maybeFire()); } ``` ########## runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.kafka.streams; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.jobmanagement.v1.JobApi; +import org.apache.beam.runners.jobsubmission.PortablePipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.kafka.streams.KafkaStreams; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Result of executing a portable pipeline as a {@link KafkaStreams} application. + * + * <p>Translates the underlying {@link KafkaStreams.State} into Beam's {@link + * org.apache.beam.sdk.PipelineResult.State} and forwards {@link #cancel()} / {@link + * #waitUntilFinish()} to the {@code KafkaStreams} instance. + */ +class KafkaStreamsPortablePipelineResult implements PortablePipelineResult { + + private static final Logger LOG = + LoggerFactory.getLogger(KafkaStreamsPortablePipelineResult.class); + + private final KafkaStreams kafkaStreams; + private final CountDownLatch terminated = new CountDownLatch(1); + + KafkaStreamsPortablePipelineResult(KafkaStreams kafkaStreams) { + this.kafkaStreams = kafkaStreams; + kafkaStreams.setStateListener( + (newState, oldState) -> { + if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) { + terminated.countDown(); + } + }); + } + + @Override + public State getState() { + return mapState(kafkaStreams.state()); + } + + @Override + public State cancel() throws IOException { + kafkaStreams.close(); + terminated.countDown(); + return getState(); + } Review Comment:  When `cancel()` is called, `kafkaStreams.close()` is invoked, which eventually transitions the state to `NOT_RUNNING`. Since `mapState` maps `NOT_RUNNING` to `State.DONE`, `getState()` will return `DONE` instead of `CANCELLED` once the shutdown completes. 
We should introduce a `volatile boolean cancelled` flag to ensure that once `cancel()` is called, `getState()` consistently returns `State.CANCELLED`. ```java private volatile boolean cancelled = false; @Override public State getState() { if (cancelled) { return State.CANCELLED; } return mapState(kafkaStreams.state()); } @Override public State cancel() throws IOException { cancelled = true; kafkaStreams.close(); terminated.countDown(); return getState(); } ``` ########## runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.kafka.streams.translation; + +import java.time.Duration; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Kafka Streams {@link Processor} implementing Beam's {@code Impulse} transform. + * + * <p>Emits exactly one {@link WindowedValue} carrying an empty {@code byte[]} payload in the {@link + * org.apache.beam.sdk.transforms.windowing.GlobalWindow}, with timestamp {@link + * BoundedWindow#TIMESTAMP_MIN_VALUE}. The emission happens once per task and is persisted in a + * state store keyed by the transform id so that task restarts do not re-emit. + * + * <p>The trigger comes from a wall-clock punctuator scheduled on {@link #init} — this lets the + * processor fire even when the dedicated bootstrap source topic is empty, which is the expected + * production state. + * + * <p><b>Watermark advancement to {@code TIMESTAMP_MAX_VALUE}</b> (design doc §4.1) is intentionally + * <em>not</em> performed here. Kafka Streams has no native Beam watermark; the output PCollection's + * watermark moves through the (future) runner-side watermark manager rather than through the {@link + * Record} timestamp. The forwarded Kafka Streams record carries a non-negative record timestamp + * ({@code 0L}) because KS rejects negative record timestamps; the Beam event-time lives inside the + * {@link WindowedValue}. 
+ */ +class ImpulseProcessor implements Processor<byte[], byte[], byte[], WindowedValue<byte[]>> { + + private static final Logger LOG = LoggerFactory.getLogger(ImpulseProcessor.class); + + /** Sole entry in the state store; the value tracks whether this processor has already emitted. */ + static final String FIRED_KEY = "fired"; + + /** How soon after {@link #init} the punctuator first fires. */ + private static final Duration PUNCTUATION_DELAY = Duration.ofMillis(50); + + private final String stateStoreName; + private final String transformId; + + private @Nullable ProcessorContext<byte[], WindowedValue<byte[]>> context; + private @Nullable KeyValueStore<String, Boolean> firedStore; + + ImpulseProcessor(String stateStoreName, String transformId) { + this.stateStoreName = stateStoreName; + this.transformId = transformId; + } + + @Override + public void init(ProcessorContext<byte[], WindowedValue<byte[]>> context) { + this.context = context; + this.firedStore = context.getStateStore(stateStoreName); + context.schedule(PUNCTUATION_DELAY, PunctuationType.WALL_CLOCK_TIME, ts -> maybeFire()); + } + + @Override + public void process(Record<byte[], byte[]> record) { + // Records that happen to land on the bootstrap topic are not actual data; they just provide an + // extra opportunity to fire the impulse on restart. The state store still gates the emit. + maybeFire(); + } + + private void maybeFire() { + ProcessorContext<byte[], WindowedValue<byte[]>> ctx = context; + KeyValueStore<String, Boolean> store = firedStore; + if (ctx == null || store == null) { + return; + } + if (Boolean.TRUE.equals(store.get(FIRED_KEY))) { + return; + } + WindowedValue<byte[]> impulse = WindowedValues.valueInGlobalWindow(new byte[0]); + // The output PCollection is not keyed (PCollection<byte[]>); use an empty byte[] as a + // placeholder key so downstream processors that adopt the byte[]-key convention see a + // consistent shape. 
+ // + // Kafka Streams disallows negative record timestamps, so the Record carries the Unix epoch + // (0L). The Beam event-time, BoundedWindow.TIMESTAMP_MIN_VALUE, lives inside the forwarded + // WindowedValue and is what downstream Beam logic must consult. + ctx.forward(new Record<byte[], WindowedValue<byte[]>>(new byte[0], impulse, 0L)); + store.put(FIRED_KEY, Boolean.TRUE); + LOG.debug("Impulse {} emitted single element", transformId); + } Review Comment:  Cancel the scheduled punctuator once the impulse has fired or if it is detected as already fired to prevent periodic wakeups and state store lookups. ```java private void maybeFire() { ProcessorContext<byte[], WindowedValue<byte[]>> ctx = context; KeyValueStore<String, Boolean> store = firedStore; if (ctx == null || store == null) { return; } if (Boolean.TRUE.equals(store.get(FIRED_KEY))) { if (cancellable != null) { cancellable.cancel(); cancellable = null; } return; } WindowedValue<byte[]> impulse = WindowedValues.valueInGlobalWindow(new byte[0]); // The output PCollection is not keyed (PCollection<byte[]>); use an empty byte[] as a // placeholder key so downstream processors that adopt the byte[]-key convention see a // consistent shape. // // Kafka Streams disallows negative record timestamps, so the Record carries the Unix epoch // (0L). The Beam event-time, BoundedWindow.TIMESTAMP_MIN_VALUE, lives inside the forwarded // WindowedValue and is what downstream Beam logic must consult. 
ctx.forward(new Record<byte[], WindowedValue<byte[]>>(new byte[0], impulse, 0L)); store.put(FIRED_KEY, Boolean.TRUE); if (cancellable != null) { cancellable.cancel(); cancellable = null; } LOG.debug("Impulse {} emitted single element", transformId); } ``` ########## runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java: ########## @@ -17,29 +17,56 @@ */ package org.apache.beam.runners.kafka.streams; +import java.util.Properties; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.jobsubmission.PortablePipelineResult; import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsPipelineTranslator; import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -/** Executes a portable pipeline by translating it to Kafka Streams. */ +/** Executes a portable pipeline by translating it to a Kafka Streams {@link Topology}. 
*/ public class KafkaStreamsPipelineRunner implements PortablePipelineRunner { + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsPipelineRunner.class); + private final KafkaStreamsPipelineOptions pipelineOptions; public KafkaStreamsPipelineRunner(KafkaStreamsPipelineOptions pipelineOptions) { this.pipelineOptions = pipelineOptions; } @Override - public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception { + public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) { KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); KafkaStreamsTranslationContext context = translator.createTranslationContext(jobInfo, pipelineOptions); RunnerApi.Pipeline prepared = translator.prepareForTranslation(pipeline); translator.translate(context, prepared); - throw new IllegalStateException("Translation unexpectedly completed without an executor"); + + Topology topology = context.getTopology(); + LOG.info( + "Translated pipeline {} into Kafka Streams topology:\n{}", + jobInfo.jobId(), + topology.describe()); + + KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig(jobInfo)); + kafkaStreams.start(); + return new KafkaStreamsPortablePipelineResult(kafkaStreams); + } + + private Properties streamsConfig(JobInfo jobInfo) { + Properties props = new Properties(); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, pipelineOptions.getBootstrapServers()); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, pipelineOptions.getApplicationId()); + props.put(StreamsConfig.STATE_DIR_CONFIG, pipelineOptions.getStateDir()); + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + props.put(StreamsConfig.CLIENT_ID_CONFIG, jobInfo.jobId()); + return props; } Review Comment:  Since `Properties` extends `Hashtable`, calling `put` with a `null` value will throw a `NullPointerException`. 
If any of the pipeline options (like `bootstrapServers`, `applicationId`, or `stateDir`) are not configured and return `null`, this method will fail with a raw NPE. We should check for null before putting them into the properties map. Note that `bootstrap.servers` and `application.id` have no defaults, so omitting them only defers the failure to a `ConfigException` raised when the `StreamsConfig` is built. For those two options, failing fast with a descriptive message (e.g. via `checkArgument`/`checkNotNull`) may be clearer, while `state.dir` can safely fall back to its default. ```java private Properties streamsConfig(JobInfo jobInfo) { Properties props = new Properties(); if (pipelineOptions.getBootstrapServers() != null) { props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, pipelineOptions.getBootstrapServers()); } if (pipelineOptions.getApplicationId() != null) { props.put(StreamsConfig.APPLICATION_ID_CONFIG, pipelineOptions.getApplicationId()); } if (pipelineOptions.getStateDir() != null) { props.put(StreamsConfig.STATE_DIR_CONFIG, pipelineOptions.getStateDir()); } props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); if (jobInfo.jobId() != null) { props.put(StreamsConfig.CLIENT_ID_CONFIG, jobInfo.jobId()); } return props; } ``` ########## runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.kafka.streams.translation; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +/** + * Behavioural tests for {@link ImpulseTranslator} using {@link TopologyTestDriver}. + * + * <p>The translator builds a topology with a real source + processor pair. The tests sit a {@link + * CapturingProcessor} downstream so emitted {@code WindowedValue<byte[]>} elements can be inspected + * directly without going through a Kafka sink topic (the runner does not produce one because no + * downstream PCollections exist yet). 
+ */ +public class ImpulseTranslatorTest { + + @Test + public void impulseEmitsExactlyOneEmptyByteArrayInGlobalWindow() { + KafkaStreamsTranslationContext context = KafkaStreamsPipelineTranslatorTest.newContext(); + new KafkaStreamsPipelineTranslator() + .translate(context, KafkaStreamsPipelineTranslatorTest.singleImpulsePipeline()); + + CapturingProcessor capture = new CapturingProcessor(); + Topology topology = context.getTopology(); + topology.addProcessor("capture", capture, "impulse"); + + try (TopologyTestDriver driver = new TopologyTestDriver(topology, baseProps())) { + driver.advanceWallClockTime(Duration.ofSeconds(1)); + driver.advanceWallClockTime(Duration.ofSeconds(1)); + } + + assertThat(capture.received.size(), is(1)); + WindowedValue<byte[]> only = capture.received.get(0); + assertThat(only, is(notNullValue())); + assertThat(only.getValue().length, is(0)); + assertThat(only.getWindows().size(), is(1)); + assertThat(only.getTimestamp().getMillis(), is(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())); + } + + @Test + public void impulseDoesNotReEmitOnRestart() { + KafkaStreamsTranslationContext context = KafkaStreamsPipelineTranslatorTest.newContext(); + new KafkaStreamsPipelineTranslator() + .translate(context, KafkaStreamsPipelineTranslatorTest.singleImpulsePipeline()); + + CapturingProcessor capture = new CapturingProcessor(); + Topology topology = context.getTopology(); + topology.addProcessor("capture", capture, "impulse"); + + try (TopologyTestDriver driver = new TopologyTestDriver(topology, baseProps())) { + driver.advanceWallClockTime(Duration.ofSeconds(1)); + // Trigger again — should be ignored because the state store flag is set. 
+ driver.advanceWallClockTime(Duration.ofSeconds(5)); + } + + assertThat(capture.received.size(), is(1)); + } + + private static Properties baseProps() { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-translator-test"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put( + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); + props.put( + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); + return props; + } + + /** + * Captures {@link WindowedValue} records forwarded by {@link ImpulseProcessor}. The supplier + * returns a fresh forwarder each call (required by Kafka Streams) but all forwarders write into + * the shared {@link #received} list so the test can read the captured elements after the topology + * is closed. + */ + private static class CapturingProcessor + implements ProcessorSupplier<byte[], WindowedValue<byte[]>, byte[], WindowedValue<byte[]>> { + + final List<WindowedValue<byte[]>> received = new ArrayList<>(); + Review Comment:  The `received` list in `CapturingProcessor` is an `ArrayList` which is not thread-safe. Although `TopologyTestDriver` currently runs single-threaded, it is a best practice to use a thread-safe collection (like `CopyOnWriteArrayList` or `Collections.synchronizedList`) to prevent concurrency issues if the test or topology is ever executed in a multi-threaded context. ```suggestion private static class CapturingProcessor implements ProcessorSupplier<byte[], WindowedValue<byte[]>, byte[], WindowedValue<byte[]>> { final List<WindowedValue<byte[]>> received = java.util.Collections.synchronizedList(new ArrayList<>()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
