apurtell commented on code in PR #2466:
URL: https://github.com/apache/phoenix/pull/2466#discussion_r3221520024
##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -783,21 +800,58 @@ private ReplicationShardDirectoryManager
createShardManager(String uri, String l
}
}
- /** create shard manager for the standby cluster */
- protected ReplicationShardDirectoryManager
createPeerShardManager(HAGroupStoreRecord record)
- throws IOException {
- return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+ /** create shard manager for the fallback cluster */
+ protected ReplicationShardDirectoryManager createLocalShardManager() throws
IOException {
+ return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
}
- /** create shard manager for the fallback cluster */
- protected ReplicationShardDirectoryManager
createLocalShardManager(HAGroupStoreRecord record)
- throws IOException {
- return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+ /**
+ * Get or create the peer shard manager. Thread-safe and idempotent — the
first successful
+ * creation is cached; subsequent calls return the cached instance. Bounded
by
+ * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the
disruptor handler
+ * thread on a peer NN outage.
+ */
+ protected ReplicationShardDirectoryManager getOrCreatePeerShardManager()
throws IOException {
+ ReplicationShardDirectoryManager cached = peerShardManager;
+ if (cached != null) {
+ return cached;
+ }
+ synchronized (this) {
+ if (peerShardManager != null) {
+ return peerShardManager;
+ }
+ CompletableFuture<ReplicationShardDirectoryManager> future =
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ return createPeerShardManager();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ try {
+ peerShardManager = future.get(peerInitTimeoutMs,
TimeUnit.MILLISECONDS);
+ return peerShardManager;
+ } catch (UncheckedIOException e) {
Review Comment:
Nit: future.get() won't throw UncheckedIOException. When the supplier throws an
unchecked exception, the future completes exceptionally and Future.get() wraps
the cause in ExecutionException, so this catch block is never reached. Unwrap
the UncheckedIOException from the ExecutionException where you need it.
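To illustrate the unwrapping, here is a minimal standalone sketch, not the PR's code. `UnwrapDemo`, `failingCreate()` and `getWithTimeout()` are hypothetical names, and a `String` stands in for `ReplicationShardDirectoryManager`. It assumes the supplier wraps the checked `IOException` the same way the diff does.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class UnwrapDemo {
    // Hypothetical stand-in for createPeerShardManager(); always fails.
    static String failingCreate() throws IOException {
        throw new IOException("peer NN unreachable");
    }

    static String getWithTimeout(long timeoutMs) throws IOException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                return failingCreate();
            } catch (IOException e) {
                // A Supplier cannot throw checked exceptions, so wrap it.
                throw new UncheckedIOException(e);
            }
        });
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // get() reports the supplier's exception as the cause here.
            Throwable cause = e.getCause();
            if (cause instanceof UncheckedIOException) {
                // UncheckedIOException.getCause() is typed as IOException.
                throw ((UncheckedIOException) cause).getCause();
            }
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new IOException("Failed to create peer shard manager", cause);
        } catch (TimeoutException e) {
            throw new IOException("Timed out after " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        try {
            getWithTimeout(1000);
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

With this shape the caller sees the original IOException rather than a generic wrapper, and no separate catch for UncheckedIOException is needed.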
##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -783,21 +800,58 @@ private ReplicationShardDirectoryManager
createShardManager(String uri, String l
}
}
- /** create shard manager for the standby cluster */
- protected ReplicationShardDirectoryManager
createPeerShardManager(HAGroupStoreRecord record)
- throws IOException {
- return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+ /** create shard manager for the fallback cluster */
+ protected ReplicationShardDirectoryManager createLocalShardManager() throws
IOException {
+ return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
}
- /** create shard manager for the fallback cluster */
- protected ReplicationShardDirectoryManager
createLocalShardManager(HAGroupStoreRecord record)
- throws IOException {
- return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+ /**
+ * Get or create the peer shard manager. Thread-safe and idempotent — the
first successful
+ * creation is cached; subsequent calls return the cached instance. Bounded
by
+ * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the
disruptor handler
+ * thread on a peer NN outage.
+ */
+ protected ReplicationShardDirectoryManager getOrCreatePeerShardManager()
throws IOException {
+ ReplicationShardDirectoryManager cached = peerShardManager;
+ if (cached != null) {
+ return cached;
+ }
+ synchronized (this) {
+ if (peerShardManager != null) {
+ return peerShardManager;
+ }
+ CompletableFuture<ReplicationShardDirectoryManager> future =
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ return createPeerShardManager();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ try {
+ peerShardManager = future.get(peerInitTimeoutMs,
TimeUnit.MILLISECONDS);
+ return peerShardManager;
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new IOException("Failed to create peer shard manager",
e.getCause());
+ } catch (TimeoutException e) {
+ future.cancel(true);
Review Comment:
future.cancel(true) does not actually free the worker thread. (Per the
CompletableFuture Javadoc, mayInterruptIfRunning has no effect because
interrupts are not used to control processing.) The stall moves to the worker
pool, which is better, but the hung task still holds resources for the entire
time the underlying HDFS call is blocked. Every subsequent call to this method
spawns another supplyAsync task while the previous one is still hung, so this
issue can compound and potentially exhaust the worker pool.
I asked the robot about this and it recommended: _Use a dedicated
ExecutorService (single-thread, bounded queue) so future.cancel(true) can
actually interrupt, and so leakage is bounded._
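A rough sketch of that recommendation, under stated assumptions: `BoundedPeerInit` and `call()` are hypothetical names, not anything in the PR. Note that the task is handed to `ExecutorService.submit()`, which returns a `FutureTask`-backed `Future`. A `CompletableFuture` built with `supplyAsync(supplier, executor)` would still ignore `mayInterruptIfRunning`. Whether the blocked HDFS call actually responds to the interrupt is a separate question this sketch does not answer.

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedPeerInit {
    // One worker thread and a queue of one: at most two tasks can ever be
    // outstanding, and further submissions are rejected instead of piling up.
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(1),
        runnable -> {
            Thread worker = new Thread(runnable, "peer-shard-init");
            worker.setDaemon(true);
            return worker;
        },
        new ThreadPoolExecutor.AbortPolicy());

    <T> T call(Callable<T> task, long timeoutMs) throws IOException {
        Future<T> future;
        try {
            future = executor.submit(task);
        } catch (RejectedExecutionException e) {
            throw new IOException("Peer init backlog is full", e);
        }
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // FutureTask.cancel(true) interrupts the thread running the task.
            future.cancel(true);
            throw new IOException("Timed out after " + timeoutMs + " ms", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new IOException("Peer init failed", cause);
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        BoundedPeerInit init = new BoundedPeerInit();
        try {
            init.call(() -> {
                Thread.sleep(10_000); // simulates a hung remote call
                return "manager";
            }, 100);
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

The bounded queue plus AbortPolicy is what caps the leakage: if the worker is stuck in a call that ignores interrupts, later callers fail fast with a rejection instead of adding more hung tasks.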
##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -783,21 +800,58 @@ private ReplicationShardDirectoryManager
createShardManager(String uri, String l
}
}
- /** create shard manager for the standby cluster */
- protected ReplicationShardDirectoryManager
createPeerShardManager(HAGroupStoreRecord record)
- throws IOException {
- return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+ /** create shard manager for the fallback cluster */
+ protected ReplicationShardDirectoryManager createLocalShardManager() throws
IOException {
+ return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
}
- /** create shard manager for the fallback cluster */
- protected ReplicationShardDirectoryManager
createLocalShardManager(HAGroupStoreRecord record)
- throws IOException {
- return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+ /**
+ * Get or create the peer shard manager. Thread-safe and idempotent — the
first successful
+ * creation is cached; subsequent calls return the cached instance. Bounded
by
+ * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the
disruptor handler
+ * thread on a peer NN outage.
+ */
+ protected ReplicationShardDirectoryManager getOrCreatePeerShardManager()
throws IOException {
+ ReplicationShardDirectoryManager cached = peerShardManager;
+ if (cached != null) {
+ return cached;
+ }
+ synchronized (this) {
Review Comment:
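Both the disruptor handler and the forwarder thread can call this, so one of
them can wait on this monitor while the other sits in the timed future.get(),
and only then start its own timed wait. The mutual exclusion here may
therefore last longer than 10s.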
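One possible way to keep the lock hold time short, sketched under assumptions rather than proposed as the fix: hold the monitor only long enough to pick up or start a single shared task, and do the timed wait outside it. `SharedPeerInit` and its constructor arguments are hypothetical names. The creator is a plain `Supplier`, so checked exceptions would need wrapping as in the diff.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class SharedPeerInit<T> {
    private final Supplier<T> creator;
    private final Executor executor;
    private CompletableFuture<T> inFlight; // guarded by "this"

    SharedPeerInit(Supplier<T> creator, Executor executor) {
        this.creator = creator;
        this.executor = executor;
    }

    T get(long timeoutMs) throws IOException {
        CompletableFuture<T> future;
        synchronized (this) {
            // Held only to choose or start the task, never across a wait.
            // A failed attempt is replaced; a pending or successful one is reused.
            if (inFlight == null || inFlight.isCompletedExceptionally()) {
                inFlight = CompletableFuture.supplyAsync(creator, executor);
            }
            future = inFlight;
        }
        try {
            // Each caller applies its own timeout to the same shared task.
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The task keeps running; the next caller joins it instead of
            // spawning another one.
            throw new IOException("Peer init still pending after " + timeoutMs + " ms", e);
        } catch (ExecutionException e) {
            throw new IOException("Peer init failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for peer init", e);
        }
    }

    public static void main(String[] args) throws IOException {
        SharedPeerInit<String> init = new SharedPeerInit<String>(
            () -> "manager",
            runnable -> {
                Thread worker = new Thread(runnable, "peer-shard-init");
                worker.setDaemon(true);
                worker.start();
            });
        System.out.println(init.get(1000));
    }
}
```

The trade-off is that a task which hangs forever is never replaced, so this would still need some policy for abandoning or interrupting a stuck attempt.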
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.