gemini-code-assist[bot] commented on code in PR #38603: URL: https://github.com/apache/beam/pull/38603#discussion_r3289490421
########## sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.read; + +import java.lang.ref.WeakReference; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A global registry to keep track of active {@link UnboundedSolaceReader} instances on the worker + * JVM using weak references. + * + * <p>This allows serialized {@link SolaceCheckpointMark} instances to resolve their originating + * reader and perform sequential acknowledgments. + */ +class ActiveReadersRegistry { + private static final ConcurrentHashMap<UUID, WeakReference<UnboundedSolaceReader<?>>> registry = + new ConcurrentHashMap<>(); Review Comment: Using a `ConcurrentHashMap` with `WeakReference` values can lead to a slow memory leak of `UUID` keys if `unregister` is not called (e.g., if a reader is abandoned or crashes during initialization). The map entry, including its key, stays in the map even after the referenced reader has been garbage collected. Consider using Guava's `Cache` with `weakValues()`. It removes the whole entry automatically once its value has been garbage collected.
```java private static final org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache<UUID, UnboundedSolaceReader<?>> registry = org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder.newBuilder().weakValues().build(); public static void register(UUID uuid, UnboundedSolaceReader<?> reader) { registry.put(uuid, reader); } public static void unregister(UUID uuid) { registry.invalidate(uuid); } public static @Nullable UnboundedSolaceReader<?> get(UUID uuid) { return registry.getIfPresent(uuid); } ``` ########## sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java: ########## @@ -39,33 +37,42 @@ @VisibleForTesting public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class); - private transient Queue<BytesXMLMessage> safeToAck; + private String readerUuid; + private long checkpointId; Review Comment: Since `UUID` is `Serializable` and supported by many coders, storing it as a `String` and repeatedly calling `UUID.fromString()` during finalization adds unnecessary overhead. It's more efficient to store the `UUID` object directly. Before switching, confirm that the coder used for this checkpoint mark can encode `java.util.UUID`. A reflection-based coder such as `AvroCoder` may not handle it, and the parse cost is only paid once per finalization. ```suggestion private UUID readerUuid; private long checkpointId; ``` ########## sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java: ########## @@ -151,30 +155,35 @@ public boolean advance() { } solaceOriginalRecord = receivedXmlMessage; solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); - receivedMessages.add(receivedXmlMessage); + synchronized (this) { + receivedMessages.add(receivedXmlMessage); + } Review Comment: Synchronizing on `this` is generally discouraged for internal state management as it exposes the lock to external callers, which could lead to unexpected deadlocks. It is safer to use a private lock object, e.g. `private final Object lock = new Object();`. Switch every block that currently synchronizes on `this` (including the one in `finalizeCheckpoint`) to this same lock. If some blocks use `this` and others use `lock`, the two monitors do not exclude each other and the shared state is left unprotected.
```suggestion synchronized (lock) { receivedMessages.add(receivedXmlMessage); } ``` ########## sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java: ########## @@ -151,30 +155,35 @@ public boolean advance() { } solaceOriginalRecord = receivedXmlMessage; solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); - receivedMessages.add(receivedXmlMessage); + synchronized (this) { + receivedMessages.add(receivedXmlMessage); + } return true; } @Override public void close() { - finalizeReadyMessages(); sessionServiceCache.invalidate(readerUuid); + ActiveReadersRegistry.unregister(readerUuid); } - public void finalizeReadyMessages() { - BytesXMLMessage msg; - while ((msg = safeToAckMessages.poll()) != null) { + public void finalizeCheckpoint(long checkpointId) { Review Comment: This method is only intended to be called by `SolaceCheckpointMark` within the same package. Reducing its visibility to package-private follows the principle of least privilege.
```suggestion void finalizeCheckpoint(long checkpointId) { ``` ########## sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java: ########## @@ -151,30 +155,35 @@ public boolean advance() { } solaceOriginalRecord = receivedXmlMessage; solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); - receivedMessages.add(receivedXmlMessage); + synchronized (this) { + receivedMessages.add(receivedXmlMessage); + } return true; } @Override public void close() { - finalizeReadyMessages(); sessionServiceCache.invalidate(readerUuid); + ActiveReadersRegistry.unregister(readerUuid); } - public void finalizeReadyMessages() { - BytesXMLMessage msg; - while ((msg = safeToAckMessages.poll()) != null) { + public void finalizeCheckpoint(long checkpointId) { + List<BytesXMLMessage> messagesToAck = new ArrayList<>(); + + synchronized (this) { + SortedMap<Long, List<BytesXMLMessage>> toAck = pendingCheckpoints.headMap(checkpointId, true); + for (List<BytesXMLMessage> msgs : toAck.values()) { + messagesToAck.addAll(msgs); + } + toAck.clear(); + } + + for (BytesXMLMessage msg : messagesToAck) { try { msg.ackMessage(); } catch (IllegalStateException e) { - LOG.error( - "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.", - msg.getApplicationMessageId(), - msg.getAckMessageId(), - e); - safeToAckMessages.add(msg); // In case the error was transient, might succeed later - break; // Commit is only best effort + LOG.warn("SolaceIO.Read: Failed to ack message, session might be closed.", e); } Review Comment: The previous implementation logged the `applicationMessageId` and `ackMessageId` when an acknowledgment failed. This information is crucial for debugging data flow issues and identifying specific problematic messages. Please restore these details in the log message.
```java } catch (IllegalStateException e) { LOG.warn( "SolaceIO.Read: Failed to acknowledge message with applicationMessageId={}, ackMessageId={}. Session might be closed.", msg.getApplicationMessageId(), msg.getAckMessageId(), e); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
