bzablocki commented on code in PR #32962:
URL: https://github.com/apache/beam/pull/32962#discussion_r1856711961


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -46,48 +55,92 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
   private final UnboundedSolaceSource<T> currentSource;
   private final WatermarkPolicy<T> watermarkPolicy;
   private final SempClient sempClient;
+  private final UUID readerUuid;
+  private final SessionServiceFactory sessionServiceFactory;
   private @Nullable BytesXMLMessage solaceOriginalRecord;
   private @Nullable T solaceMappedRecord;
-  private @Nullable SessionService sessionService;
-  AtomicBoolean active = new AtomicBoolean(true);
 
   /**
-   * Queue to place advanced messages before {@link #getCheckpointMark()} be 
called non-concurrent
-   * queue, should only be accessed by the reader thread A given {@link 
UnboundedReader} object will
-   * only be accessed by a single thread at once.
+   * Queue to place advanced messages before {@link #getCheckpointMark()} is 
called. CAUTION:
+   * Accessed by both reader and checkpointing threads.
    */
-  private final java.util.Queue<BytesXMLMessage> elementsToCheckpoint = new 
ArrayDeque<>();
+  private final Queue<BytesXMLMessage> safeToAckMessages = new 
ConcurrentLinkedQueue<>();
+
+  /**
+   * Queue for messages that were ingested in the {@link #advance()} method, 
but not sent yet to a
+   * {@link SolaceCheckpointMark}.
+   */
+  private final Queue<BytesXMLMessage> receivedMessages = new ArrayDeque<>();
+
+  private static final Cache<UUID, SessionService> sessionServiceCache;
+  private static final ScheduledExecutorService cleanUpThread = 
Executors.newScheduledThreadPool(1);
+
+  static {
+    Duration cacheExpirationTimeout = Duration.ofMinutes(1);
+    sessionServiceCache =
+        CacheBuilder.newBuilder()
+            .expireAfterAccess(cacheExpirationTimeout)
+            .removalListener(
+                (RemovalNotification<UUID, SessionService> notification) -> {
+                  LOG.info(
+                      "SolaceIO.Read: Closing session for the reader with uuid 
{} as it has been idle for over {}.",
+                      notification.getKey(),
+                      cacheExpirationTimeout);
+                  SessionService sessionService = notification.getValue();
+                  if (sessionService != null) {
+                    sessionService.close();
+                  }
+                })
+            .build();
+
+    startCleanUpThread();
+  }

Review Comment:
   The problem I have is that I can't reproduce the issue @ppawel was 
reporting, so I thought I would try this approach in conjunction with acking 
messages whenever possible.
   Also, without this the session was only closed in the  
`UnboundedSolaceReader#close` method, which was never called in my experiments 
- It was not called when I drained or updated a pipeline, or when returned 
`false` in the `advance()` method. That's why I implemented this caching with 
removal listener to more eagerly close any open sessions. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to