gemini-code-assist[bot] commented on code in PR #38689:
URL: https://github.com/apache/beam/pull/38689#discussion_r3302152786


##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.kafka.streams.KafkaStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Result of executing a portable pipeline as a {@link KafkaStreams} application.
+ *
+ * <p>Translates the underlying {@link KafkaStreams.State} into Beam's {@link
+ * org.apache.beam.sdk.PipelineResult.State} and forwards {@link #cancel()} / {@link
+ * #waitUntilFinish()} to the {@code KafkaStreams} instance.
+ */
+class KafkaStreamsPortablePipelineResult implements PortablePipelineResult {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(KafkaStreamsPortablePipelineResult.class);
+
+  private final KafkaStreams kafkaStreams;
+  private final CountDownLatch terminated = new CountDownLatch(1);
+
+  KafkaStreamsPortablePipelineResult(KafkaStreams kafkaStreams) {
+    this.kafkaStreams = kafkaStreams;
+    kafkaStreams.setStateListener(
+        (newState, oldState) -> {
+          if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) {
+            terminated.countDown();
+          }
+        });
+  }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
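   If the `KafkaStreams` instance transitions to a terminal state (`NOT_RUNNING` or `ERROR`) before the state listener is registered, the `terminated` latch is never counted down, which can cause `waitUntilFinish()` to block indefinitely. We should check the current state of `kafkaStreams` during construction and count down the latch if it is already in a terminal state.
   
   Note also that `KafkaStreams#setStateListener` may only be called while the instance is still in the `CREATED` state; once `start()` has been called it throws `IllegalStateException`. Because `KafkaStreamsPipelineRunner#run` calls `kafkaStreams.start()` before constructing this result, the constructor as written fails immediately. The result should be created (registering the listener) before `start()` is invoked; the state re-check below then serves as a defensive guard.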
   
   ```java
     KafkaStreamsPortablePipelineResult(KafkaStreams kafkaStreams) {
       this.kafkaStreams = kafkaStreams;
       // setStateListener throws IllegalStateException unless kafkaStreams is still CREATED,
       // so this constructor has to run before kafkaStreams.start().
       kafkaStreams.setStateListener(
           (newState, oldState) -> {
             if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) {
               terminated.countDown();
             }
           });
       // Defensive re-check in case a terminal state was reached before registration.
       KafkaStreams.State currentState = kafkaStreams.state();
       if (currentState == KafkaStreams.State.NOT_RUNNING
           || currentState == KafkaStreams.State.ERROR) {
         terminated.countDown();
       }
     }
   ```
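   
   A small, stdlib-only sketch of registering the listener before `start()`. `MiniStreams` is a hypothetical stand-in that models only the `KafkaStreams` rule that a state listener may be set solely in the `CREATED` state, and `MiniResult` approximates the constructor suggested above; neither is real Beam or Kafka code.
   
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.function.BiConsumer;
   
   // Hypothetical stand-in for KafkaStreams; models only the state-listener contract.
   class MiniStreams {
     enum State { CREATED, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }
   
     private State state = State.CREATED;
     private BiConsumer<State, State> listener = (newState, oldState) -> {};
   
     synchronized State state() {
       return state;
     }
   
     // Mirrors the real rule: a listener may only be set before start().
     synchronized void setStateListener(BiConsumer<State, State> newListener) {
       if (state != State.CREATED) {
         throw new IllegalStateException("Can only set StateListener before calling start()");
       }
       listener = newListener;
     }
   
     void start() {
       transition(State.RUNNING);
     }
   
     void close() {
       transition(State.PENDING_SHUTDOWN);
       transition(State.NOT_RUNNING);
     }
   
     private void transition(State next) {
       State previous;
       BiConsumer<State, State> current;
       synchronized (this) {
         previous = state;
         state = next;
         current = listener;
       }
       current.accept(next, previous); // notify outside the lock
     }
   }
   
   // Hypothetical result wrapper following the suggested constructor.
   class MiniResult {
     private final CountDownLatch terminated = new CountDownLatch(1);
   
     MiniResult(MiniStreams streams) {
       streams.setStateListener(
           (newState, oldState) -> {
             if (isTerminal(newState)) {
               terminated.countDown();
             }
           });
       // Defensive re-check; with the CREATED-only rule above it never finds a terminal state.
       if (isTerminal(streams.state())) {
         terminated.countDown();
       }
     }
   
     static boolean isTerminal(MiniStreams.State s) {
       return s == MiniStreams.State.NOT_RUNNING || s == MiniStreams.State.ERROR;
     }
   
     boolean isTerminated() {
       return terminated.getCount() == 0;
     }
   }
   ```
   
   Creating the result before `start()` lets the latch observe the shutdown, while creating it afterwards fails at construction time, which is what the runner's current call order would hit against the real `KafkaStreams`.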



##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java:
##########
@@ -0,0 +1,108 @@
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.time.Duration;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Streams {@link Processor} implementing Beam's {@code Impulse} transform.
+ *
+ * <p>Emits exactly one {@link WindowedValue} carrying an empty {@code byte[]} payload in the {@link
+ * org.apache.beam.sdk.transforms.windowing.GlobalWindow}, with timestamp {@link
+ * BoundedWindow#TIMESTAMP_MIN_VALUE}. The emission happens once per task and is persisted in a
+ * state store keyed by the transform id so that task restarts do not re-emit.
+ *
+ * <p>The trigger comes from a wall-clock punctuator scheduled on {@link #init} — this lets the
+ * processor fire even when the dedicated bootstrap source topic is empty, which is the expected
+ * production state.
+ *
+ * <p><b>Watermark advancement to {@code TIMESTAMP_MAX_VALUE}</b> (design doc §4.1) is intentionally
+ * <em>not</em> performed here. Kafka Streams has no native Beam watermark; the output PCollection's
+ * watermark moves through the (future) runner-side watermark manager rather than through the {@link
+ * Record} timestamp. The forwarded Kafka Streams record carries a non-negative record timestamp
+ * ({@code 0L}) because KS rejects negative record timestamps; the Beam event-time lives inside the
+ * {@link WindowedValue}.
+ */
+class ImpulseProcessor implements Processor<byte[], byte[], byte[], WindowedValue<byte[]>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ImpulseProcessor.class);
+
+  /** Sole entry in the state store; the value tracks whether this processor has already emitted. */
+  static final String FIRED_KEY = "fired";
+
+  /** How soon after {@link #init} the punctuator first fires. */
+  private static final Duration PUNCTUATION_DELAY = Duration.ofMillis(50);
+
+  private final String stateStoreName;
+  private final String transformId;
+
+  private @Nullable ProcessorContext<byte[], WindowedValue<byte[]>> context;
+  private @Nullable KeyValueStore<String, Boolean> firedStore;
+
+  ImpulseProcessor(String stateStoreName, String transformId) {
+    this.stateStoreName = stateStoreName;
+    this.transformId = transformId;
+  }
+
+  @Override
+  public void init(ProcessorContext<byte[], WindowedValue<byte[]>> context) {
+    this.context = context;
+    this.firedStore = context.getStateStore(stateStoreName);
+    context.schedule(PUNCTUATION_DELAY, PunctuationType.WALL_CLOCK_TIME, ts -> maybeFire());
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The punctuator scheduled in `init` runs periodically forever. To avoid 
unnecessary CPU overhead and state store lookups after the impulse has already 
fired, we should capture the `Cancellable` returned by `context.schedule` and 
cancel it once the impulse is successfully emitted or if we detect it has 
already fired.
   
   ```suggestion
     private @Nullable ProcessorContext<byte[], WindowedValue<byte[]>> context;
     private @Nullable KeyValueStore<String, Boolean> firedStore;
     private @Nullable org.apache.kafka.streams.processor.Cancellable cancellable;
   
     ImpulseProcessor(String stateStoreName, String transformId) {
       this.stateStoreName = stateStoreName;
       this.transformId = transformId;
     }
   
     @Override
     public void init(ProcessorContext<byte[], WindowedValue<byte[]>> context) {
       this.context = context;
       this.firedStore = context.getStateStore(stateStoreName);
       this.cancellable =
           context.schedule(PUNCTUATION_DELAY, PunctuationType.WALL_CLOCK_TIME, ts -> maybeFire());
     }
   ```



##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java:
##########
@@ -0,0 +1,123 @@
+package org.apache.beam.runners.kafka.streams;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.kafka.streams.KafkaStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Result of executing a portable pipeline as a {@link KafkaStreams} application.
+ *
+ * <p>Translates the underlying {@link KafkaStreams.State} into Beam's {@link
+ * org.apache.beam.sdk.PipelineResult.State} and forwards {@link #cancel()} / {@link
+ * #waitUntilFinish()} to the {@code KafkaStreams} instance.
+ */
+class KafkaStreamsPortablePipelineResult implements PortablePipelineResult {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(KafkaStreamsPortablePipelineResult.class);
+
+  private final KafkaStreams kafkaStreams;
+  private final CountDownLatch terminated = new CountDownLatch(1);
+
+  KafkaStreamsPortablePipelineResult(KafkaStreams kafkaStreams) {
+    this.kafkaStreams = kafkaStreams;
+    kafkaStreams.setStateListener(
+        (newState, oldState) -> {
+          if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) {
+            terminated.countDown();
+          }
+        });
+  }
+
+  @Override
+  public State getState() {
+    return mapState(kafkaStreams.state());
+  }
+
+  @Override
+  public State cancel() throws IOException {
+    kafkaStreams.close();
+    terminated.countDown();
+    return getState();
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   When `cancel()` is called, `kafkaStreams.close()` is invoked, which 
eventually transitions the state to `NOT_RUNNING`. Since `mapState` maps 
`NOT_RUNNING` to `State.DONE`, `getState()` will return `DONE` instead of 
`CANCELLED` once the shutdown completes. We should introduce a `volatile 
boolean cancelled` flag to ensure that once `cancel()` is called, `getState()` 
consistently returns `State.CANCELLED`.
   
   ```java
     private volatile boolean cancelled = false;
   
     @Override
     public State getState() {
       if (cancelled) {
         return State.CANCELLED;
       }
       return mapState(kafkaStreams.state());
     }
   
     @Override
     public State cancel() throws IOException {
       cancelled = true;
       kafkaStreams.close();
       terminated.countDown();
       return getState();
     }
   ```
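   
   A stdlib-only sketch of why the flag has to take precedence over the mapped state. The enums and `mapState` here are hypothetical simplifications (the PR's actual `mapState` body is not shown in this hunk); the point is only that `NOT_RUNNING` reached through `cancel()` is reported as `CANCELLED`, while the same state reached by normal completion is reported as `DONE`.
   
   ```java
   // Hypothetical subset of the engine states reported by KafkaStreams.
   enum EngineState { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }
   
   // Hypothetical subset of Beam's PipelineResult.State.
   enum ResultState { RUNNING, DONE, FAILED, CANCELLED }
   
   class FlaggedResult {
     private volatile EngineState engine = EngineState.RUNNING;
     private volatile boolean cancelled = false;
   
     // Hypothetical mapping matching the behavior described above (NOT_RUNNING -> DONE).
     static ResultState mapState(EngineState s) {
       switch (s) {
         case NOT_RUNNING:
           return ResultState.DONE;
         case ERROR:
           return ResultState.FAILED;
         default:
           return ResultState.RUNNING;
       }
     }
   
     ResultState getState() {
       // The flag wins, so a cancelled job never reports DONE.
       return cancelled ? ResultState.CANCELLED : mapState(engine);
     }
   
     ResultState cancel() {
       cancelled = true; // set first so concurrent getState() calls never observe DONE
       engine = EngineState.PENDING_SHUTDOWN; // these two writes stand in for a blocking close()
       engine = EngineState.NOT_RUNNING;
       return getState();
     }
   
     // Simulates the engine finishing on its own.
     void finishNormally() {
       engine = EngineState.NOT_RUNNING;
     }
   }
   ```
   
   One consequence of letting the flag win: a job that had already failed (`ERROR`) before `cancel()` is also reported as `CANCELLED`. If that distinction matters, `cancel()` could set the flag only when the underlying state is not yet terminal.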



##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java:
##########
@@ -0,0 +1,108 @@
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.time.Duration;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Streams {@link Processor} implementing Beam's {@code Impulse} transform.
+ *
+ * <p>Emits exactly one {@link WindowedValue} carrying an empty {@code byte[]} payload in the {@link
+ * org.apache.beam.sdk.transforms.windowing.GlobalWindow}, with timestamp {@link
+ * BoundedWindow#TIMESTAMP_MIN_VALUE}. The emission happens once per task and is persisted in a
+ * state store keyed by the transform id so that task restarts do not re-emit.
+ *
+ * <p>The trigger comes from a wall-clock punctuator scheduled on {@link #init} — this lets the
+ * processor fire even when the dedicated bootstrap source topic is empty, which is the expected
+ * production state.
+ *
+ * <p><b>Watermark advancement to {@code TIMESTAMP_MAX_VALUE}</b> (design doc §4.1) is intentionally
+ * <em>not</em> performed here. Kafka Streams has no native Beam watermark; the output PCollection's
+ * watermark moves through the (future) runner-side watermark manager rather than through the {@link
+ * Record} timestamp. The forwarded Kafka Streams record carries a non-negative record timestamp
+ * ({@code 0L}) because KS rejects negative record timestamps; the Beam event-time lives inside the
+ * {@link WindowedValue}.
+ */
+class ImpulseProcessor implements Processor<byte[], byte[], byte[], WindowedValue<byte[]>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ImpulseProcessor.class);
+
+  /** Sole entry in the state store; the value tracks whether this processor has already emitted. */
+  static final String FIRED_KEY = "fired";
+
+  /** How soon after {@link #init} the punctuator first fires. */
+  private static final Duration PUNCTUATION_DELAY = Duration.ofMillis(50);
+
+  private final String stateStoreName;
+  private final String transformId;
+
+  private @Nullable ProcessorContext<byte[], WindowedValue<byte[]>> context;
+  private @Nullable KeyValueStore<String, Boolean> firedStore;
+
+  ImpulseProcessor(String stateStoreName, String transformId) {
+    this.stateStoreName = stateStoreName;
+    this.transformId = transformId;
+  }
+
+  @Override
+  public void init(ProcessorContext<byte[], WindowedValue<byte[]>> context) {
+    this.context = context;
+    this.firedStore = context.getStateStore(stateStoreName);
+    context.schedule(PUNCTUATION_DELAY, PunctuationType.WALL_CLOCK_TIME, ts -> maybeFire());
+  }
+
+  @Override
+  public void process(Record<byte[], byte[]> record) {
+    // Records that happen to land on the bootstrap topic are not actual data; they just provide an
+    // extra opportunity to fire the impulse on restart. The state store still gates the emit.
+    maybeFire();
+  }
+
+  private void maybeFire() {
+    ProcessorContext<byte[], WindowedValue<byte[]>> ctx = context;
+    KeyValueStore<String, Boolean> store = firedStore;
+    if (ctx == null || store == null) {
+      return;
+    }
+    if (Boolean.TRUE.equals(store.get(FIRED_KEY))) {
+      return;
+    }
+    WindowedValue<byte[]> impulse = WindowedValues.valueInGlobalWindow(new byte[0]);
+    // The output PCollection is not keyed (PCollection<byte[]>); use an empty byte[] as a
+    // placeholder key so downstream processors that adopt the byte[]-key convention see a
+    // consistent shape.
+    //
+    // Kafka Streams disallows negative record timestamps, so the Record carries the Unix epoch
+    // (0L). The Beam event-time, BoundedWindow.TIMESTAMP_MIN_VALUE, lives inside the forwarded
+    // WindowedValue and is what downstream Beam logic must consult.
+    ctx.forward(new Record<byte[], WindowedValue<byte[]>>(new byte[0], impulse, 0L));
+    store.put(FIRED_KEY, Boolean.TRUE);
+    LOG.debug("Impulse {} emitted single element", transformId);
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Cancel the scheduled punctuator once the impulse has fired or if it is 
detected as already fired to prevent periodic wakeups and state store lookups.
   
   ```java
     private void maybeFire() {
       ProcessorContext<byte[], WindowedValue<byte[]>> ctx = context;
       KeyValueStore<String, Boolean> store = firedStore;
       if (ctx == null || store == null) {
         return;
       }
       if (Boolean.TRUE.equals(store.get(FIRED_KEY))) {
         if (cancellable != null) {
           cancellable.cancel();
           cancellable = null;
         }
         return;
       }
       WindowedValue<byte[]> impulse = WindowedValues.valueInGlobalWindow(new byte[0]);
       // The output PCollection is not keyed (PCollection<byte[]>); use an empty byte[] as a
       // placeholder key so downstream processors that adopt the byte[]-key convention see a
       // consistent shape.
       //
       // Kafka Streams disallows negative record timestamps, so the Record carries the Unix epoch
       // (0L). The Beam event-time, BoundedWindow.TIMESTAMP_MIN_VALUE, lives inside the forwarded
       // WindowedValue and is what downstream Beam logic must consult.
       ctx.forward(new Record<byte[], WindowedValue<byte[]>>(new byte[0], impulse, 0L));
       store.put(FIRED_KEY, Boolean.TRUE);
       if (cancellable != null) {
         cancellable.cancel();
         cancellable = null;
       }
       LOG.debug("Impulse {} emitted single element", transformId);
     }
   ```
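   
   The fire-once-then-cancel pattern can be exercised without a Kafka cluster. This stdlib-only sketch uses hypothetical stand-ins: `Handle` plays the role of Kafka Streams' `Cancellable`, `ManualScheduler` plays the role of a wall-clock punctuation schedule driven by explicit ticks (much like `TopologyTestDriver#advanceWallClockTime`), and a plain `Map` plays the role of the state store.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   // Hypothetical stand-in for org.apache.kafka.streams.processor.Cancellable.
   interface Handle {
     void cancel();
   }
   
   // Hypothetical scheduler: every registered task runs on each tick until it is cancelled.
   class ManualScheduler {
     private final List<Runnable> tasks = new ArrayList<>();
   
     Handle schedulePeriodic(Runnable task) {
       tasks.add(task);
       return () -> tasks.remove(task);
     }
   
     void tick() {
       // Iterate over a copy because a task may cancel itself while running.
       for (Runnable task : new ArrayList<>(tasks)) {
         task.run();
       }
     }
   
     int activeTasks() {
       return tasks.size();
     }
   }
   
   // Hypothetical impulse-like emitter: emits once, persists a flag, then stops its punctuator.
   class OneShotEmitter {
     static final String FIRED_KEY = "fired";
   
     private final Map<String, Boolean> store;
     private final List<String> emitted = new ArrayList<>();
     private Handle punctuator; // null once cancelled
   
     OneShotEmitter(Map<String, Boolean> store) {
       this.store = store;
     }
   
     void init(ManualScheduler scheduler) {
       punctuator = scheduler.schedulePeriodic(this::maybeFire);
     }
   
     void maybeFire() {
       if (!Boolean.TRUE.equals(store.get(FIRED_KEY))) {
         emitted.add("impulse");
         store.put(FIRED_KEY, Boolean.TRUE);
       }
       // Whether we fired just now or an earlier incarnation did, no further wake-ups are needed.
       cancelPunctuator();
     }
   
     private void cancelPunctuator() {
       if (punctuator != null) {
         punctuator.cancel();
         punctuator = null;
       }
     }
   
     List<String> emitted() {
       return emitted;
     }
   }
   ```
   
   Routing both branches through a single `cancelPunctuator()` helper, as in the sketch, also avoids duplicating the null-check-and-cancel block inside `maybeFire`.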



##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java:
##########
@@ -17,29 +17,56 @@
  */
 package org.apache.beam.runners.kafka.streams;
 
+import java.util.Properties;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
 import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
 import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsPipelineTranslator;
 import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-/** Executes a portable pipeline by translating it to Kafka Streams. */
+/** Executes a portable pipeline by translating it to a Kafka Streams {@link Topology}. */
 public class KafkaStreamsPipelineRunner implements PortablePipelineRunner {
 
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsPipelineRunner.class);
+
   private final KafkaStreamsPipelineOptions pipelineOptions;
 
   public KafkaStreamsPipelineRunner(KafkaStreamsPipelineOptions pipelineOptions) {
     this.pipelineOptions = pipelineOptions;
   }
 
   @Override
-  public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception {
+  public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
     KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
     KafkaStreamsTranslationContext context =
         translator.createTranslationContext(jobInfo, pipelineOptions);
     RunnerApi.Pipeline prepared = translator.prepareForTranslation(pipeline);
     translator.translate(context, prepared);
-    throw new IllegalStateException("Translation unexpectedly completed without an executor");
+
+    Topology topology = context.getTopology();
+    LOG.info(
+        "Translated pipeline {} into Kafka Streams topology:\n{}",
+        jobInfo.jobId(),
+        topology.describe());
+
+    KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig(jobInfo));
+    kafkaStreams.start();
+    return new KafkaStreamsPortablePipelineResult(kafkaStreams);
+  }
+
+  private Properties streamsConfig(JobInfo jobInfo) {
+    Properties props = new Properties();
+    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, pipelineOptions.getBootstrapServers());
+    props.put(StreamsConfig.APPLICATION_ID_CONFIG, pipelineOptions.getApplicationId());
+    props.put(StreamsConfig.STATE_DIR_CONFIG, pipelineOptions.getStateDir());
+    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
+    props.put(StreamsConfig.CLIENT_ID_CONFIG, jobInfo.jobId());
+    return props;
   }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
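   Since `Properties` extends `Hashtable`, calling `put` with a `null` value throws a `NullPointerException`. If any of the pipeline options (such as `bootstrapServers`, `applicationId`, or `stateDir`) is not configured and returns `null`, this method fails with a raw NPE. We should check for null before putting values into the properties map. Keep in mind that `bootstrap.servers` and `application.id` have no defaults in Kafka Streams, so skipping them only defers the failure to a `ConfigException` when `KafkaStreams` is constructed; for those two options, failing fast with a descriptive message is clearer.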
   
   ```java
     private Properties streamsConfig(JobInfo jobInfo) {
       Properties props = new Properties();
       if (pipelineOptions.getBootstrapServers() != null) {
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, pipelineOptions.getBootstrapServers());
       }
       if (pipelineOptions.getApplicationId() != null) {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, pipelineOptions.getApplicationId());
       }
       if (pipelineOptions.getStateDir() != null) {
         props.put(StreamsConfig.STATE_DIR_CONFIG, pipelineOptions.getStateDir());
       }
       props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
       if (jobInfo.jobId() != null) {
         props.put(StreamsConfig.CLIENT_ID_CONFIG, jobInfo.jobId());
       }
       return props;
     }
   ```
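   
   A stdlib-only sketch that separates options Kafka Streams requires (`bootstrap.servers` and `application.id`, which have no defaults) from optional ones such as `state.dir`. The helper names `putRequired`, `putIfPresent`, and `build` are hypothetical, not existing Beam or Kafka APIs; the keys are the string values of the corresponding `StreamsConfig` constants, written out literally so the example has no Kafka dependency.
   
   ```java
   import java.util.Objects;
   import java.util.Properties;
   
   class StreamsProps {
     // Hypothetical helper: required options fail fast with a message naming the missing key,
     // instead of the bare NullPointerException thrown by Properties.put(key, null).
     static void putRequired(Properties props, String key, String value) {
       props.put(key, Objects.requireNonNull(value, () -> "Missing required Kafka Streams option: " + key));
     }
   
     // Hypothetical helper: optional options are skipped when unset so the library default applies.
     static void putIfPresent(Properties props, String key, String value) {
       if (value != null) {
         props.put(key, value);
       }
     }
   
     // Hypothetical assembly mirroring streamsConfig(JobInfo); the arguments stand in for options.
     static Properties build(String bootstrapServers, String applicationId, String stateDir) {
       Properties props = new Properties();
       putRequired(props, "bootstrap.servers", bootstrapServers);
       putRequired(props, "application.id", applicationId);
       putIfPresent(props, "state.dir", stateDir);
       return props;
     }
   }
   ```
   
   With this split, a missing `state.dir` falls back to the Kafka Streams default, while a missing `bootstrap.servers` or `application.id` fails with a message that names the option.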



##########
runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java:
##########
@@ -0,0 +1,130 @@
+package org.apache.beam.runners.kafka.streams.translation;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.Test;
+
+/**
+ * Behavioural tests for {@link ImpulseTranslator} using {@link TopologyTestDriver}.
+ *
+ * <p>The translator builds a topology with a real source + processor pair. The tests sit a {@link
+ * CapturingProcessor} downstream so emitted {@code WindowedValue<byte[]>} elements can be inspected
+ * directly without going through a Kafka sink topic (the runner does not produce one because no
+ * downstream PCollections exist yet).
+ */
+public class ImpulseTranslatorTest {
+
+  @Test
+  public void impulseEmitsExactlyOneEmptyByteArrayInGlobalWindow() {
+    KafkaStreamsTranslationContext context = KafkaStreamsPipelineTranslatorTest.newContext();
+    new KafkaStreamsPipelineTranslator()
+        .translate(context, KafkaStreamsPipelineTranslatorTest.singleImpulsePipeline());
+
+    CapturingProcessor capture = new CapturingProcessor();
+    Topology topology = context.getTopology();
+    topology.addProcessor("capture", capture, "impulse");
+
+    try (TopologyTestDriver driver = new TopologyTestDriver(topology, baseProps())) {
+      driver.advanceWallClockTime(Duration.ofSeconds(1));
+      driver.advanceWallClockTime(Duration.ofSeconds(1));
+    }
+
+    assertThat(capture.received.size(), is(1));
+    WindowedValue<byte[]> only = capture.received.get(0);
+    assertThat(only, is(notNullValue()));
+    assertThat(only.getValue().length, is(0));
+    assertThat(only.getWindows().size(), is(1));
+    assertThat(only.getTimestamp().getMillis(), is(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
+  }
+
+  @Test
+  public void impulseDoesNotReEmitOnRestart() {
+    KafkaStreamsTranslationContext context = KafkaStreamsPipelineTranslatorTest.newContext();
+    new KafkaStreamsPipelineTranslator()
+        .translate(context, KafkaStreamsPipelineTranslatorTest.singleImpulsePipeline());
+
+    CapturingProcessor capture = new CapturingProcessor();
+    Topology topology = context.getTopology();
+    topology.addProcessor("capture", capture, "impulse");
+
+    try (TopologyTestDriver driver = new TopologyTestDriver(topology, baseProps())) {
+      driver.advanceWallClockTime(Duration.ofSeconds(1));
+      // Trigger again — should be ignored because the state store flag is set.
+      driver.advanceWallClockTime(Duration.ofSeconds(5));
+    }
+
+    assertThat(capture.received.size(), is(1));
+  }
+
+  private static Properties baseProps() {
+    Properties props = new Properties();
+    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-translator-test");
+    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put(
+        StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
+    props.put(
+        StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
+    return props;
+  }
+
+  /**
+   * Captures {@link WindowedValue} records forwarded by {@link ImpulseProcessor}. The supplier
+   * returns a fresh forwarder each call (required by Kafka Streams) but all forwarders write into
+   * the shared {@link #received} list so the test can read the captured elements after the topology
+   * is closed.
+   */
+  private static class CapturingProcessor
+      implements ProcessorSupplier<byte[], WindowedValue<byte[]>, byte[], WindowedValue<byte[]>> {
+
+    final List<WindowedValue<byte[]>> received = new ArrayList<>();
+

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `received` list in `CapturingProcessor` is an `ArrayList` which is not 
thread-safe. Although `TopologyTestDriver` currently runs single-threaded, it 
is a best practice to use a thread-safe collection (like `CopyOnWriteArrayList` 
or `Collections.synchronizedList`) to prevent concurrency issues if the test or 
topology is ever executed in a multi-threaded context.
   
   ```suggestion
     private static class CapturingProcessor
         implements ProcessorSupplier<byte[], WindowedValue<byte[]>, byte[], WindowedValue<byte[]>> {
   
       final List<WindowedValue<byte[]>> received =
           java.util.Collections.synchronizedList(new ArrayList<>());
   ```
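   
   For reference, a stdlib-only sketch of what `Collections.synchronizedList` does and does not guarantee: individual calls such as `add` and `size` are synchronized, but, as its Javadoc states, iteration must be wrapped in a `synchronized` block on the list itself. The class and method names are hypothetical.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   
   // Hypothetical helpers illustrating Collections.synchronizedList semantics.
   class SyncListDemo {
     // Appends from several threads into one synchronized list and returns the final size.
     static int concurrentAppends(int threads, int perThread) {
       List<Integer> received = Collections.synchronizedList(new ArrayList<>());
       List<Thread> workers = new ArrayList<>();
       for (int t = 0; t < threads; t++) {
         Thread worker =
             new Thread(
                 () -> {
                   for (int i = 0; i < perThread; i++) {
                     received.add(i); // each add is synchronized on the list
                   }
                 });
         workers.add(worker);
         worker.start();
       }
       for (Thread worker : workers) {
         try {
           worker.join();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new IllegalStateException(e);
         }
       }
       return received.size();
     }
   
     // Iteration is not synchronized by the wrapper, so hold the list's lock for the whole loop.
     static int sum(List<Integer> syncList) {
       int total = 0;
       synchronized (syncList) {
         for (int value : syncList) {
           total += value;
         }
       }
       return total;
     }
   }
   ```
   
   `CopyOnWriteArrayList`, the other option mentioned above, instead allows unsynchronized iteration over a snapshot, at the cost of copying the backing array on every write.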



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
