Author: todd
Date: Thu Dec 8 02:00:20 2011
New Revision: 1211735
URL: http://svn.apache.org/viewvc?rev=1211735&view=rev
Log:
HDFS-2627. Determine DN's view of which NN is active based on heartbeat
responses. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
Thu Dec 8 02:00:20 2011
@@ -45,3 +45,5 @@ HDFS-2626. BPOfferService.verifyAndSetNa
HDFS-2624. ConfiguredFailoverProxyProvider doesn't correctly stop
ProtocolTranslators (todd)
HDFS-2625. TestDfsOverAvroRpc failing after introduction of HeartbeatResponse
type (todd)
+
+HDFS-2627. Determine DN's view of which NN is active based on heartbeat
responses (todd)
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
Thu Dec 8 02:00:20 2011
@@ -37,14 +37,15 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.ipc.RPC;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
@@ -75,10 +76,31 @@ class BPOfferService {
UpgradeManagerDatanode upgradeManager = null;
private final DataNode dn;
- private BPServiceActor bpServiceToActive;
+ /**
+ * A reference to the BPServiceActor associated with the currently
+ * ACTIVE NN. In the case that all NameNodes are in STANDBY mode,
+ * this can be null. If non-null, this must always refer to a member
+ * of the {@link #bpServices} list.
+ */
+ private BPServiceActor bpServiceToActive = null;
+
+ /**
+ * The list of all actors for namenodes in this nameservice, regardless
+ * of their active or standby states.
+ */
private List<BPServiceActor> bpServices =
new CopyOnWriteArrayList<BPServiceActor>();
+ /**
+ * Each time we receive a heartbeat from a NN claiming to be ACTIVE,
+ * we record that NN's most recent transaction ID here, so long as it
+ * is more recent than the previous value. This allows us to detect
+ * split-brain scenarios in which a prior NN is still asserting its
+ * ACTIVE state but with a too-low transaction ID. See HDFS-2627
+ * for details.
+ */
+ private long lastActiveClaimTxId = -1;
+
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
Preconditions.checkArgument(!nnAddrs.isEmpty(),
"Must pass at least one NN.");
@@ -87,10 +109,6 @@ class BPOfferService {
for (InetSocketAddress addr : nnAddrs) {
this.bpServices.add(new BPServiceActor(addr, this));
}
- // TODO(HA): currently we just make the first one the initial
- // active. In reality it should start in an unknown state and then
- // as we figure out which is active, designate one as such.
- this.bpServiceToActive = this.bpServices.get(0);
}
void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
@@ -109,19 +127,23 @@ class BPOfferService {
}
/**
- * returns true if BP thread has completed initialization of storage
- * and has registered with the corresponding namenode
- * @return true if initialized
+ * Reports whether {@code bpRegistration} has been set for this block pool.
+ *
+ * @return true if the service has registered with at least one NameNode.
*/
boolean isInitialized() {
- // TODO(HA) is this right?
- return bpServiceToActive != null && bpServiceToActive.isInitialized();
+ // Deliberately independent of which NN (if any) is currently ACTIVE.
+ return bpRegistration != null;
}
+ /**
+ * @return true if there is at least one actor thread running which is
+ * talking to a NameNode.
+ *
+ * Every actor in {@link #bpServices} is checked, whether its NameNode
+ * is currently considered ACTIVE or not.
+ */
boolean isAlive() {
- // TODO: should || all the bp actors probably?
- return bpServiceToActive != null &&
- bpServiceToActive.isAlive();
+ // Short-circuit on the first live actor found.
+ for (BPServiceActor actor : bpServices) {
+ if (actor.isAlive()) {
+ return true;
+ }
+ }
+ return false;
}
String getBlockPoolId() {
@@ -322,7 +344,7 @@ class BPOfferService {
* Called when an actor shuts down. If this is the last actor
* to shut down, shuts down the whole blockpool in the DN.
*/
- void shutdownActor(BPServiceActor actor) {
+ synchronized void shutdownActor(BPServiceActor actor) {
if (bpServiceToActive == actor) {
bpServiceToActive = null;
}
@@ -339,7 +361,7 @@ class BPOfferService {
}
@Deprecated
- InetSocketAddress getNNSocketAddress() {
+  synchronized InetSocketAddress getNNSocketAddress() {
// TODO(HA) this doesn't make sense anymore
- return bpServiceToActive.getNNSocketAddress();
+    // bpServiceToActive starts out null and is reset to null when the
+    // active NN relinquishes ACTIVE or its actor shuts down. Guard it the
+    // same way getActiveNN() does instead of throwing a NullPointerException.
+    // Returns null while no NameNode is considered ACTIVE.
+    if (bpServiceToActive == null) {
+      return null;
+    }
+    return bpServiceToActive.getNNSocketAddress();
}
@@ -383,8 +405,61 @@ class BPOfferService {
* @return a proxy to the active NN
*/
@Deprecated
- DatanodeProtocol getActiveNN() {
- return bpServiceToActive.bpNamenode;
+  synchronized DatanodeProtocol getActiveNN() {
+    // Null whenever no NameNode is considered ACTIVE (e.g. all are STANDBY).
+    return (bpServiceToActive == null) ? null : bpServiceToActive.bpNamenode;
+  }
+
+  /**
+   * Reconcile this BPOS's notion of the ACTIVE NameNode with the HA state
+   * that one actor's NameNode just reported in a heartbeat response.
+   *
+   * <ul>
+   * <li>An ACTIVE report from an NN that is not currently treated as active
+   * is accepted only if its txid is strictly greater than the last accepted
+   * ACTIVE claim. Otherwise it is logged as a possible split-brain and
+   * ignored.</li>
+   * <li>A non-ACTIVE report from the NN currently treated as active clears
+   * the active reference.</li>
+   * <li>Whenever the reporting actor ends up as the active one, its txid
+   * becomes the new {@code lastActiveClaimTxId}.</li>
+   * </ul>
+   *
+   * @param actor the actor whose NameNode sent the heartbeat response
+   * @param nnHaState the HA-related contents of that response
+   */
+  synchronized void updateActorStatesFromHeartbeat(
+      BPServiceActor actor,
+      NNHAStatusHeartbeat nnHaState) {
+    final long claimTxId = nnHaState.getTxId();
+    final boolean reportsActive =
+        NNHAStatusHeartbeat.State.ACTIVE == nnHaState.getState();
+    final boolean alreadyActive = (actor == bpServiceToActive);
+
+    if (reportsActive && !alreadyActive) {
+      LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
+          "txid=" + claimTxId);
+      if (claimTxId <= lastActiveClaimTxId) {
+        // Possible split-brain: an ACTIVE claim at a txid at least this
+        // high was already accepted, so this stale claim is ignored.
+        LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
+            claimTxId + " but there was already a more recent claim at txid=" +
+            lastActiveClaimTxId);
+        return;
+      }
+      if (bpServiceToActive != null) {
+        LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
+            bpServiceToActive + " at higher txid=" + claimTxId);
+      } else {
+        LOG.info("Acknowledging ACTIVE Namenode " + actor);
+      }
+      bpServiceToActive = actor;
+    } else if (alreadyActive && !reportsActive) {
+      LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
+          "txid=" + nnHaState.getTxId());
+      bpServiceToActive = null;
+    }
+
+    // Record the txid of the actor now considered active. This covers both
+    // a freshly accepted claim and the current active NN re-asserting ACTIVE.
+    if (actor == bpServiceToActive) {
+      assert claimTxId >= lastActiveClaimTxId;
+      lastActiveClaimTxId = claimTxId;
+    }
+  }
/**
@@ -415,7 +490,17 @@ class BPOfferService {
}
}
- boolean processCommandFromActor(DatanodeCommand cmd,
+ /**
+ * Run an immediate heartbeat from all actors. Used by tests.
+ *
+ * Every actor in {@link #bpServices} is triggered, regardless of whether
+ * its NameNode is currently considered ACTIVE. This only requests the
+ * heartbeats; it does not wait for them to be sent.
+ *
+ * @throws IOException if an actor's trigger throws
+ */
+ @VisibleForTesting
+ void triggerHeartbeatForTests() throws IOException {
+ for (BPServiceActor actor : bpServices) {
+ actor.triggerHeartbeatForTests();
+ }
+ }
+
+ synchronized boolean processCommandFromActor(DatanodeCommand cmd,
BPServiceActor actor) throws IOException {
assert bpServices.contains(actor);
if (actor == bpServiceToActive) {
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
Thu Dec 8 02:00:20 2011
@@ -284,6 +284,14 @@ class BPServiceActor implements Runnable
lastBlockReport = 0;
blockReport();
}
+
+ /**
+ * Ask this actor to send its next heartbeat right away. Used by tests.
+ *
+ * Resets {@code lastHeartbeat} to 0 and wakes any thread waiting on
+ * {@code receivedAndDeletedBlockList}. Presumably the offer-service loop
+ * then sees the heartbeat interval as elapsed; that interval check is not
+ * shown here, so confirm against the loop. The method returns without
+ * waiting for the heartbeat to be sent.
+ */
+ @VisibleForTesting
+ void triggerHeartbeatForTests() throws IOException {
+ synchronized (receivedAndDeletedBlockList) {
+ lastHeartbeat = 0;
+ receivedAndDeletedBlockList.notifyAll();
+ }
+ }
/**
* Report the list blocks to the Namenode
@@ -420,8 +428,18 @@ class BPServiceActor implements Runnable
lastHeartbeat = startTime;
if (!dn.areHeartbeatsDisabledForTests()) {
HeartbeatResponse resp = sendHeartBeat();
+ assert resp != null;
dn.getMetrics().addHeartbeat(now() - startTime);
+ // If the state of this NN has changed (eg STANDBY->ACTIVE)
+ // then let the BPOfferService update itself.
+ //
+ // Important that this happens before processCommand below,
+ // since the first heartbeat to a new active might have commands
+ // that we should actually process.
+ bpos.updateActorStatesFromHeartbeat(
+ this, resp.getNameNodeHaState());
+
long startProcessCommands = now();
if (!processCommand(resp.getCommands()))
continue;
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
Thu Dec 8 02:00:20 2011
@@ -150,11 +150,16 @@ import org.apache.hadoop.hdfs.server.nam
import
org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReportMessage;
import
org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.CommitBlockSynchronizationMessage;
import
org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.DataNodeMessage;
+import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
+import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -308,6 +313,12 @@ public class FSNamesystem implements Nam
* Used when this NN is in standby state to read from the shared edit log.
*/
private EditLogTailer editLogTailer = null;
+
+ /**
+ * Reference to the NN's HAContext object. This is only set once
+ * {@link #startCommonServices(Configuration, HAContext)} is called.
+ */
+ private HAContext haContext;
PendingDataNodeMessages getPendingDataNodeMessages() {
return pendingDatanodeMessages;
@@ -434,11 +445,13 @@ public class FSNamesystem implements Nam
/**
* Start services common to both active and standby states
+ * @param haContext
* @throws IOException
*/
- void startCommonServices(Configuration conf) throws IOException {
+ void startCommonServices(Configuration conf, HAContext haContext) throws
IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState
writeLock();
+ this.haContext = haContext;
try {
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
@@ -2706,12 +2719,28 @@ public class FSNamesystem implements Nam
cmds = new DatanodeCommand[] {cmd};
}
}
- return new HeartbeatResponse(cmds);
+
+ return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
} finally {
readUnlock();
}
}
+ private NNHAStatusHeartbeat createHaStatusHeartbeat() {
+ HAState state = haContext.getState();
+ NNHAStatusHeartbeat.State hbState;
+ if (state instanceof ActiveState) {
+ hbState = NNHAStatusHeartbeat.State.ACTIVE;
+ } else if (state instanceof StandbyState) {
+ hbState = NNHAStatusHeartbeat.State.STANDBY;
+ } else {
+ throw new AssertionError("Invalid state: " + state.getClass());
+ }
+ return new NNHAStatusHeartbeat(hbState,
+ Math.max(getFSImage().getLastAppliedTxId(),
+ getFSImage().getEditLog().getLastWrittenTxId()));
+ }
+
/**
* Returns whether or not there were available resources at the last check of
* resources.
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
Thu Dec 8 02:00:20 2011
@@ -426,7 +426,7 @@ public class NameNode {
/** Start the services common to active and standby states */
private void startCommonServices(Configuration conf) throws IOException {
- namesystem.startCommonServices(conf);
+ namesystem.startCommonServices(conf, haContext);
startHttpServer(conf);
rpcServer.start();
plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
Thu Dec 8 02:00:20 2011
@@ -35,17 +35,26 @@ public class HeartbeatResponse implement
/** Commands returned from the namenode to the datanode */
private DatanodeCommand[] commands;
+ /** Information about the current HA-related state of the NN */
+ private NNHAStatusHeartbeat haStatus;
+
public HeartbeatResponse() {
// Empty constructor required for Writable
}
- public HeartbeatResponse(DatanodeCommand[] cmds) {
+ /**
+ * @param cmds commands returned from the namenode to the datanode
+ * @param haStatus HA-related state of the NN sending this response.
+ *        write() serializes it unconditionally, so it must be non-null
+ *        for any response that will be written out.
+ */
+ public HeartbeatResponse(DatanodeCommand[] cmds,
+ NNHAStatusHeartbeat haStatus) {
commands = cmds;
+ this.haStatus = haStatus;
}
public DatanodeCommand[] getCommands() {
return commands;
}
+
+ /** @return the HA-related state the NN reported in this response */
+ public NNHAStatusHeartbeat getNameNodeHaState() {
+ return haStatus;
+ }
///////////////////////////////////////////
// Writable
@@ -58,6 +67,7 @@ public class HeartbeatResponse implement
ObjectWritable.writeObject(out, commands[i], commands[i].getClass(),
null, true);
}
+ haStatus.write(out);
}
@Override
@@ -69,5 +79,7 @@ public class HeartbeatResponse implement
commands[i] = (DatanodeCommand) ObjectWritable.readObject(in,
objectWritable, null);
}
+ haStatus = new NNHAStatusHeartbeat();
+ haStatus.readFields(in);
}
}
Added:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java?rev=1211735&view=auto
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java
(added)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java
Thu Dec 8 02:00:20 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * HA-related status that a NameNode attaches to each heartbeat response:
+ * its current {@link State} plus a transaction ID. DataNodes compare the
+ * transaction IDs of competing ACTIVE claims to decide which NameNode to
+ * treat as active.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class NNHAStatusHeartbeat implements Writable {
+
+  /** HA states a NameNode can report in a heartbeat. */
+  @InterfaceAudience.Private
+  public enum State {
+    ACTIVE,
+    STANDBY;
+  }
+
+  private State state;
+  private long txid = HdfsConstants.INVALID_TXID;
+
+  /** No-arg constructor, required for Writable deserialization. */
+  public NNHAStatusHeartbeat() {
+  }
+
+  /**
+   * @param state the HA state being reported
+   * @param txid the transaction ID reported alongside that state
+   */
+  public NNHAStatusHeartbeat(State state, long txid) {
+    this.state = state;
+    this.txid = txid;
+  }
+
+  /** @return the reported HA state */
+  public State getState() {
+    return this.state;
+  }
+
+  /** @return the reported transaction ID */
+  public long getTxId() {
+    return this.txid;
+  }
+
+  // Writable encoding: the enum (via WritableUtils), then the txid as a long.
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeEnum(out, this.state);
+    out.writeLong(this.txid);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.state = WritableUtils.readEnum(in, State.class);
+    this.txid = in.readLong();
+  }
+}
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java
Thu Dec 8 02:00:20 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.Writable;
@InterfaceStability.Evolving
public class HeartbeatResponseWritable implements Writable {
private DatanodeCommandWritable[] commands;
+ private NNHAStatusHeartbeatWritable haStatus;
public HeartbeatResponseWritable() {
// Empty constructor for Writable
@@ -41,7 +42,8 @@ public class HeartbeatResponseWritable i
}
public HeartbeatResponse convert() {
- return new HeartbeatResponse(DatanodeCommandWritable.convert(commands));
+ // Translate both the commands and the NN's HA status into the
+ // server-side protocol types.
+ return new HeartbeatResponse(DatanodeCommandWritable.convert(commands),
+ NNHAStatusHeartbeatWritable.convert(haStatus));
}
///////////////////////////////////////////
@@ -55,6 +57,7 @@ public class HeartbeatResponseWritable i
ObjectWritable.writeObject(out, commands[i], commands[i].getClass(),
null, true);
}
+ haStatus.write(out);
}
@Override
@@ -66,6 +69,8 @@ public class HeartbeatResponseWritable i
commands[i] = (DatanodeCommandWritable) ObjectWritable.readObject(in,
objectWritable, null);
}
+ haStatus = new NNHAStatusHeartbeatWritable();
+ haStatus.readFields(in);
}
public static HeartbeatResponseWritable convert(
@@ -73,4 +78,4 @@ public class HeartbeatResponseWritable i
- return new HeartbeatResponseWritable(DatanodeCommandWritable.convert(resp
- .getCommands()));
+    HeartbeatResponseWritable result = new HeartbeatResponseWritable(
+        DatanodeCommandWritable.convert(resp.getCommands()));
+    // write() serializes haStatus unconditionally. It must be populated
+    // here, or sending the response in the R23 wire format throws an NPE.
+    result.haStatus = new NNHAStatusHeartbeatWritable(
+        resp.getNameNodeHaState().getState(),
+        resp.getNameNodeHaState().getTxId());
+    return result;
}
-}
\ No newline at end of file
+}
Added:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java?rev=1211735&view=auto
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java
(added)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java
Thu Dec 8 02:00:20 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocolR23Compatible;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * R23-compatible wire form of {@link NNHAStatusHeartbeat}, carried inside
+ * {@link HeartbeatResponseWritable}. It is serialized as the {@link State}
+ * enum (via {@link WritableUtils#writeEnum}) followed by the txid as a long.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class NNHAStatusHeartbeatWritable implements Writable {
+
+  private State state;
+  private long txid = HdfsConstants.INVALID_TXID;
+
+  /** No-arg constructor, required for Writable deserialization. */
+  public NNHAStatusHeartbeatWritable() {
+  }
+
+  /**
+   * @param state the HA state being reported
+   * @param txid the transaction ID reported alongside that state
+   */
+  public NNHAStatusHeartbeatWritable(State state, long txid) {
+    this.state = state;
+    this.txid = txid;
+  }
+
+  /** @return the reported HA state */
+  public State getState() {
+    return state;
+  }
+
+  /** @return the reported transaction ID */
+  public long getTxId() {
+    return txid;
+  }
+
+  ///////////////////////////////////////////
+  // Writable
+  ///////////////////////////////////////////
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeEnum(out, state);
+    out.writeLong(txid);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    state = WritableUtils.readEnum(in, State.class);
+    txid = in.readLong();
+  }
+
+  /**
+   * Convert the wire form into the server-side protocol type.
+   *
+   * @param haStatus the wire-form status; may be null
+   * @return the equivalent {@link NNHAStatusHeartbeat}, or null if
+   *         {@code haStatus} is null
+   */
+  public static NNHAStatusHeartbeat convert(
+      NNHAStatusHeartbeatWritable haStatus) {
+    if (haStatus == null) {
+      return null;
+    }
+    return new NNHAStatusHeartbeat(haStatus.getState(), haStatus.getTxId());
+  }
+
+  /**
+   * Convert the server-side protocol type into its wire form. This is the
+   * inverse of {@link #convert(NNHAStatusHeartbeatWritable)}.
+   *
+   * @param haStatus the server-side status; may be null
+   * @return the equivalent wire-form status, or null if {@code haStatus}
+   *         is null
+   */
+  public static NNHAStatusHeartbeatWritable convert(
+      NNHAStatusHeartbeat haStatus) {
+    if (haStatus == null) {
+      return null;
+    }
+    return new NNHAStatusHeartbeatWritable(haStatus.getState(),
+        haStatus.getTxId());
+  }
+}
\ No newline at end of file
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java?rev=1211735&r1=1211734&r2=1211735&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
Thu Dec 8 02:00:20 2011
@@ -21,6 +21,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Arrays;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -32,9 +33,12 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.test.GenericTestUtils;
@@ -43,6 +47,8 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
@@ -63,13 +69,15 @@ public class TestBPOfferService {
private DatanodeProtocol mockNN1;
private DatanodeProtocol mockNN2;
+ private NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
+ private int heartbeatCounts[] = new int[2];
private DataNode mockDn;
private FSDatasetInterface mockFSDataset;
@Before
public void setupMocks() throws Exception {
- mockNN1 = setupNNMock();
- mockNN2 = setupNNMock();
+ mockNN1 = setupNNMock(0);
+ mockNN2 = setupNNMock(1);
// Set up a mock DN with the bare-bones configuration
// objects, etc.
@@ -92,14 +100,17 @@ public class TestBPOfferService {
/**
* Set up a mock NN with the bare minimum for a DN to register to it.
*/
- private DatanodeProtocol setupNNMock() throws Exception {
+ private DatanodeProtocol setupNNMock(int nnIdx) throws Exception {
DatanodeProtocol mock = Mockito.mock(DatanodeProtocol.class);
Mockito.doReturn(
new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID,
0, HdfsConstants.LAYOUT_VERSION))
.when(mock).versionRequest();
- Mockito.doReturn(new HeartbeatResponse(null))
+ Mockito.doReturn(new DatanodeRegistration("fake-node"))
+ .when(mock).registerDatanode(Mockito.any(DatanodeRegistration.class));
+
+ Mockito.doAnswer(new HeartbeatAnswer(nnIdx))
.when(mock).sendHeartbeat(
Mockito.any(DatanodeRegistration.class),
Mockito.anyLong(),
@@ -109,11 +120,32 @@ public class TestBPOfferService {
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt());
-
+ mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(State.STANDBY, 0);
return mock;
}
/**
+ * Mock answer for heartbeats which returns an empty set of commands
+ * and the HA status for the chosen NN from the
+ * {@link TestBPOfferService#mockHaStatuses} array.
+ */
+ private class HeartbeatAnswer implements Answer<HeartbeatResponse> {
+ private final int nnIdx;
+
+ public HeartbeatAnswer(int nnIdx) {
+ this.nnIdx = nnIdx;
+ }
+
+ @Override
+ public HeartbeatResponse answer(InvocationOnMock invocation) throws
Throwable {
+ heartbeatCounts[nnIdx]++;
+ return new HeartbeatResponse(new DatanodeCommand[0],
+ mockHaStatuses[nnIdx]);
+ }
+ }
+
+
+ /**
* Test that the BPOS can register to talk to two different NNs,
* sends block reports to both, etc.
*/
@@ -204,6 +236,53 @@ public class TestBPOfferService {
bpos.stop();
}
}
+
+ /**
+ * Test that the DataNode determines the active NameNode correctly
+ * based on the HA-related information in heartbeat responses.
+ * See HDFS-2627.
+ *
+ * Both mock NNs begin as STANDBY at txid 0 (see setupNNMock). Each step
+ * changes one NN's reported status, then waits for a fresh heartbeat from
+ * both actors (waitForHeartbeats) before checking getActiveNN().
+ */
+ @Test
+ public void testPickActiveNameNode() throws Exception {
+ BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+ bpos.start();
+ try {
+ waitForInitialization(bpos);
+
+ // Should start with neither NN as active.
+ assertNull(bpos.getActiveNN());
+
+ // Have NN1 claim active at txid 1
+ mockHaStatuses[0] = new NNHAStatusHeartbeat(State.ACTIVE, 1);
+ waitForHeartbeats(bpos);
+ assertSame(mockNN1, bpos.getActiveNN());
+
+ // NN2 claims active at a higher txid
+ mockHaStatuses[1] = new NNHAStatusHeartbeat(State.ACTIVE, 2);
+ waitForHeartbeats(bpos);
+ assertSame(mockNN2, bpos.getActiveNN());
+
+ // Even after another heartbeat from the first NN, it should
+ // think NN2 is active, since it claimed a higher txid
+ waitForHeartbeats(bpos);
+ assertSame(mockNN2, bpos.getActiveNN());
+
+ // Even if NN2 goes to standby, DN shouldn't reset to talking to NN1,
+ // because NN1's txid is lower than the last active txid. Instead,
+ // it should consider neither active.
+ mockHaStatuses[1] = new NNHAStatusHeartbeat(State.STANDBY, 2);
+ waitForHeartbeats(bpos);
+ assertNull(bpos.getActiveNN());
+
+ // Now if NN1 goes back to a higher txid, it should be considered active
+ // (3 exceeds 2, the txid of the last accepted ACTIVE claim).
+ mockHaStatuses[0] = new NNHAStatusHeartbeat(State.ACTIVE, 3);
+ waitForHeartbeats(bpos);
+ assertSame(mockNN1, bpos.getActiveNN());
+
+ } finally {
+ bpos.stop();
+ }
+ }
private void waitForOneToFail(final BPOfferService bpos)
throws Exception {
@@ -269,6 +348,30 @@ public class TestBPOfferService {
}, 500, 10000);
}
+ private void waitForHeartbeats(BPOfferService bpos)
+ throws Exception {
+ final int countAtStart[];
+ synchronized (heartbeatCounts) {
+ countAtStart = Arrays.copyOf(
+ heartbeatCounts, heartbeatCounts.length);
+ }
+ bpos.triggerHeartbeatForTests();
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ synchronized (heartbeatCounts) {
+ for (int i = 0; i < countAtStart.length; i++) {
+ if (heartbeatCounts[i] <= countAtStart[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+ }, 200, 10000);
+ }
+
+
private ReceivedDeletedBlockInfo[] waitForBlockReceived(
ExtendedBlock fakeBlock,
DatanodeProtocol mockNN) throws Exception {