apurtell commented on code in PR #2466:
URL: https://github.com/apache/phoenix/pull/2466#discussion_r3221516515
##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -783,21 +800,58 @@ private ReplicationShardDirectoryManager
createShardManager(String uri, String l
}
}
- /** create shard manager for the standby cluster */
- protected ReplicationShardDirectoryManager
createPeerShardManager(HAGroupStoreRecord record)
- throws IOException {
- return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+ /** create shard manager for the fallback cluster */
+ protected ReplicationShardDirectoryManager createLocalShardManager() throws
IOException {
+ return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
}
- /** create shard manager for the fallback cluster */
- protected ReplicationShardDirectoryManager
createLocalShardManager(HAGroupStoreRecord record)
- throws IOException {
- return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+ /**
+ * Get or create the peer shard manager. Thread-safe and idempotent — the
first successful
+ * creation is cached; subsequent calls return the cached instance. Bounded
by
+ * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the
disruptor handler
+ * thread on a peer NN outage.
+ */
+ protected ReplicationShardDirectoryManager getOrCreatePeerShardManager()
throws IOException {
+ ReplicationShardDirectoryManager cached = peerShardManager;
+ if (cached != null) {
+ return cached;
+ }
+ synchronized (this) {
+ if (peerShardManager != null) {
+ return peerShardManager;
+ }
+ CompletableFuture<ReplicationShardDirectoryManager> future =
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ return createPeerShardManager();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ try {
+ peerShardManager = future.get(peerInitTimeoutMs,
TimeUnit.MILLISECONDS);
+ return peerShardManager;
+ } catch (UncheckedIOException e) {
Review Comment:
future.get() won't throw UncheckedIOException. When the supplier throws
unchecked, the future fails and Future.get() wraps the cause in
ExecutionException.
##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -783,21 +800,58 @@ private ReplicationShardDirectoryManager
createShardManager(String uri, String l
}
}
- /** create shard manager for the standby cluster */
- protected ReplicationShardDirectoryManager
createPeerShardManager(HAGroupStoreRecord record)
- throws IOException {
- return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+ /** create shard manager for the fallback cluster */
+ protected ReplicationShardDirectoryManager createLocalShardManager() throws
IOException {
+ return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
}
- /** create shard manager for the fallback cluster */
- protected ReplicationShardDirectoryManager
createLocalShardManager(HAGroupStoreRecord record)
- throws IOException {
- return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+ /**
+ * Get or create the peer shard manager. Thread-safe and idempotent — the
first successful
+ * creation is cached; subsequent calls return the cached instance. Bounded
by
+ * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the
disruptor handler
+ * thread on a peer NN outage.
+ */
+ protected ReplicationShardDirectoryManager getOrCreatePeerShardManager()
throws IOException {
+ ReplicationShardDirectoryManager cached = peerShardManager;
+ if (cached != null) {
+ return cached;
+ }
+ synchronized (this) {
+ if (peerShardManager != null) {
+ return peerShardManager;
+ }
+ CompletableFuture<ReplicationShardDirectoryManager> future =
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ return createPeerShardManager();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ try {
+ peerShardManager = future.get(peerInitTimeoutMs,
TimeUnit.MILLISECONDS);
+ return peerShardManager;
+ } catch (UncheckedIOException e) {
Review Comment:
future.get() won't throw UncheckedIOException. When the supplier throws
unchecked, the future fails and Future.get() wraps the cause in
ExecutionException.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]