Copilot commented on code in PR #10470:
URL: https://github.com/apache/ozone/pull/10470#discussion_r3382727776
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java:
##########
@@ -34,6 +34,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
Review Comment:
This import is out of order under the repo’s CustomImportOrder rule (non-static
imports must be alphabetized within their group).
`com.google.common.annotations.VisibleForTesting` should sort before the
`java.*` imports; otherwise checkstyle will fail.
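As a quick reference, ordering the group lexicographically by fully-qualified name puts `com.*` ahead of `java.*`, `javax.*`, and `org.*`. The sketch below reproduces that ordering with `Collections.sort`. It assumes the check compares imports within the group as plain strings, and the `ImportOrderSketch` class is hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: orders non-static imports lexicographically. */
final class ImportOrderSketch {
  static List<String> sortedImports(List<String> imports) {
    List<String> copy = new ArrayList<>(imports);
    // String.compareTo is lexicographic by char value; for these ASCII names
    // that gives "com." < "java." < "javax." < "org.".
    Collections.sort(copy);
    return copy;
  }

  public static void main(String[] args) {
    List<String> asInPr = Arrays.asList(
        "java.util.concurrent.locks.ReadWriteLock",
        "java.util.concurrent.locks.ReentrantReadWriteLock",
        "javax.management.ObjectName",
        "com.google.common.annotations.VisibleForTesting",
        "org.apache.hadoop.conf.Configuration");
    // The Guava import moves to the top of the group.
    sortedImports(asInPr).forEach(System.out::println);
  }
}
```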
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java:
##########
@@ -984,6 +1217,7 @@ synchronized boolean putIfAbsent(PipelineKey key,
return map.putIfAbsent(key, pipelineAction) == null;
}
+
synchronized List<PipelineAction> getActions(List<PipelineReport> reports,
Review Comment:
There are two consecutive blank lines before `getActions(...)`. Checkstyle’s
`EmptyLineSeparator` disallows multiple empty lines, so this will fail
formatting checks.
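As described, the rule rejects any blank line that immediately follows another blank line. The approximation below is a hypothetical helper, not checkstyle's implementation. It inspects raw text lines only and ignores trailing blank lines at end of input:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical checker approximating a "no multiple empty lines" rule. */
final class BlankLineSketch {
  /** Returns 1-based numbers of blank lines that directly follow another blank line. */
  static List<Integer> consecutiveBlankLines(String source) {
    // split() drops trailing empty strings, so blank lines at end of input are not examined.
    String[] lines = source.split("\r?\n");
    List<Integer> hits = new ArrayList<>();
    for (int i = 1; i < lines.length; i++) {
      boolean blank = lines[i].trim().isEmpty();
      boolean previousBlank = lines[i - 1].trim().isEmpty();
      if (blank && previousBlank) {
        hits.add(i + 1);
      }
    }
    return hits;
  }
}
```

Applied to the hunk above, the second of the two blank lines before `getActions(...)` is the one flagged. Deleting either blank line clears it.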
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestSCMConnectionManager.java:
##########
@@ -46,4 +48,178 @@ public void testRemoveSCMServerDoesNotMarkEndpointShutdown()
Assertions.assertEquals(HEARTBEAT, endpoint.getState());
}
}
+
+ /**
+ * resolveLatestAddress() returns null when no preserved hostname is
+ * available -- legacy code path -- so re-resolution is a no-op for that
+ * endpoint. Operator must restart the DN to pick up a new IP.
+ */
+ @Test
+ public void testResolveLatestAddressReturnsNullWithoutHostAndPort() {
+ InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9861);
+ EndpointStateMachine endpoint = new EndpointStateMachine(
+ address, /*hostAndPort=*/ null, /*endPoint=*/ null,
+ new OzoneConfiguration(), "");
+ Assertions.assertNull(endpoint.resolveLatestAddress());
+ Assertions.assertNull(endpoint.getHostAndPort());
+ }
+
+ /**
+ * When the cached IP matches what DNS currently returns for the
+ * preserved hostname, resolveLatestAddress() returns null (no swap
+ * needed). Uses "localhost" because it reliably resolves to a loopback
+ * address in any test environment.
+ */
+ @Test
+ public void testResolveLatestAddressReturnsNullWhenIpUnchanged()
+ throws Exception {
+ InetAddress loopback = InetAddress.getByName("localhost");
+ InetSocketAddress address = new InetSocketAddress(loopback, 9861);
+ EndpointStateMachine endpoint = new EndpointStateMachine(
+ address, "localhost:9861", null, new OzoneConfiguration(), "");
+ InetSocketAddress refreshed = endpoint.resolveLatestAddress();
+ Assertions.assertNull(refreshed,
+ "localhost re-resolves to the same loopback address; refresh "
+ + "must report no change so the endpoint is not torn down "
+ + "needlessly.");
+ }
+
+ /**
+ * When the cached IP differs from what DNS currently returns for the
+ * preserved hostname, resolveLatestAddress() returns the freshly-
+ * resolved address. Simulates the "SCM pod was rescheduled to a new
+ * IP" scenario by constructing the endpoint with a deliberately stale
+ * cached IP.
+ */
+ @Test
+ public void testResolveLatestAddressReturnsNewAddressOnIpChange()
+ throws Exception {
+ // Pretend localhost previously resolved to 127.0.0.99 (stale IP).
+ // In real Kubernetes this would be the now-defunct pod IP.
+ InetSocketAddress staleAddress = new InetSocketAddress(
+ InetAddress.getByAddress(new byte[] {127, 0, 0, 99}), 9861);
+ EndpointStateMachine endpoint = new EndpointStateMachine(
+ staleAddress, "localhost:9861", null,
+ new OzoneConfiguration(), "");
+ InetSocketAddress refreshed = endpoint.resolveLatestAddress();
+ Assertions.assertNotNull(refreshed,
+ "localhost re-resolves to loopback (typically 127.0.0.1), "
+ + "which differs from the stale 127.0.0.99 we cached; "
+ + "refresh must report the change so the endpoint can "
+ + "swap to the live address.");
+ Assertions.assertEquals(9861, refreshed.getPort());
+ Assertions.assertNotEquals(staleAddress.getAddress(),
+ refreshed.getAddress());
+ }
+
+ /**
+ * refreshSCMServer() swaps an endpoint's address atomically in the
+ * connection manager when the cached IP is stale. The replacement
+ * endpoint starts in GETVERSION state -- the version handshake must
+ * be re-run because the new SCM pod is effectively a fresh process.
+ */
+ @Test
+ public void testRefreshSCMServerSwapsEndpointOnIpChange() throws Exception {
+ try (SCMConnectionManager connectionManager =
+ new SCMConnectionManager(new OzoneConfiguration())) {
+ InetSocketAddress staleAddress = new InetSocketAddress(
+ InetAddress.getByAddress(new byte[] {127, 0, 0, 99}), 9861);
+ connectionManager.addSCMServer(staleAddress, "localhost:9861", "");
+
+ InetSocketAddress refreshed = connectionManager.refreshSCMServer(
+ staleAddress, "");
+
+ Assertions.assertNotNull(refreshed);
+ Assertions.assertEquals(1, connectionManager.getNumOfConnections());
+ EndpointStateMachine swapped =
+ connectionManager.getValues().iterator().next();
+ Assertions.assertEquals(refreshed, swapped.getAddress());
+ Assertions.assertEquals("localhost:9861", swapped.getHostAndPort());
+ }
+ }
+
+ /**
+ * refreshSCMServer() against an endpoint whose cached IP already matches
+ * DNS is a no-op -- the existing endpoint stays in place untouched. This
+ * prevents needless tearing-down of healthy connections when the
+ * heartbeat task asks to refresh after a transient blip.
+ */
Review Comment:
This Javadoc block is orphaned (it is immediately followed by another Javadoc
and does not document any member). It is also misleading: it reads as
documentation for the next test, but that test covers a different regression
case.
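The tests in this hunk pin down a three-way contract for `resolveLatestAddress()`. It returns null when no hostname was preserved, null when DNS still returns the cached IP, and the freshly resolved address otherwise. The plain-JDK sketch below illustrates that contract and is not the PR's implementation. `resolveLatest` is a hypothetical name, the `host:port` parsing assumes no bracketed IPv6 literal, and lookup failures are treated as "no change". The JVM also keeps its own positive DNS cache (`networkaddress.cache.ttl`), which can delay when a new IP becomes visible to `InetAddress.getByName`.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

/** Hypothetical stand-alone model of the re-resolution contract the tests check. */
final class ReResolveSketch {
  /** Returns a fresh address if hostAndPort now resolves to a different IP, else null. */
  static InetSocketAddress resolveLatest(InetSocketAddress cached, String hostAndPort) {
    if (hostAndPort == null) {
      return null; // legacy path: no hostname preserved, nothing to re-resolve
    }
    int colon = hostAndPort.lastIndexOf(':');
    if (colon <= 0) {
      return null; // malformed "host:port"; treat as no change
    }
    String host = hostAndPort.substring(0, colon);
    int port;
    try {
      port = Integer.parseInt(hostAndPort.substring(colon + 1));
    } catch (NumberFormatException e) {
      return null; // malformed port
    }
    if (port < 0 || port > 65535) {
      return null;
    }
    InetAddress fresh;
    try {
      fresh = InetAddress.getByName(host); // may be served from the JVM DNS cache
    } catch (UnknownHostException e) {
      return null; // lookup failed; keep the cached endpoint
    }
    if (fresh.equals(cached.getAddress())) {
      return null; // IP unchanged; do not tear down a healthy endpoint
    }
    return new InetSocketAddress(fresh, port);
  }
}
```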
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java:
##########
@@ -132,45 +133,183 @@ public void writeUnlock() {
*/
public void addSCMServer(InetSocketAddress address,
String threadNamePrefix) throws IOException {
+ addSCMServer(address, null, threadNamePrefix);
+ }
+
+ /**
+ * Adds a new SCM machine and remembers the original host:port string
+ * so the DN can re-resolve DNS on connection failure (e.g. after the
+ * SCM peer is rescheduled to a new pod IP in Kubernetes).
+ *
+ * @param address resolved RPC address used to build the proxy
+ * @param hostAndPort original "host:port" string, or null to disable
+ * DNS re-resolution for this endpoint
+ * @param threadNamePrefix prefix for the endpoint's task thread
+ */
+ public void addSCMServer(InetSocketAddress address, String hostAndPort,
+ String threadNamePrefix) throws IOException {
writeLock();
try {
if (scmMachines.containsKey(address)) {
LOG.warn("Trying to add an existing SCM Machine to Machines group. " +
"Ignoring the request.");
return;
}
+ EndpointStateMachine endPoint =
+ buildScmEndpoint(address, hostAndPort, threadNamePrefix);
+ scmMachines.put(address, endPoint);
+ } finally {
+ writeUnlock();
+ }
+ }
- Configuration hadoopConfig =
- LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
- RPC.setProtocolEngine(
- hadoopConfig,
- StorageContainerDatanodeProtocolPB.class,
- ProtobufRpcEngine.class);
- long version =
- RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
+ /**
+ * Build (but do NOT register) a fresh active-SCM endpoint bound to
+ * {@code address}. The caller is responsible for registering it in
+ * {@code scmMachines} (and tearing down any previous endpoint it
+ * replaces). Factored out of {@link #addSCMServer} so
+ * {@link #refreshSCMServer} can construct the replacement BEFORE
+ * removing the stale entry, preserving the peer in {@code scmMachines}
+ * if proxy construction throws (transient DNS failure, peer not yet
+ * accepting on the new IP, etc.).
+ */
+ @VisibleForTesting
+ EndpointStateMachine buildScmEndpoint(InetSocketAddress address,
+ String hostAndPort, String threadNamePrefix) throws IOException {
+ Configuration hadoopConfig =
+ LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
+ RPC.setProtocolEngine(
+ hadoopConfig,
+ StorageContainerDatanodeProtocolPB.class,
+ ProtobufRpcEngine.class);
+ long version =
+ RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
- RetryPolicy retryPolicy =
- RetryPolicies.retryUpToMaximumCountWithFixedSleep(
- getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
- TimeUnit.MILLISECONDS);
+ RetryPolicy retryPolicy =
+ RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
+ TimeUnit.MILLISECONDS);
- StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
- StorageContainerDatanodeProtocolPB.class, version,
- address, UserGroupInformation.getCurrentUser(), hadoopConfig,
- NetUtils.getDefaultSocketFactory(hadoopConfig), getRpcTimeout(),
- retryPolicy).getProxy();
+ StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
+ StorageContainerDatanodeProtocolPB.class, version,
+ address, UserGroupInformation.getCurrentUser(), hadoopConfig,
+ NetUtils.getDefaultSocketFactory(hadoopConfig), getRpcTimeout(),
+ retryPolicy).getProxy();
- StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
- new StorageContainerDatanodeProtocolClientSideTranslatorPB(
- rpcProxy);
+ StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
+ new StorageContainerDatanodeProtocolClientSideTranslatorPB(
+ rpcProxy);
- EndpointStateMachine endPoint = new EndpointStateMachine(address,
- rpcClient, this.conf, threadNamePrefix);
- endPoint.setPassive(false);
- scmMachines.put(address, endPoint);
+ EndpointStateMachine endPoint = new EndpointStateMachine(address,
+ hostAndPort, rpcClient, this.conf, threadNamePrefix);
+ endPoint.setPassive(false);
+ return endPoint;
+ }
+
+ /**
+ * Re-resolve the SCM hostname for the endpoint at {@code oldAddress} and,
+ * if the resolved IP has changed, atomically replace the endpoint with a
+ * fresh one bound to the new address.
+ * <p>
+ * Returns the new {@link InetSocketAddress} on a successful swap, or null
+ * if no swap occurred (endpoint not found, hostname not preserved at
+ * construction, IP unchanged, or DNS lookup failed). Callers receive
+ * enough information to update any external maps keyed by the old
+ * address.
+ * <p>
+ * The replacement is built via the existing {@link #addSCMServer} path,
+ * so the new endpoint starts in {@code GETVERSION} state and re-walks
+ * version → register → heartbeat. This is the correct behavior: a peer
+ * that has been rescheduled is effectively a fresh process.
+ */
+ public InetSocketAddress refreshSCMServer(InetSocketAddress oldAddress,
+ String threadNamePrefix) throws IOException {
+ final EndpointStateMachine staleEndpoint;
+ final InetSocketAddress refreshed;
+ final String hostAndPort;
+ writeLock();
+ try {
+ EndpointStateMachine existing = scmMachines.get(oldAddress);
+ if (existing == null) {
+ return null;
+ }
+ // Recon endpoints (added via addReconServer) speak a different
+ // protocol than active SCM endpoints. The current refresh path
+ // only knows how to rebuild SCM endpoints, so refusing to
+ // refresh a passive endpoint avoids silently downgrading a
+ // Recon endpoint to an SCM-protocol one. Recon's cached IP is
+ // also a much narrower problem in practice (Recon is rarely
+ // pod-rescheduled the way SCM-HA peers are).
+ if (existing.isPassive()) {
+ return null;
+ }
+ InetSocketAddress resolved = existing.resolveLatestAddress();
+ if (resolved == null) {
+ return null;
+ }
+ // Refuse the swap if the freshly-resolved address collides with
+ // another already-registered SCM peer key (e.g. transient kube-dns
+ // returning peer-B's IP for peer-A's hostname). Without this guard,
+ // the put below would silently overwrite peer-B's
+ // EndpointStateMachine, leaking its executor and orphaning its
+ // task thread, while peer-A's task ends up dialing peer-B's IP
+ // with peer-A's host context. Leave the stale endpoint in place;
+ // the next heartbeat retries DNS.
+ if (!resolved.equals(oldAddress)
+ && scmMachines.containsKey(resolved)) {
+ LOG.warn("DNS re-resolution: refused to swap endpoint {} -> {} "
+ + "because the new address collides with an already-registered "
+ + "SCM peer. Leaving stale endpoint in place.",
+ oldAddress, resolved);
+ return null;
+ }
+ String preservedHostPort = existing.getHostAndPort();
+ // Build the replacement BEFORE removing the stale entry so a
+ // failure to construct the new proxy (transient DNS, peer not
+ // yet accepting on the new IP, NetUtils refusing the address)
+ // leaves the existing endpoint registered. Otherwise the peer
+ // would disappear from scmMachines entirely and the next
+ // heartbeat cycle would have nothing to dial -- much worse than
+ // the pre-PR behaviour of dialing the stale IP.
+ EndpointStateMachine replacement;
+ try {
+ replacement = buildScmEndpoint(resolved, preservedHostPort,
+ threadNamePrefix);
Review Comment:
`refreshSCMServer` holds the connection-manager write lock while doing DNS
resolution (`existing.resolveLatestAddress()`) and building a new RPC proxy
(`buildScmEndpoint(...)`). Both can block (slow DNS, slow proxy
initialization), and while the write lock is held all readers (e.g.
`getValues()` called from other heartbeat threads) are stalled. Consider
capturing the current endpoint and its `hostAndPort` under the lock, releasing
the lock to resolve and build, then re-acquiring it to validate and commit the
swap.
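Below is a minimal sketch of the snapshot / work / commit shape suggested above, using only JDK types. `Endpoint`, `refresh`, and the `resolver` function are hypothetical stand-ins for `EndpointStateMachine`, `refreshSCMServer`, and the combined `resolveLatestAddress()` plus `buildScmEndpoint(...)` work. Addresses are plain strings to keep it self-contained. The slow work runs with no lock held. Before swapping, the commit phase re-checks that the snapshot is still current and that the new key does not collide with another registered peer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.UnaryOperator;

/** Hypothetical model of refreshing an endpoint without holding the write lock during I/O. */
final class RefreshOutsideLockSketch {
  static final class Endpoint {
    final String address;     // stand-in for the resolved InetSocketAddress
    final String hostAndPort; // preserved "host:port", or null

    Endpoint(String address, String hostAndPort) {
      this.address = address;
      this.hostAndPort = hostAndPort;
    }
  }

  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  final Map<String, Endpoint> machines = new HashMap<>();

  void add(Endpoint e) {
    lock.writeLock().lock();
    try {
      machines.putIfAbsent(e.address, e);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Returns the new address on a successful swap, otherwise null. */
  String refresh(String oldAddress, UnaryOperator<String> resolver) {
    // Phase 1: snapshot under the read lock; other readers are not blocked.
    final Endpoint existing;
    lock.readLock().lock();
    try {
      existing = machines.get(oldAddress);
    } finally {
      lock.readLock().unlock();
    }
    if (existing == null || existing.hostAndPort == null) {
      return null;
    }

    // Phase 2: potentially slow DNS lookup and endpoint construction, no lock held.
    String resolved = resolver.apply(existing.hostAndPort);
    if (resolved == null || resolved.equals(oldAddress)) {
      return null;
    }
    Endpoint replacement = new Endpoint(resolved, existing.hostAndPort);

    // Phase 3: re-validate and commit under the write lock.
    lock.writeLock().lock();
    try {
      if (machines.get(oldAddress) != existing || machines.containsKey(resolved)) {
        // The map changed meanwhile, or the new address collides with a
        // registered peer. A real implementation would close `replacement` here.
        return null;
      }
      machines.remove(oldAddress);
      machines.put(resolved, replacement);
      return resolved;
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```

With this shape, the replacement endpoint is built before anything is removed. A failed build therefore still leaves the stale entry registered, which preserves the guarantee the PR's comments describe.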
##########
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java:
##########
@@ -45,7 +45,30 @@ public class HadoopRpcOMFailoverProxyProvider<T> extends
protected static final Logger LOG =
LoggerFactory.getLogger(HadoopRpcOMFailoverProxyProvider.class);
- private final Text delegationTokenService;
+ /**
+ * Aggregated delegation-token service identifier (the comma-joined
+ * list of per-OM service strings, sorted for stability). Mutable and
+ * volatile so that {@link #onAddressRefreshed(String)} can replace
+ * it in-place after a per-node DNS refresh; readers see either the
+ * old or the new fully-formed value, never a partial state. <p>
+ * Caveat for SECURE clusters with the default
+ * {@code hadoop.security.token.service.use_ip=true}: each per-OM
+ * service is built from the resolved IP, so after an IP refresh
+ * the new aggregate string and the token's frozen old aggregate
+ * string have no common per-OM substring for the refreshed peer.
+ * {@code OzoneDelegationTokenSelector} (substring match) then fails
+ * to select the token for that peer, and the SASL handshake on the
+ * fresh dial cannot present credentials. <p>
+ * Operators that enable {@code ozone.client.failover.resolve-needed}
+ * on a secure cluster MUST set {@code hadoop.security.token.service.use_ip=false}
+ * (in core-site.xml) so the per-OM service is hostname:port -- a
Review Comment:
This Javadoc line exceeds the 120-character `LineLength` limit (checkstyle does
not exempt Javadoc), so it will fail style checks. Please wrap it onto multiple
lines.
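The secure-cluster caveat in this Javadoc can be illustrated with plain strings. The model below makes several assumptions and is not Ozone code. `aggregate` follows the Javadoc's description (per-OM service strings, sorted, comma-joined). `tokenMatches` models the selector's substring match as `String.contains`, checked against the per-OM service of the peer being dialed. The addresses, hostnames, and ports are made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of why IP-based token services stop matching after an IP refresh. */
final class TokenServiceSketch {
  /** Sorts per-OM service strings and comma-joins them, as the Javadoc describes. */
  static String aggregate(List<String> perOmServices) {
    List<String> sorted = new ArrayList<>(perOmServices);
    Collections.sort(sorted);
    return String.join(",", sorted);
  }

  /** Models the substring match: does the token's frozen service still mention this OM? */
  static boolean tokenMatches(String tokenService, String perOmService) {
    return tokenService.contains(perOmService);
  }
}
```

Suppose a token is issued for `10.0.0.1:9862,10.0.0.2:9862,10.0.0.3:9862` and the second OM then moves to `10.0.0.9`. Its new per-OM service no longer appears in the token, so `tokenMatches` returns false. With hostname-based services (`om1:9862,om2:9862,om3:9862`), the refreshed peer's service string is unchanged and the match still succeeds.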
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]