kerneltime commented on code in PR #10470:
URL: https://github.com/apache/ozone/pull/10470#discussion_r3382906470
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java:
##########
@@ -132,45 +133,183 @@ public void writeUnlock() {
*/
public void addSCMServer(InetSocketAddress address,
String threadNamePrefix) throws IOException {
+ addSCMServer(address, null, threadNamePrefix);
+ }
+
+ /**
+ * Adds a new SCM machine and remembers the original host:port string
+ * so the DN can re-resolve DNS on connection failure (e.g. after the
+ * SCM peer is rescheduled to a new pod IP in Kubernetes).
+ *
+ * @param address resolved RPC address used to build the proxy
+ * @param hostAndPort original "host:port" string, or null to disable
+ * DNS re-resolution for this endpoint
+ * @param threadNamePrefix prefix for the endpoint's task thread
+ */
+ public void addSCMServer(InetSocketAddress address, String hostAndPort,
+ String threadNamePrefix) throws IOException {
writeLock();
try {
if (scmMachines.containsKey(address)) {
LOG.warn("Trying to add an existing SCM Machine to Machines group. " +
"Ignoring the request.");
return;
}
+ EndpointStateMachine endPoint =
+ buildScmEndpoint(address, hostAndPort, threadNamePrefix);
+ scmMachines.put(address, endPoint);
+ } finally {
+ writeUnlock();
+ }
+ }
- Configuration hadoopConfig =
- LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
- RPC.setProtocolEngine(
- hadoopConfig,
- StorageContainerDatanodeProtocolPB.class,
- ProtobufRpcEngine.class);
- long version =
- RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
+ /**
+ * Build (but do NOT register) a fresh active-SCM endpoint bound to
+ * {@code address}. The caller is responsible for registering it in
+ * {@code scmMachines} (and tearing down any previous endpoint it
+ * replaces). Factored out of {@link #addSCMServer} so
+ * {@link #refreshSCMServer} can construct the replacement BEFORE
+ * removing the stale entry, preserving the peer in {@code scmMachines}
+ * if proxy construction throws (transient DNS failure, peer not yet
+ * accepting on the new IP, etc.).
+ */
+ @VisibleForTesting
+ EndpointStateMachine buildScmEndpoint(InetSocketAddress address,
+ String hostAndPort, String threadNamePrefix) throws IOException {
+ Configuration hadoopConfig =
+ LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
+ RPC.setProtocolEngine(
+ hadoopConfig,
+ StorageContainerDatanodeProtocolPB.class,
+ ProtobufRpcEngine.class);
+ long version =
+ RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
- RetryPolicy retryPolicy =
- RetryPolicies.retryUpToMaximumCountWithFixedSleep(
- getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
- TimeUnit.MILLISECONDS);
+ RetryPolicy retryPolicy =
+ RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
+ TimeUnit.MILLISECONDS);
- StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
- StorageContainerDatanodeProtocolPB.class, version,
- address, UserGroupInformation.getCurrentUser(), hadoopConfig,
- NetUtils.getDefaultSocketFactory(hadoopConfig), getRpcTimeout(),
- retryPolicy).getProxy();
+ StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
+ StorageContainerDatanodeProtocolPB.class, version,
+ address, UserGroupInformation.getCurrentUser(), hadoopConfig,
+ NetUtils.getDefaultSocketFactory(hadoopConfig), getRpcTimeout(),
+ retryPolicy).getProxy();
- StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
- new StorageContainerDatanodeProtocolClientSideTranslatorPB(
- rpcProxy);
+ StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
+ new StorageContainerDatanodeProtocolClientSideTranslatorPB(
+ rpcProxy);
- EndpointStateMachine endPoint = new EndpointStateMachine(address,
- rpcClient, this.conf, threadNamePrefix);
- endPoint.setPassive(false);
- scmMachines.put(address, endPoint);
+ EndpointStateMachine endPoint = new EndpointStateMachine(address,
+ hostAndPort, rpcClient, this.conf, threadNamePrefix);
+ endPoint.setPassive(false);
+ return endPoint;
+ }
+
+ /**
+ * Re-resolve the SCM hostname for the endpoint at {@code oldAddress} and,
+ * if the resolved IP has changed, atomically replace the endpoint with a
+ * fresh one bound to the new address.
+ * <p>
+ * Returns the new {@link InetSocketAddress} on a successful swap, or null
+ * if no swap occurred (endpoint not found, hostname not preserved at
+ * construction, IP unchanged, or DNS lookup failed). Callers receive
+ * enough information to update any external maps keyed by the old
+ * address.
+ * <p>
+ * The replacement is built via the existing {@link #addSCMServer} path,
+ * so the new endpoint starts in {@code GETVERSION} state and re-walks
+ * version → register → heartbeat. This is the correct behavior: a peer
+ * that has been rescheduled is effectively a fresh process.
+ */
+ public InetSocketAddress refreshSCMServer(InetSocketAddress oldAddress,
+ String threadNamePrefix) throws IOException {
+ final EndpointStateMachine staleEndpoint;
+ final InetSocketAddress refreshed;
+ final String hostAndPort;
+ writeLock();
+ try {
+ EndpointStateMachine existing = scmMachines.get(oldAddress);
+ if (existing == null) {
+ return null;
+ }
+ // Recon endpoints (added via addReconServer) speak a different
+ // protocol than active SCM endpoints. The current refresh path
+ // only knows how to rebuild SCM endpoints, so refusing to
+ // refresh a passive endpoint avoids silently downgrading a
+ // Recon endpoint to an SCM-protocol one. Recon's cached IP is
+ // also a much narrower problem in practice (Recon is rarely
+ // pod-rescheduled the way SCM-HA peers are).
+ if (existing.isPassive()) {
+ return null;
+ }
+ InetSocketAddress resolved = existing.resolveLatestAddress();
+ if (resolved == null) {
Review Comment:
Fixed in `745052d9`. Refactored `refreshSCMServer` into four phases:
- PHASE A (read lock) snapshots the endpoint reference and its hostAndPort.
- PHASE B (no lock) performs the DNS lookup via `resolveLatestAddress()`.
- PHASE C (write lock) does four things: it re-checks `current == snapshotEndpoint` to guard against races with a concurrent removeSCMServer or refresh, enforces the collision invariant, builds the replacement, and commits the swap.
- PHASE D (no lock) closes the stale endpoint.

This mirrors the pattern used in `SCMFailoverProxyProviderBase`.

I logged this as LENS-012 + META-3 in my MISSED_LENSES.md. It is the same lens (no I/O under a lock) that you flagged for `existing.close()` in R4: I fixed the named site but did not sweep the sibling sites in the same method. The new rule is: "When fixing one site flagged by a lens, sweep every site in the same method before declaring the fix complete."
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java:
##########
@@ -34,6 +34,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
Review Comment:
Fixed in `745052d9`. Moved the `com.google.common.annotations.VisibleForTesting` import to immediately after the static imports and before the `java.*` imports, matching Ozone's CustomImportOrder convention.
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestSCMConnectionManager.java:
##########
@@ -46,4 +48,178 @@ public void testRemoveSCMServerDoesNotMarkEndpointShutdown()
Assertions.assertEquals(HEARTBEAT, endpoint.getState());
}
}
+
+ /**
+ * resolveLatestAddress() returns null when no preserved hostname is
+ * available -- legacy code path -- so re-resolution is a no-op for that
+ * endpoint. Operator must restart the DN to pick up a new IP.
+ */
+ @Test
+ public void testResolveLatestAddressReturnsNullWithoutHostAndPort() {
+ InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9861);
+ EndpointStateMachine endpoint = new EndpointStateMachine(
+ address, /*hostAndPort=*/ null, /*endPoint=*/ null,
+ new OzoneConfiguration(), "");
+ Assertions.assertNull(endpoint.resolveLatestAddress());
+ Assertions.assertNull(endpoint.getHostAndPort());
+ }
+
+ /**
+ * When the cached IP matches what DNS currently returns for the
+ * preserved hostname, resolveLatestAddress() returns null (no swap
+ * needed). Uses "localhost" because it reliably resolves to a loopback
+ * address in any test environment.
+ */
+ @Test
+ public void testResolveLatestAddressReturnsNullWhenIpUnchanged()
+ throws Exception {
+ InetAddress loopback = InetAddress.getByName("localhost");
+ InetSocketAddress address = new InetSocketAddress(loopback, 9861);
+ EndpointStateMachine endpoint = new EndpointStateMachine(
+ address, "localhost:9861", null, new OzoneConfiguration(), "");
+ InetSocketAddress refreshed = endpoint.resolveLatestAddress();
+ Assertions.assertNull(refreshed,
+ "localhost re-resolves to the same loopback address; refresh "
+ + "must report no change so the endpoint is not torn down "
+ + "needlessly.");
+ }
+
+ /**
+ * When the cached IP differs from what DNS currently returns for the
+ * preserved hostname, resolveLatestAddress() returns the freshly-
+ * resolved address. Simulates the "SCM pod was rescheduled to a new
+ * IP" scenario by constructing the endpoint with a deliberately stale
+ * cached IP.
+ */
+ @Test
+ public void testResolveLatestAddressReturnsNewAddressOnIpChange()
+ throws Exception {
+ // Pretend localhost previously resolved to 127.0.0.99 (stale IP).
+ // In real Kubernetes this would be the now-defunct pod IP.
+ InetSocketAddress staleAddress = new InetSocketAddress(
+ InetAddress.getByAddress(new byte[] {127, 0, 0, 99}), 9861);
+ EndpointStateMachine endpoint = new EndpointStateMachine(
+ staleAddress, "localhost:9861", null,
+ new OzoneConfiguration(), "");
+ InetSocketAddress refreshed = endpoint.resolveLatestAddress();
+ Assertions.assertNotNull(refreshed,
+ "localhost re-resolves to loopback (typically 127.0.0.1), "
+ + "which differs from the stale 127.0.0.99 we cached; "
+ + "refresh must report the change so the endpoint can "
+ + "swap to the live address.");
+ Assertions.assertEquals(9861, refreshed.getPort());
+ Assertions.assertNotEquals(staleAddress.getAddress(),
+ refreshed.getAddress());
+ }
+
+ /**
+ * refreshSCMServer() swaps an endpoint's address atomically in the
+ * connection manager when the cached IP is stale. The replacement
+ * endpoint starts in GETVERSION state -- the version handshake must
+ * be re-run because the new SCM pod is effectively a fresh process.
+ */
+ @Test
+ public void testRefreshSCMServerSwapsEndpointOnIpChange() throws Exception {
+ try (SCMConnectionManager connectionManager =
+ new SCMConnectionManager(new OzoneConfiguration())) {
+ InetSocketAddress staleAddress = new InetSocketAddress(
+ InetAddress.getByAddress(new byte[] {127, 0, 0, 99}), 9861);
+ connectionManager.addSCMServer(staleAddress, "localhost:9861", "");
+
+ InetSocketAddress refreshed = connectionManager.refreshSCMServer(
+ staleAddress, "");
+
+ Assertions.assertNotNull(refreshed);
+ Assertions.assertEquals(1, connectionManager.getNumOfConnections());
+ EndpointStateMachine swapped =
+ connectionManager.getValues().iterator().next();
+ Assertions.assertEquals(refreshed, swapped.getAddress());
+ Assertions.assertEquals("localhost:9861", swapped.getHostAndPort());
+ }
+ }
+
+ /**
Review Comment:
Fixed in `745052d9`. The orphan was the Javadoc for `testRefreshSCMServerNoopWhenIpUnchanged`. When I inserted the new R3 rollback test in front of that test, the Javadoc was left stranded above the new test. I moved the Javadoc back to immediately above its matching `@Test` method.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java:
##########
@@ -984,6 +1217,7 @@ synchronized boolean putIfAbsent(PipelineKey key,
return map.putIfAbsent(key, pipelineAction) == null;
}
+
synchronized List<PipelineAction> getActions(List<PipelineReport> reports,
Review Comment:
Fixed in `745052d9`. Removed the extra (second) blank line between `putIfAbsent` and `getActions` in `PipelineActionMap`. It was left over from the `drainInto` deletion in R3.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]