sjvanrossum commented on code in PR #32962:
URL: https://github.com/apache/beam/pull/32962#discussion_r1854222190
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -46,48 +55,92 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
private final UnboundedSolaceSource<T> currentSource;
private final WatermarkPolicy<T> watermarkPolicy;
private final SempClient sempClient;
+ private final UUID readerUuid;
+ private final SessionServiceFactory sessionServiceFactory;
private @Nullable BytesXMLMessage solaceOriginalRecord;
private @Nullable T solaceMappedRecord;
- private @Nullable SessionService sessionService;
- AtomicBoolean active = new AtomicBoolean(true);
/**
- * Queue to place advanced messages before {@link #getCheckpointMark()} be
called non-concurrent
- * queue, should only be accessed by the reader thread A given {@link
UnboundedReader} object will
- * only be accessed by a single thread at once.
+ * Queue to place advanced messages before {@link #getCheckpointMark()} is
called. CAUTION:
+ * Accessed by both reader and checkpointing threads.
*/
- private final java.util.Queue<BytesXMLMessage> elementsToCheckpoint = new
ArrayDeque<>();
+ private final Queue<BytesXMLMessage> safeToAckMessages = new
ConcurrentLinkedQueue<>();
+
+ /**
+ * Queue for messages that were ingested in the {@link #advance()} method,
but not sent yet to a
+ * {@link SolaceCheckpointMark}.
+ */
+ private final Queue<BytesXMLMessage> receivedMessages = new ArrayDeque<>();
+
+ private static final Cache<UUID, SessionService> sessionServiceCache;
+ private static final ScheduledExecutorService cleanUpThread =
Executors.newScheduledThreadPool(1);
+
+ static {
+ Duration cacheExpirationTimeout = Duration.ofMinutes(1);
+ sessionServiceCache =
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(cacheExpirationTimeout)
+ .removalListener(
+ (RemovalNotification<UUID, SessionService> notification) -> {
+ LOG.info(
+ "SolaceIO.Read: Closing session for the reader with uuid
{} as it has been idle for over {}.",
+ notification.getKey(),
+ cacheExpirationTimeout);
+ SessionService sessionService = notification.getValue();
+ if (sessionService != null) {
+ sessionService.close();
+ }
+ })
+ .build();
+
+ startCleanUpThread();
+ }
Review Comment:
Was this cache necessary still now that messages acks are attempted whenever
possible? Binding the lifetime of the session to a reader should be fine since
a reader's lifetime is generally quite long.
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -46,48 +55,92 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
private final UnboundedSolaceSource<T> currentSource;
private final WatermarkPolicy<T> watermarkPolicy;
private final SempClient sempClient;
+ private final UUID readerUuid;
+ private final SessionServiceFactory sessionServiceFactory;
private @Nullable BytesXMLMessage solaceOriginalRecord;
private @Nullable T solaceMappedRecord;
- private @Nullable SessionService sessionService;
- AtomicBoolean active = new AtomicBoolean(true);
/**
- * Queue to place advanced messages before {@link #getCheckpointMark()} be
called non-concurrent
- * queue, should only be accessed by the reader thread A given {@link
UnboundedReader} object will
- * only be accessed by a single thread at once.
+ * Queue to place advanced messages before {@link #getCheckpointMark()} is
called. CAUTION:
+ * Accessed by both reader and checkpointing threads.
*/
- private final java.util.Queue<BytesXMLMessage> elementsToCheckpoint = new
ArrayDeque<>();
+ private final Queue<BytesXMLMessage> safeToAckMessages = new
ConcurrentLinkedQueue<>();
+
+ /**
+ * Queue for messages that were ingested in the {@link #advance()} method,
but not sent yet to a
+ * {@link SolaceCheckpointMark}.
+ */
+ private final Queue<BytesXMLMessage> receivedMessages = new ArrayDeque<>();
+
+ private static final Cache<UUID, SessionService> sessionServiceCache;
+ private static final ScheduledExecutorService cleanUpThread =
Executors.newScheduledThreadPool(1);
+
+ static {
+ Duration cacheExpirationTimeout = Duration.ofMinutes(1);
+ sessionServiceCache =
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(cacheExpirationTimeout)
+ .removalListener(
+ (RemovalNotification<UUID, SessionService> notification) -> {
+ LOG.info(
+ "SolaceIO.Read: Closing session for the reader with uuid
{} as it has been idle for over {}.",
+ notification.getKey(),
+ cacheExpirationTimeout);
+ SessionService sessionService = notification.getValue();
+ if (sessionService != null) {
+ sessionService.close();
+ }
+ })
+ .build();
+
+ startCleanUpThread();
+ }
Review Comment:
Was this cache necessary still now that message acks are attempted whenever
possible? Binding the lifetime of the session to a reader should be fine since
a reader's lifetime is generally quite long.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]