This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 8d1e00f7f7307f33f44493a4b33b1e2fe0989ed5 Author: Michael Blow <[email protected]> AuthorDate: Fri Feb 7 19:31:49 2020 -0500 [NO ISSUE][NET] Re-resolve InetSocketAddresses on failure Support IP address updates on replicas & CC by re-resolving the IP address on connection failure Change-Id: I2532a27c2abb8b891df668b0adc95cc33da99620 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4965 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- asterixdb/NOTICE | 2 +- .../replication/IReplicationDestination.java | 4 +-- .../asterix/common/storage/ReplicaIdentifier.java | 10 ++++++- .../asterix/replication/api/PartitionReplica.java | 11 +++++++- .../replication/api/ReplicationDestination.java | 32 ++++++++++++++++------ .../replication/management/ReplicationManager.java | 5 ++-- .../replication/sync/ReplicaSynchronizer.java | 3 +- hyracks-fullstack/NOTICE | 2 +- .../hyracks/ipc/impl/ReconnectingIPCHandle.java | 3 +- .../java/org/apache/hyracks/util/NetworkUtil.java | 19 +++++++++++++ 10 files changed, 72 insertions(+), 19 deletions(-) diff --git a/asterixdb/NOTICE b/asterixdb/NOTICE index 7615782..b4729a8 100644 --- a/asterixdb/NOTICE +++ b/asterixdb/NOTICE @@ -1,5 +1,5 @@ Apache AsterixDB -Copyright 2015-2019 The Apache Software Foundation +Copyright 2015-2020 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationDestination.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationDestination.java index 2fe9de8..f835c43 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationDestination.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationDestination.java @@ -52,9 +52,9 @@ public interface IReplicationDestination { Set<IPartitionReplica> getReplicas(); /** - * Gets the location of this {@link IReplicationDestination} + * Gets the (resolved) location of this {@link IReplicationDestination} * - * @return the location + * @return the (resolved) location */ InetSocketAddress getLocation(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java index f68ad09..c4bb74c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java @@ -20,11 +20,13 @@ package org.apache.asterix.common.storage; import java.net.InetSocketAddress; +import org.apache.hyracks.util.NetworkUtil; + public class ReplicaIdentifier { private final int partition; - private final InetSocketAddress location; private final String id; + private volatile InetSocketAddress location; private ReplicaIdentifier(int partition, InetSocketAddress location) { this.partition = partition; @@ -44,6 +46,12 @@ public class ReplicaIdentifier { return location; } + public InetSocketAddress refreshLocation() { + //noinspection NonAtomicOperationOnVolatileField + location = NetworkUtil.refresh(location); + return location; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java index e81c25a..f2d2496 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java @@ -97,7 +97,7 @@ public class PartitionReplica implements IPartitionReplica { public synchronized ISocketChannel getChannel() { try { if (!NetworkingUtil.isHealthy(sc)) { - sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.getLocation()); + establishReplicaConnection(); } return sc; } catch (IOException e) { @@ -105,6 +105,15 @@ public class PartitionReplica implements IPartitionReplica { } } + private void establishReplicaConnection() throws IOException { + try { + sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.getLocation()); + } catch (Exception e) { + // try to re-resolve the address, in case our replica has had his IP address updated + sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.refreshLocation()); + } + } + public synchronized void close() { try { if (sc != null) { diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java index ef1bc28..782a801 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java @@ -32,6 +32,7 @@ import org.apache.asterix.common.replication.IReplicationDestination; import org.apache.asterix.replication.management.NetworkingUtil; import org.apache.asterix.replication.messaging.ReplicationProtocol; import org.apache.hyracks.api.network.ISocketChannel; +import org.apache.hyracks.util.NetworkUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -39,15 +40,20 @@ public class ReplicationDestination implements IReplicationDestination { private static final Logger LOGGER = LogManager.getLogger(); private final Set<IPartitionReplica> replicas = new HashSet<>(); - private final InetSocketAddress location; + private final InetSocketAddress inputLocation; + private InetSocketAddress resolvedLocation; private ISocketChannel logRepChannel; private ReplicationDestination(InetSocketAddress location) { - this.location = location; + this.inputLocation = location; + this.resolvedLocation = NetworkUtil.ensureResolved(location); } public static ReplicationDestination at(InetSocketAddress location) { - return new ReplicationDestination(location); + if (!location.isUnresolved()) { + throw new IllegalArgumentException("only unresolved addresses are allowed!"); + } + return new ReplicationDestination(new InetSocketAddress(location.getHostString(), location.getPort())); } @Override @@ -79,7 +85,7 @@ public class ReplicationDestination implements IReplicationDestination { public synchronized ISocketChannel getLogReplicationChannel(INcApplicationContext appCtx) { try { if (!NetworkingUtil.isHealthy(logRepChannel)) { - logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, location); + establishReplicaConnection(appCtx); } return logRepChannel; } catch (IOException e) { @@ -87,6 +93,16 @@ public class ReplicationDestination implements IReplicationDestination { } } + protected void establishReplicaConnection(INcApplicationContext appCtx) throws IOException { + try { + logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, resolvedLocation); + } catch (Exception e) { + // try to re-resolve the address, in case our replica has had his IP address updated + resolvedLocation = NetworkUtil.refresh(resolvedLocation); + logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, resolvedLocation); + } + } + private synchronized void closeLogReplicationChannel() { try { if (logRepChannel != null && logRepChannel.getSocketChannel().isOpen()) { @@ -101,7 +117,7 @@ public class ReplicationDestination implements IReplicationDestination { @Override public InetSocketAddress getLocation() { - return location; + return resolvedLocation; } @Override @@ -113,16 +129,16 @@ public class ReplicationDestination implements IReplicationDestination { return false; } ReplicationDestination that = (ReplicationDestination) o; - return Objects.equals(location, that.location); + return Objects.equals(inputLocation, that.inputLocation); } @Override public String toString() { - return location.toString(); + return resolvedLocation.toString(); } @Override public int hashCode() { - return Objects.hash(location); + return Objects.hash(inputLocation); } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java index bd99ec4..7ed674e 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java @@ -34,6 +34,7 @@ import org.apache.asterix.common.replication.ReplicationStrategyFactory; import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.replication.api.ReplicationDestination; import org.apache.hyracks.api.replication.IReplicationJob; +import org.apache.hyracks.util.NetworkUtil; import org.apache.hyracks.util.annotations.ThreadSafe; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -60,7 +61,7 @@ public class ReplicationManager implements IReplicationManager { @Override public void register(IPartitionReplica replica) { synchronized (dests) { - final InetSocketAddress location = replica.getIdentifier().getLocation(); + final InetSocketAddress location = NetworkUtil.ensureUnresolved(replica.getIdentifier().getLocation()); final ReplicationDestination replicationDest = dests.computeIfAbsent(location, ReplicationDestination::at); replicationDest.add(replica); logReplicationManager.register(replicationDest); @@ -71,7 +72,7 @@ public class ReplicationManager implements IReplicationManager { @Override public void unregister(IPartitionReplica replica) { synchronized (dests) { - final InetSocketAddress location = replica.getIdentifier().getLocation(); + final InetSocketAddress location = NetworkUtil.ensureUnresolved(replica.getIdentifier().getLocation()); final ReplicationDestination dest = dests.get(location); if (dest == null) { LOGGER.warn(() -> "Asked to unregister unknown replica " + replica); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index 123709b..261236c 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -44,8 +44,7 @@ public class ReplicaSynchronizer { } public void sync() throws IOException { - final Object syncLock = appCtx.getReplicaManager().getReplicaSyncLock(); - synchronized (syncLock) { + synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) { final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager(); try { // suspend checkpointing datasets to prevent async IO operations while sync'ing replicas diff --git a/hyracks-fullstack/NOTICE b/hyracks-fullstack/NOTICE index 77f31ad..95fe98a 100644 --- a/hyracks-fullstack/NOTICE +++ b/hyracks-fullstack/NOTICE @@ -1,5 +1,5 @@ Apache Hyracks and Algebricks -Copyright 2015-2019 The Apache Software Foundation +Copyright 2015-2020 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java index db0ed6b..a3578ad 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java @@ -23,6 +23,7 @@ import java.net.InetSocketAddress; import org.apache.hyracks.ipc.api.IIPCEventListener; import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.exceptions.IPCException; +import org.apache.hyracks.util.NetworkUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -78,7 +79,7 @@ class ReconnectingIPCHandle implements IIPCHandle { } LOGGER.warn("ipcHandle {} disconnected; will attempt to reconnect {} times", delegate, reconnectAttempts); listener.ipcHandleDisconnected(delegate); - delegate = ipc.getHandle(getRemoteAddress(), reconnectAttempts); + delegate = ipc.getHandle(NetworkUtil.refresh(getRemoteAddress()), reconnectAttempts); LOGGER.warn("ipcHandle {} restored", delegate); listener.ipcHandleRestored(delegate); return delegate; diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java index 763319f..3f6e90c 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import javax.net.ssl.SSLEngine; @@ -130,4 +131,22 @@ public class NetworkUtil { enlargedBuffer.put(src); return enlargedBuffer; } + + public static InetSocketAddress ensureUnresolved(InetSocketAddress address) { + return address.isUnresolved() ? address + : InetSocketAddress.createUnresolved(address.getHostString(), address.getPort()); + } + + public static InetSocketAddress ensureResolved(InetSocketAddress address) { + return address.isUnresolved() ? new InetSocketAddress(address.getHostString(), address.getPort()) : address; + } + + public static InetSocketAddress refresh(InetSocketAddress original) { + InetSocketAddress refreshed = new InetSocketAddress(original.getHostString(), original.getPort()); + if (!Objects.equals(original.getAddress(), refreshed.getAddress())) { + LOGGER.warn("ip address updated on refresh (was: {}, now: {})", original.getAddress(), + refreshed.getAddress()); + } + return refreshed; + } }
