apurtell commented on code in PR #2466:
URL: https://github.com/apache/phoenix/pull/2466#discussion_r3221516515


##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -783,21 +800,58 @@ private ReplicationShardDirectoryManager 
createShardManager(String uri, String l
     }
   }
 
-  /** create shard manager for the standby cluster */
-  protected ReplicationShardDirectoryManager 
createPeerShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+  /** create shard manager for the fallback cluster */
+  protected ReplicationShardDirectoryManager createLocalShardManager() throws 
IOException {
+    return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
   }
 
-  /** create shard manager for the fallback cluster */
-  protected ReplicationShardDirectoryManager 
createLocalShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+  /**
+   * Get or create the peer shard manager. Thread-safe and idempotent — the 
first successful
+   * creation is cached; subsequent calls return the cached instance. Bounded 
by
+   * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the 
disruptor handler
+   * thread on a peer NN outage.
+   */
+  protected ReplicationShardDirectoryManager getOrCreatePeerShardManager() 
throws IOException {
+    ReplicationShardDirectoryManager cached = peerShardManager;
+    if (cached != null) {
+      return cached;
+    }
+    synchronized (this) {
+      if (peerShardManager != null) {
+        return peerShardManager;
+      }
+      CompletableFuture<ReplicationShardDirectoryManager> future =
+        CompletableFuture.supplyAsync(() -> {
+          try {
+            return createPeerShardManager();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+      try {
+        peerShardManager = future.get(peerInitTimeoutMs, 
TimeUnit.MILLISECONDS);
+        return peerShardManager;
+      } catch (UncheckedIOException e) {

Review Comment:
   future.get() won't throw UncheckedIOException. When the supplier throws an 
unchecked exception, the future completes exceptionally and Future.get() 
wraps the cause in an ExecutionException.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -783,21 +800,58 @@ private ReplicationShardDirectoryManager 
createShardManager(String uri, String l
     }
   }
 
-  /** create shard manager for the standby cluster */
-  protected ReplicationShardDirectoryManager 
createPeerShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+  /** create shard manager for the fallback cluster */
+  protected ReplicationShardDirectoryManager createLocalShardManager() throws 
IOException {
+    return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
   }
 
-  /** create shard manager for the fallback cluster */
-  protected ReplicationShardDirectoryManager 
createLocalShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+  /**
+   * Get or create the peer shard manager. Thread-safe and idempotent — the 
first successful
+   * creation is cached; subsequent calls return the cached instance. Bounded 
by
+   * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the 
disruptor handler
+   * thread on a peer NN outage.
+   */
+  protected ReplicationShardDirectoryManager getOrCreatePeerShardManager() 
throws IOException {
+    ReplicationShardDirectoryManager cached = peerShardManager;
+    if (cached != null) {
+      return cached;
+    }
+    synchronized (this) {
+      if (peerShardManager != null) {
+        return peerShardManager;
+      }
+      CompletableFuture<ReplicationShardDirectoryManager> future =
+        CompletableFuture.supplyAsync(() -> {
+          try {
+            return createPeerShardManager();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+      try {
+        peerShardManager = future.get(peerInitTimeoutMs, 
TimeUnit.MILLISECONDS);
+        return peerShardManager;
+      } catch (UncheckedIOException e) {

Review Comment:
   future.get() won't throw UncheckedIOException. When the supplier throws an 
unchecked exception, the future completes exceptionally and Future.get() 
wraps the cause in an ExecutionException.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to