apurtell commented on code in PR #2466:
URL: https://github.com/apache/phoenix/pull/2466#discussion_r3221520024


##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -783,21 +800,58 @@ private ReplicationShardDirectoryManager 
createShardManager(String uri, String l
     }
   }
 
-  /** create shard manager for the standby cluster */
-  protected ReplicationShardDirectoryManager 
createPeerShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+  /** create shard manager for the fallback cluster */
+  protected ReplicationShardDirectoryManager createLocalShardManager() throws 
IOException {
+    return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
   }
 
-  /** create shard manager for the fallback cluster */
-  protected ReplicationShardDirectoryManager 
createLocalShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+  /**
+   * Get or create the peer shard manager. Thread-safe and idempotent — the 
first successful
+   * creation is cached; subsequent calls return the cached instance. Bounded 
by
+   * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the 
disruptor handler
+   * thread on a peer NN outage.
+   */
+  protected ReplicationShardDirectoryManager getOrCreatePeerShardManager() 
throws IOException {
+    ReplicationShardDirectoryManager cached = peerShardManager;
+    if (cached != null) {
+      return cached;
+    }
+    synchronized (this) {
+      if (peerShardManager != null) {
+        return peerShardManager;
+      }
+      CompletableFuture<ReplicationShardDirectoryManager> future =
+        CompletableFuture.supplyAsync(() -> {
+          try {
+            return createPeerShardManager();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+      try {
+        peerShardManager = future.get(peerInitTimeoutMs, 
TimeUnit.MILLISECONDS);
+        return peerShardManager;
+      } catch (UncheckedIOException e) {

Review Comment:
   Nit. future.get() won't throw UncheckedIOException. When the supplier throws 
unchecked, the future fails and Future.get() wraps the cause in 
ExecutionException. You can unwrap the UncheckedIOException from 
ExecutionException where you need it. 



##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -783,21 +800,58 @@ private ReplicationShardDirectoryManager 
createShardManager(String uri, String l
     }
   }
 
-  /** create shard manager for the standby cluster */
-  protected ReplicationShardDirectoryManager 
createPeerShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+  /** create shard manager for the fallback cluster */
+  protected ReplicationShardDirectoryManager createLocalShardManager() throws 
IOException {
+    return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
   }
 
-  /** create shard manager for the fallback cluster */
-  protected ReplicationShardDirectoryManager 
createLocalShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+  /**
+   * Get or create the peer shard manager. Thread-safe and idempotent — the 
first successful
+   * creation is cached; subsequent calls return the cached instance. Bounded 
by
+   * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the 
disruptor handler
+   * thread on a peer NN outage.
+   */
+  protected ReplicationShardDirectoryManager getOrCreatePeerShardManager() 
throws IOException {
+    ReplicationShardDirectoryManager cached = peerShardManager;
+    if (cached != null) {
+      return cached;
+    }
+    synchronized (this) {
+      if (peerShardManager != null) {
+        return peerShardManager;
+      }
+      CompletableFuture<ReplicationShardDirectoryManager> future =
+        CompletableFuture.supplyAsync(() -> {
+          try {
+            return createPeerShardManager();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+      try {
+        peerShardManager = future.get(peerInitTimeoutMs, 
TimeUnit.MILLISECONDS);
+        return peerShardManager;
+      } catch (UncheckedIOException e) {
+        throw e.getCause();
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        throw new IOException("Failed to create peer shard manager", 
e.getCause());
+      } catch (TimeoutException e) {
+        future.cancel(true);

Review Comment:
   future.cancel(true) does not actually free the worker thread. (Per 
CompletableFuture Javadoc, mayInterruptIfRunning has no effect i.e. interrupts 
are not used to control processing.) The stall is moved to the worker pool, 
which is better, but the stall still holds resources the entire time the 
underlying HDFS call is blocked. Every subsequent call to this method spawns 
another supplyAsync task while the previous one is still hung, so this issue 
can compound and potentially exhaust the worker pool. 
   
   I asked the robot about this and it recommended: _Use a dedicated 
ExecutorService (single-thread, bounded queue) so future.cancel(true) can 
actually interrupt, and so leakage is bounded._



##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -783,21 +800,58 @@ private ReplicationShardDirectoryManager 
createShardManager(String uri, String l
     }
   }
 
-  /** create shard manager for the standby cluster */
-  protected ReplicationShardDirectoryManager 
createPeerShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+  /** create shard manager for the fallback cluster */
+  protected ReplicationShardDirectoryManager createLocalShardManager() throws 
IOException {
+    return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
   }
 
-  /** create shard manager for the fallback cluster */
-  protected ReplicationShardDirectoryManager 
createLocalShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+  /**
+   * Get or create the peer shard manager. Thread-safe and idempotent — the 
first successful
+   * creation is cached; subsequent calls return the cached instance. Bounded 
by
+   * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the 
disruptor handler
+   * thread on a peer NN outage.
+   */
+  protected ReplicationShardDirectoryManager getOrCreatePeerShardManager() 
throws IOException {
+    ReplicationShardDirectoryManager cached = peerShardManager;
+    if (cached != null) {
+      return cached;
+    }
+    synchronized (this) {

Review Comment:
   With both the disruptor handler and the forwarder thread involved we get a 
mutual exclusion here that may be longer than 10s. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to