gemini-code-assist[bot] commented on code in PR #38843:
URL: https://github.com/apache/beam/pull/38843#discussion_r3372267621


##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/RedistributeTranslator.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+
+/**
+ * Runner-native translator for {@code beam:transform:redistribute_arbitrarily:v1}.
+ *
+ * <p>The default Beam expansion of {@code Redistribute.arbitrarily()} goes through {@code
+ * GroupByKey} for materialization. The Kafka Streams runner does not need to do any actual
+ * redistribution in the single-instance topology — Kafka Streams is already handling per-task
+ * processing — so we provide a passthrough: the output PCollection is mapped to the same processor
+ * that already produces the input PCollection. No topology node is added.
+ *
+ * <p>The translator is registered in {@link KafkaStreamsPipelineTranslator#knownUrns()} so {@link
+ * org.apache.beam.sdk.util.construction.graph.TrivialNativeTransformExpander} strips the
+ * sub-transforms of {@code Redistribute} before the fuser runs. That keeps the Redistribute
+ * boundary intact and lets the fuser split adjacent stages correctly.
+ *
+ * <p>The keyed variant ({@code redistribute_by_key:v1}) is intentionally not handled here: it
+ * implies a rehash/partition step that the runner can implement on top of GroupByKey when that
+ * lands, but cannot reasonably no-op the way the arbitrary variant can.
+ */
+class RedistributeTranslator implements PTransformTranslator {
+
+  @Override
+  public void translate(
+      String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId);
+    String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+    String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
+    String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());

Review Comment:
   Add a defensive null check for `parentProcessor`. If the input PCollection does not have a registered producer processor (e.g., due to an unexpected pipeline structure or translation order issue), this will fail fast with a clear error message instead of causing a downstream `NullPointerException`.

   ```java
   String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
   if (parentProcessor == null) {
     throw new IllegalStateException(
         String.format(
             "No producer processor found for input PCollection %s of transform %s",
             inputPCollectionId, transformId));
   }
   String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());
   ```



##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java:
##########
@@ -157,26 +161,25 @@ private void closeBundleAndFlush(Record<byte[], KStreamsPayload<byte[]>> record)
     } finally {
       currentBundle = null;
     }
-    ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx = checkInitialized(context);
+    ProcessorContext<byte[], KStreamsPayload<?>> ctx = checkInitialized(context);
     // The harness has finished the bundle (close() returned) so no further enqueues happen.
     // ConcurrentLinkedQueue's weakly-consistent iterator is therefore safe to drain via forEach.
     pendingOutputs.forEach(
         output ->
             ctx.forward(
-                new Record<byte[], KStreamsPayload<byte[]>>(
+                new Record<byte[], KStreamsPayload<?>>(
                     record.key(), KStreamsPayload.data(output), record.timestamp())));
     pendingOutputs.clear();

Review Comment:
   Draining the queue using `poll()` in a loop is more idiomatic and robust than using `forEach` followed by `clear()`. It avoids relying on the weakly-consistent iterator of `ConcurrentLinkedQueue` and ensures elements are safely removed as they are processed, preventing any potential race conditions or memory retention issues if the queue is accessed concurrently.

   ```suggestion
       // The harness has finished the bundle (close() returned) so no further enqueues happen.
       WindowedValue<?> output;
       while ((output = pendingOutputs.poll()) != null) {
         ctx.forward(
             new Record<>(
                 record.key(), KStreamsPayload.data(output), record.timestamp()));
       }
   ```



-- 
This is an automated message from the Apache Git Service.
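
For reference, the `poll()` drain proposed in the second comment can be tried in isolation. The following is a minimal sketch, not code from the PR: `DrainSketch` and `drain` are hypothetical names, and a plain `Consumer` stands in for `ctx.forward(...)`. It only shows that each element is removed from the queue at the moment it is handed to the sink, so nothing is discarded without having been forwarded. With `forEach` followed by `clear()`, an element enqueued after the iteration has passed but before `clear()` runs would be dropped; the PR's code comment states that no enqueues happen at that point, so the difference matters only if that assumption ever stops holding.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical stand-alone sketch of the poll()-based drain; not part of the Beam runner.
final class DrainSketch {

  /** Removes every queued element, passes it to the sink, and returns the number forwarded. */
  static <T> int drain(Queue<T> queue, Consumer<? super T> sink) {
    int forwarded = 0;
    T next;
    // poll() returns null once the queue is empty, which ends the loop.
    while ((next = queue.poll()) != null) {
      sink.accept(next); // stand-in for ctx.forward(new Record<>(...))
      forwarded++;
    }
    return forwarded;
  }

  public static void main(String[] args) {
    Queue<String> pendingOutputs = new ConcurrentLinkedQueue<>();
    pendingOutputs.add("a");
    pendingOutputs.add("b");
    pendingOutputs.add("c");

    List<String> sink = new ArrayList<>();
    int count = drain(pendingOutputs, sink::add);

    System.out.println("forwarded=" + count + " remaining=" + pendingOutputs.size());
  }
}
```

Because removal and forwarding happen in the same step, no separate `clear()` call is needed after the loop.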
