Author: suresh
Date: Sun May 6 21:45:35 2012
New Revision: 1334792
URL: http://svn.apache.org/viewvc?rev=1334792&view=rev
Log:
HDFS-3186. Add ability to journal service to sync journals from another active
journal service. Contributed by Brandon Li.
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
Sun May 6 21:45:35 2012
@@ -26,6 +26,9 @@ HDFS-3092 branch changes
HDFS-3313. Create a protocol for journal service synchronziation. (Brandon
Li via szetszwo)
+ HDFS-3186. Add ability to journal service to sync journals from another
+ active journal service. (Brandon Li via suresh)
+
IMPROVEMENTS
OPTIMIZATIONS
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
Sun May 6 21:45:35 2012
@@ -204,6 +204,7 @@ public class DFSConfigKeys extends Commo
// This is a comma separated host:port list of addresses hosting the journal
service
public static final String DFS_JOURNAL_ADDRESS_KEY =
"dfs.journal.addresses";
public static final String DFS_JOURNAL_EDITS_DIR_KEY =
"dfs.journal.edits.dir";
+ public static final String DFS_JOURNAL_HTTP_ADDRESS_KEY =
"dfs.journal.http-addresses";
public static final String DFS_JOURNAL_HTTPS_PORT_KEY =
"dfs.journal.https-port";
public static final int DFS_JOURNAL_HTTPS_PORT_DEFAULT = 50510;
public static final String DFS_JOURNAL_KRB_HTTPS_USER_NAME_KEY =
"dfs.journal.kerberos.https.principal";
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
Sun May 6 21:45:35 2012
@@ -1025,9 +1025,21 @@ public class DFSUtil {
}
}
+ private static Collection<InetSocketAddress> getAddresses(Configuration conf,
+ String addrKey) {
+ Collection<String> addresses = conf.getTrimmedStringCollection(addrKey);
+ Collection<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
+ for (String address : emptyAsSingletonNull(addresses)) {
+ if (address == null) {
+ continue;
+ }
+ ret.add(NetUtils.createSocketAddr(address));
+ }
+ return ret;
+ }
/**
- * Returns list of InetSocketAddresses of journalnodes from the
+ * Returns list of RPC server InetSocketAddresses of journal services from
the
* configuration.
*
* @param conf configuration
@@ -1035,15 +1047,38 @@ public class DFSUtil {
*/
public static Collection<InetSocketAddress> getJournalNodeAddresses(
Configuration conf) {
- Collection<String> jnames = conf
- .getTrimmedStringCollection(DFS_JOURNAL_ADDRESS_KEY);
- Collection<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
- for (String jname : emptyAsSingletonNull(jnames)) {
- if (jname == null) {
- continue;
+ return getAddresses(conf, DFS_JOURNAL_ADDRESS_KEY);
+ }
+
+ /**
+ * Returns list of http InetSocketAddresses of journal services from the
+ * configuration.
+ *
+ * @param conf configuration
+ * @return list of http InetSocketAddresses
+ */
+ public static Collection<InetSocketAddress> getJournalNodeHttpAddresses(
+ Configuration conf) {
+ return getAddresses(conf, DFS_JOURNAL_HTTP_ADDRESS_KEY);
+ }
+
+ /**
+ * Returns corresponding rpc address with the hostname running journal
+ * service.
+ *
+ * @param conf configuration
+ * @param hostname the hostname in the http address
+ * @return rpc address of the journal service
+ */
+ public static InetSocketAddress getJournalRpcAddrFromHostName(
+ Configuration conf, String hostname) {
+ Collection<InetSocketAddress> jRpcAddr = DFSUtil
+ .getJournalNodeAddresses(conf);
+ for (InetSocketAddress addr : jRpcAddr) {
+ if (addr != null && addr.getHostName().equals(hostname)) {
+ return addr;
}
- ret.add(NetUtils.createSocketAddr(jname));
}
- return ret;
+ return null;
}
}
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
Sun May 6 21:45:35 2012
@@ -117,7 +117,7 @@ class Journal {
return image.getEditLog();
}
- RemoteEditLogManifest getRemoteEditLogs(long sinceTxId) throws IOException {
+ RemoteEditLogManifest getEditLogManifest(long sinceTxId) throws IOException {
return image.getEditLog().getEditLogManifest(sinceTxId);
}
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java
Sun May 6 21:45:35 2012
@@ -63,7 +63,7 @@ public class JournalHttpServer {
private final Configuration conf;
JournalHttpServer(Configuration conf, Journal journal,
- InetSocketAddress bindAddress) throws Exception {
+ InetSocketAddress bindAddress) {
this.conf = conf;
this.localJournal = journal;
this.httpAddress = bindAddress;
@@ -125,12 +125,20 @@ public class JournalHttpServer {
+ " and https port is: " + httpsPort);
}
- void stop() throws Exception {
+ void stop() throws IOException {
if (httpServer != null) {
- httpServer.stop();
+ try {
+ httpServer.stop();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
}
+ InetSocketAddress getHttpAddress() {
+ return httpAddress;
+ }
+
public static Journal getJournalFromContext(ServletContext context) {
return (Journal) context.getAttribute(JOURNAL_ATTRIBUTE_KEY);
}
@@ -145,7 +153,7 @@ public class JournalHttpServer {
* @return true if a new image has been downloaded and needs to be loaded
* @throws IOException
*/
- public boolean downloadEditFiles(final String jnHostPort,
+ boolean downloadEditFiles(final String jnHostPort,
final RemoteEditLogManifest manifest) throws IOException {
// Sanity check manifest
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
Sun May 6 21:45:35 2012
@@ -17,12 +17,15 @@
*/
package org.apache.hadoop.hdfs.server.journalservice;
+import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@@ -34,9 +37,12 @@ import org.apache.hadoop.hdfs.protocolPB
import
org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.FencedException;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.JournalServiceProtocols;
import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -47,6 +53,7 @@ import org.apache.hadoop.ipc.ProtobufRpc
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Daemon;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
@@ -68,14 +75,20 @@ import com.google.protobuf.BlockingServi
*/
public class JournalService implements JournalServiceProtocols {
public static final Log LOG =
LogFactory.getLog(JournalService.class.getName());
-
private final InetSocketAddress nnAddress;
private NamenodeRegistration registration;
private final NamenodeProtocol namenode;
private final StateHandler stateHandler = new StateHandler();
private final RPC.Server rpcServer;
+ private final JournalHttpServer httpServer;
private long epoch = 0;
private String fencerInfo;
+ private Daemon syncThread = null;
+ private Configuration conf;
+
+ // Flags to indicate whether to start sync
+ private boolean toStartSync = false;
+ private long syncSinceTxid = -1;
private final Journal journal;
private final JournalListener listener;
@@ -128,12 +141,35 @@ public class JournalService implements J
current = State.WAITING_FOR_ROLL;
}
- synchronized void startLogSegment() {
+ synchronized State startLogSegment() {
+ State prevState = current;
if (current == State.WAITING_FOR_ROLL) {
current = State.SYNCING;
}
+ return prevState;
}
+
+ /**
+ * Try to transit to IN_SYNC state
+ * @return current state. if returned state is not IN_SYNC, caller should
+ * know inSync failed
+ */
+ synchronized State inSync() {
+ if (current == State.IN_SYNC) {
+ throw new IllegalStateException("Service cannot be in " + current
+ + " state.");
+ }
+ if (current == State.SYNCING) {
+ current = State.IN_SYNC;
+ }
+ return current;
+ }
+
+ synchronized void fence() {
+ current = State.WAITING_FOR_ROLL;
+ }
+
synchronized void isStartLogSegmentAllowed() throws IOException {
if (!current.isStartLogSegmentAllowed) {
throw new IOException("Cannot start log segment in " + current
@@ -168,18 +204,21 @@ public class JournalService implements J
* {@code server} is a valid server that is managed out side this
* service.
* @param listener call-back interface to listen to journal activities
+ * @param journal the journal used by both Listener and JournalService
* @throws IOException on error
*/
JournalService(Configuration conf, InetSocketAddress nnAddr,
- InetSocketAddress serverAddress, JournalListener listener)
- throws IOException {
+ InetSocketAddress serverAddress, InetSocketAddress httpAddress,
+ JournalListener listener, Journal journal) throws IOException {
this.nnAddress = nnAddr;
this.listener = listener;
- this.journal = new Journal(conf);
+ this.journal = journal;
this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
.getProxy();
this.rpcServer = createRpcServer(conf, serverAddress, this);
+ this.httpServer = new JournalHttpServer(conf, journal, httpAddress);
+ this.conf = conf;
}
Journal getJournal() {
@@ -201,13 +240,18 @@ public class JournalService implements J
/**
* Start the service.
+ * @throws IOException on error
*/
- public void start() {
+ public void start() throws IOException {
stateHandler.start();
// Start the RPC server
LOG.info("Starting journal service rpc server");
rpcServer.start();
+
+ // Start the HTTP server
+ LOG.info("Starting journal service http server");
+ httpServer.start();
for (boolean registered = false, handshakeComplete = false;;) {
try {
@@ -239,6 +283,14 @@ public class JournalService implements J
}
stateHandler.waitForRoll();
+
+ // Create a never ending daemon to sync journal segments
+ // TODO: remove the assumption that "won't delete logs"
+ // use 3 because NN rolls with txid=3 when first journal service joining.
+ // need to fix this after NN is modified to ignore its local storage dir
+ syncThread = new Daemon(new JournalSync(this));
+ syncThread.start();
+
try {
namenode.rollEditLog();
} catch (IOException e) {
@@ -249,9 +301,12 @@ public class JournalService implements J
/**
* Stop the service. For application with RPC Server managed outside, the
* RPC Server must be stopped the application.
+ * @throws IOException on error
*/
public void stop() throws IOException {
if (!stateHandler.isStopped()) {
+ syncThread.interrupt();
+ httpServer.stop();
rpcServer.stop();
journal.close();
}
@@ -277,7 +332,11 @@ public class JournalService implements J
stateHandler.isStartLogSegmentAllowed();
verify(epoch, journalInfo);
listener.startLogSegment(this, txid);
- stateHandler.startLogSegment();
+
+ if (stateHandler.startLogSegment() == State.WAITING_FOR_ROLL) {
+ LOG.info("Notify syncThread to re-sync with txid:" + syncSinceTxid);
+ startSync(syncSinceTxid);
+ }
}
@Override
@@ -294,7 +353,8 @@ public class JournalService implements J
long previousEpoch = epoch;
this.epoch = epoch;
this.fencerInfo = fencerInfo;
-
+ stateHandler.fence();
+
// TODO:HDFS-3092 set lastTransId and inSync
return new FenceResponse(previousEpoch, 0, false);
}
@@ -310,8 +370,8 @@ public class JournalService implements J
}
verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
- //journal has only one storage directory
- return journal.getRemoteEditLogs(sinceTxId);
+ // Journal has only one storage directory
+ return journal.getEditLogManifest(sinceTxId);
}
/** Create an RPC server. */
@@ -435,4 +495,150 @@ public class JournalService implements J
return new JournalSyncProtocolTranslatorPB((JournalSyncProtocolPB) proxy);
}
+
+ /**
+ * Only invoked by sync thread to wait for {@code #syncSinceTxid} to be set
to
+ * start syncing.
+ *
+ * @return txid to start syncing from
+ * @throws InterruptedException
+ */
+ synchronized long waitForStartSync() throws InterruptedException {
+ while (!toStartSync) {
+ wait();
+ }
+ // Sync starting - Unset toStartSync so main thread can set it again
+ toStartSync = false;
+ return syncSinceTxid;
+ }
+
+ /**
+ * Only invoked by main thread to notify sync thread that another round of
+ * sync is needed
+ */
+ synchronized void startSync(long sinceTxid) {
+ if (toStartSync) {
+ LOG.trace("toStartSync is already set.");
+ return;
+ }
+ toStartSync = true;
+ syncSinceTxid = sinceTxid;
+ notify();
+ }
+
+ /**
+ * JournalSync downloads journal segments from other journal services
+ */
+ class JournalSync implements Runnable {
+ private final JournalInfo journalInfo;
+ private final JournalService journalService;
+ private long sinceTxid;
+
+ /**
+ * Constructor
+ * @param journalService Local journal service
+ */
+ JournalSync(JournalService journalService) {
+ NNStorage storage = journalService.getJournal().getStorage();
+ this.journalInfo = new JournalInfo(storage.layoutVersion,
+ storage.clusterID, storage.namespaceID);
+ this.sinceTxid = 0;
+ this.journalService = journalService;
+ }
+
+ public void run() {
+ while (true) {
+ try {
+ sinceTxid = journalService.waitForStartSync();
+ syncAllJournalSegments(conf, journalInfo, sinceTxid);
+ } catch (IOException e) {
+ LOG.error("Unable to sync for "
+ + journalService.getRegistration().getHttpAddress()
+ + " with exception: " + e);
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException e1) {
+ break;
+ }
+ } catch (InterruptedException ie) {
+ break;
+ }
+ }
+ LOG.info("Stopping the JournalSync thread");
+ }
+
+ public String toString() {
+ return "JournalSync for "
+ + journalService.getRegistration().getHttpAddress();
+ }
+
+ /**
+ * Contact journal service one by one to get all missed journal segments
+ *
+ * @param conf Configuration
+ * @param journalInfo the JournalInfo of the local journal service
+ * @param sinceTxid the transaction id to start with
+ * @throws IOException
+ */
+ void syncAllJournalSegments(Configuration conf, JournalInfo journalInfo,
+ long sinceTxid) throws IOException {
+
+ // Get a list of configured journal services
+ Collection<InetSocketAddress> addrList = DFSUtil
+ .getJournalNodeHttpAddresses(conf);
+ FSEditLog editLog = journal.getEditLog();
+ File currentDir = new File(
+ conf.get(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY));
+
+ boolean needSync = !editLog.hasCompleteJournalSegments(sinceTxid,
+ currentDir);
+ if (!needSync) {
+ LOG.trace("Nothing to sync.");
+ return;
+ }
+
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ InetSocketAddress selfAddr = journalService.httpServer.getHttpAddress();
+ for (InetSocketAddress addr : addrList) {
+ try {
+ // Skip itself
+ if (addr.getHostName().equals(selfAddr.getHostName())
+ && addr.getPort() == selfAddr.getPort()) {
+ continue;
+ }
+ // Download journal segments
+ InetSocketAddress rpcAddr = DFSUtil.getJournalRpcAddrFromHostName(
+ conf, addr.getHostName());
+ JournalSyncProtocol syncProxy = createProxyWithJournalSyncProtocol(
+ rpcAddr, conf, ugi);
+ RemoteEditLogManifest manifest = syncProxy.getEditLogManifest(
+ journalInfo, sinceTxid);
+ httpServer.downloadEditFiles(NetUtils.getHostPortString(addr),
+ manifest);
+ } catch (IOException e) {
+ LOG.debug("Sync failed for " + selfAddr + " with exception ", e);
+ // Ignore error and try the next journal service
+ }
+
+ if (editLog.hasCompleteJournalSegments(sinceTxid, currentDir)) {
+ needSync = false;
+ break;
+ }
+ }
+
+ if (needSync) {
+ throw new IOException("Journal sync failed.");
+ }
+
+ // Journal service may not be in SYNCING state
+ State jState = stateHandler.inSync();
+ if (jState != State.IN_SYNC) {
+ LOG.debug("Journal service state changed during syncing : " + jState);
+ } else {
+ LOG.debug("Journal sync is done.");
+ // TODO: report IN_SYNC state to NN. Note that, it's ok if state
changes
+ // to another state because NN could reject the IN_SYNC report
+ }
+ }
+ }
}
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
Sun May 6 21:45:35 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
import static org.apache.hadoop.hdfs.server.common.Util.now;
+import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.URI;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -437,6 +439,59 @@ public class FSEditLog {
}
/**
+ * Check if current node has complete set of journal segments from sinceTxid
+ * to (curSegmentTxId-1)
+ * @param sinceTxid the transaction to start with
+ * @param currentDir the storage directory to find journal segments
+ * @return true if journal segments are complete, otherwise false
+ * @throws IOException
+ */
+ public synchronized boolean hasCompleteJournalSegments(long sinceTxid,
+ File currentDir) throws IOException {
+ List<RemoteEditLog> segments = getFinalizedSegments(sinceTxid, currentDir);
+ long curSegTxid = getCurSegmentTxId();
+ return hasCompleteJournalSegments(segments, sinceTxid, curSegTxid);
+ }
+
+ public static boolean hasCompleteJournalSegments(
+ List<RemoteEditLog> segments, long sinceTxid, long curSegTxid)
+ throws IOException {
+ long minTxid = -1, maxTxid = -1;
+
+ if (sinceTxid > curSegTxid) {
+ throw new RuntimeException("illegal input: sinceTxid=" + sinceTxid
+ + " curSegTxid= " + curSegTxid);
+ }
+ if (segments.size() == 0) {
+ return false;
+ }
+
+ for (RemoteEditLog log : segments) {
+ if (log.getStartTxId() > curSegTxid || log.getEndTxId() > curSegTxid) {
+ throw new RuntimeException("Invalid log: [" + log.getStartTxId() + ","
+ + log.getEndTxId() + "] and curSegTxid=" + curSegTxid);
+ }
+
+ if (minTxid == -1 || maxTxid == -1) {
+ minTxid = log.getStartTxId();
+ maxTxid = log.getEndTxId();
+ assert (minTxid > 0 && maxTxid > 0 && maxTxid >= minTxid);
+ } else {
+ // check gap in the middle
+ if (maxTxid != log.getStartTxId() - 1) {
+ return false;
+ }
+ maxTxid = log.getEndTxId();
+ }
+ }
+ // check gap at ends
+ if (minTxid > sinceTxid || maxTxid < curSegTxid - 1) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
* Set the transaction ID to use for the next transaction written.
*/
synchronized void setNextTxId(long nextTxId) {
@@ -860,7 +915,15 @@ public class FSEditLog {
throws IOException {
return journalSet.getEditLogManifest(fromTxId);
}
-
+
+ /**
+ * Return a list of what finalized edit logs are available
+ */
+ public synchronized List<RemoteEditLog> getFinalizedSegments(long fromTxId,
+ File currentDir) throws IOException {
+ return journalSet.getFinalizedSegments(fromTxId, currentDir);
+ }
+
/**
* Finalizes the current edit log and opens a new log segment.
* @return the transaction id of the BEGIN_LOG_SEGMENT transaction
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
Sun May 6 21:45:35 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -618,6 +619,39 @@ public class JournalSet implements Journ
}
/**
+ * Returns a list of finalized edit logs that are available in given
+ * storageDir {@code currentDir}. All available edit logs are returned
+ * starting from the transaction id {@code fromTxId}. Note that the list may
+ * have gaps in these finalized edit logs.
+ *
+ * @param fromTxId Starting transaction id to read the logs.
+ * @param currentDir The storage directory to find the logs.
+ * @return a list of finalized segments.
+ */
+ public synchronized List<RemoteEditLog> getFinalizedSegments(long fromTxId,
+ File currentDir) {
+ List<RemoteEditLog> allLogs = null;
+ for (JournalAndStream j : journals) {
+ if (j.getManager() instanceof FileJournalManager) {
+ FileJournalManager fjm = (FileJournalManager) j.getManager();
+ if (fjm.getStorageDirectory().getRoot().equals(currentDir)) {
+ try {
+ allLogs = fjm.getRemoteEditLogs(fromTxId);
+ break;
+ } catch (IOException t) {
+ LOG.warn("Cannot list edit logs in " + fjm, t);
+ }
+ }
+ }
+ }
+ // sort collected segments
+ if (allLogs != null && allLogs.size() > 0) {
+ Collections.sort(allLogs);
+ }
+ return allLogs;
+ }
+
+ /**
* Add sync times to the buffer.
*/
String getSyncTimes() {
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
Sun May 6 21:45:35 2012
@@ -587,5 +587,16 @@ public class TestDFSUtil {
for (InetSocketAddress addr: ret) {
assertTrue(addr.equals(isa1) || addr.equals(isa2));
}
+
+ // Test Http addresses
+ InetSocketAddress isa3 = NetUtils.createSocketAddr("localhost:50100");
+ InetSocketAddress isa4 = NetUtils.createSocketAddr("localhost:50110");
+ conf.set(DFS_JOURNAL_HTTP_ADDRESS_KEY, "localhost:50100, localhost:50110");
+ ret = DFSUtil.getJournalNodeHttpAddresses(conf);
+ assertEquals(ret.size(), 2);
+
+ for (InetSocketAddress addr: ret) {
+ assertTrue(addr.equals(isa3) || addr.equals(isa4));
+ }
}
}
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
Sun May 6 21:45:35 2012
@@ -22,6 +22,7 @@ import java.io.File;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -35,6 +36,9 @@ public class TestJournal {
static Configuration newConf(String name) {
Configuration conf = new HdfsConfiguration();
File dir = new File(MiniDFSCluster.getBaseDirectory(), name + "-edits");
+ if (dir.exists()) {
+ Assert.assertTrue(FileUtil.fullyDelete(dir));
+ }
Assert.assertTrue(dir.mkdirs());
conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, dir.toURI().toString());
return conf;
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java
Sun May 6 21:45:35 2012
@@ -20,9 +20,15 @@ package org.apache.hadoop.hdfs.server.jo
import static org.junit.Assert.*;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,20 +39,17 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.journalservice.JournalHttpServer;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
-import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
public class TestJournalHttpServer {
public static final Log LOG = LogFactory.getLog(TestJournalHttpServer.class);
@@ -85,7 +88,7 @@ public class TestJournalHttpServer {
}
/**
- * Test Journal service Http Server
+ * Test Journal service Http Server by verifying the html page is accessible
*
* @throws Exception
*/
@@ -116,79 +119,113 @@ public class TestJournalHttpServer {
cluster.shutdown();
}
}
+
+ /**
+ * Test hasCompleteJournalSegments with different log list combinations
+ *
+ * @throws Exception
+ */
+ /**
+ * Exercise FSEditLog.hasCompleteJournalSegments with a gap-free segment
+ * list and with a list that has a hole in the middle.
+ *
+ * @throws Exception on test failure
+ */
+ @Test
+ public void testHasCompleteJournalSegments() throws Exception {
+ List<RemoteEditLog> segments = new ArrayList<RemoteEditLog>();
+ segments.add(new RemoteEditLog(3, 6));
+ segments.add(new RemoteEditLog(7, 10));
+ segments.add(new RemoteEditLog(11, 12));
+
+ // segments cover txids 3..12 contiguously: only the (3, 13) window is
+ // reported complete; windows starting earlier or ending later are not
+ assertTrue(FSEditLog.hasCompleteJournalSegments(segments, 3, 13));
+ assertFalse(FSEditLog.hasCompleteJournalSegments(segments, 1, 13));
+ assertFalse(FSEditLog.hasCompleteJournalSegments(segments, 13, 19));
+ assertFalse(FSEditLog.hasCompleteJournalSegments(segments, 11, 19));
+
+ // drop the middle segment (7-10): the resulting gap makes every
+ // window incomplete
+ segments.remove(1);
+ assertFalse(FSEditLog.hasCompleteJournalSegments(segments, 3, 13));
+ assertFalse(FSEditLog.hasCompleteJournalSegments(segments, 1, 13));
+ assertFalse(FSEditLog.hasCompleteJournalSegments(segments, 3, 19));
+ }
+
+ /**
+ * Copy every file from the NameNode's first edits directory ("current")
+ * into {@code dstDir}.
+ *
+ * @param cluster mini cluster whose first NN supplies the edit files
+ * @param dstDir destination directory (assumed to exist)
+ * @throws IOException if listing or copying a file fails
+ */
+ private void copyNNFiles(MiniDFSCluster cluster, File dstDir)
+ throws IOException {
+ Collection<URI> editURIs = cluster.getNameEditsDirs(0);
+ // URI.getPath() already returns a String; no toString()/new String() needed
+ String firstURI = editURIs.iterator().next().getPath();
+ File nnDir = new File(firstURI, "current");
+
+ File allFiles[] = FileUtil.listFiles(nnDir);
+ for (File f : allFiles) {
+ // close=true makes copyBytes close both streams, even on error
+ IOUtils.copyBytes(new FileInputStream(f),
+ new FileOutputStream(new File(dstDir, f.getName())), 4096, true);
+ }
+ }
/**
* Test lagging Journal service copies edit segments from another Journal
- * service
+ * service:
+ * 1. start one journal service
+ * 2. reboot namenode so more segments are created
+ * 3. add another journal service and this new journal service should sync
+ * with the first journal service
*
* @throws Exception
*/
@Test
public void testCopyEdits() throws Exception {
MiniDFSCluster cluster = null;
- JournalService service = null;
- JournalHttpServer jhs1 = null, jhs2 = null;
+ JournalService service1 = null, service2 = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
- // restart namenode, so it will have finalized edit segments
- cluster.restartNameNode();
-
+ // start journal service
conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
- InetSocketAddress serverAddr = new InetSocketAddress(50900);
- JournalListener listener = Mockito.mock(JournalListener.class);
- service = new JournalService(conf, nnAddr, serverAddr, listener);
- service.start();
-
+ InetSocketAddress serverAddr = NetUtils
+ .createSocketAddr("localhost:50900");
+ Journal j1 = new Journal(conf);
+ JournalListener listener1 = new JournalDiskWriter(j1);
+ service1 = new JournalService(conf, nnAddr, serverAddr,
+ new InetSocketAddress(50901), listener1, j1);
+ service1.start();
+
// get namenode clusterID/layoutVersion/namespaceID
- StorageInfo si = service.getJournal().getStorage();
+ StorageInfo si = service1.getJournal().getStorage();
JournalInfo journalInfo = new JournalInfo(si.layoutVersion, si.clusterID,
si.namespaceID);
- // start jns1 with path1
- jhs1 = new JournalHttpServer(conf, service.getJournal(),
- NetUtils.createSocketAddr("localhost:50200"));
- jhs1.start();
+ // restart namenode, so there will be one more journal segment
+ cluster.restartNameNode();
- // get all edit segments
- InetSocketAddress srcRpcAddr = NameNode.getServiceAddress(conf, true);
- NamenodeProtocol namenode = NameNodeProxies.createNonHAProxy(conf,
- srcRpcAddr, NamenodeProtocol.class,
- UserGroupInformation.getCurrentUser(), true).getProxy();
-
- RemoteEditLogManifest manifest = namenode.getEditLogManifest(1);
- jhs1.downloadEditFiles(
- conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY), manifest);
+ // TODO: remove file copy when NN can work with journal auto-machine
+ copyNNFiles(cluster, new File(new String(path1.toString() +
"/current")));
- // start jns2 with path2
+ // start another journal service that will do the sync
+ conf.set(DFSConfigKeys.DFS_JOURNAL_ADDRESS_KEY, "localhost:50900");
conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path2.getPath());
- Journal journal2 = new Journal(conf);
- journal2.format(si.namespaceID, si.clusterID);
- jhs2 = new JournalHttpServer(conf, journal2,
- NetUtils.createSocketAddr("localhost:50300"));
- jhs2.start();
-
- // transfer edit logs from j1 to j2
- JournalSyncProtocol journalp =
JournalService.createProxyWithJournalSyncProtocol(
- NetUtils.createSocketAddr("localhost:50900"), conf,
- UserGroupInformation.getCurrentUser());
- RemoteEditLogManifest manifest1 =
journalp.getEditLogManifest(journalInfo, 1);
- jhs2.downloadEditFiles("localhost:50200", manifest1);
+ conf.set(DFSConfigKeys.DFS_JOURNAL_HTTP_ADDRESS_KEY,
+ "localhost:50902, localhost:50901");
+ Journal j2 = new Journal(conf);
+ JournalListener listener2 = new JournalDiskWriter(j2);
+ service2 = new JournalService(conf, nnAddr, new InetSocketAddress(50800),
+ NetUtils.createSocketAddr("localhost:50902"), listener2, j2);
+ service2.start();
+
+ // give service2 some time to sync
+ Thread.sleep(5000);
+ // NOTE(review): a fixed 5s sleep makes this test timing-dependent;
+ // polling getEditLogManifest with a deadline would be more robust
+
+ // TODO: change sinceTxid to 1 after NN is modified to use journal
+ // service to start
+ RemoteEditLogManifest manifest2 = service2.getEditLogManifest(
+ journalInfo, 3);
+ assertTrue(manifest2.getLogs().size() > 0);
} catch (IOException e) {
LOG.error("Error in TestCopyEdits:", e);
assertTrue(e.getLocalizedMessage(), false);
} finally {
- if (jhs1 != null)
- jhs1.stop();
- if (jhs2 != null)
- jhs2.stop();
if (cluster != null)
cluster.shutdown();
- if (service != null)
- service.stop();
+ if (service1 != null)
+ service1.stop();
+ if (service2 != null)
+ service2.stop();
}
}
}
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
Sun May 6 21:45:35 2012
@@ -52,6 +52,7 @@ public class TestJournalService {
@Test
public void testCallBacks() throws Exception {
Configuration conf = TestJournal.newConf("testCallBacks");
+ Journal journal = new Journal(conf);
JournalListener listener = Mockito.mock(JournalListener.class);
JournalService service = null;
MiniDFSCluster cluster = null;
@@ -59,7 +60,7 @@ public class TestJournalService {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive(0);
InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
- service = newJournalService(nnAddr, listener, conf);
+ service = newJournalService(nnAddr, listener, journal, conf);
service.start();
verifyRollLogsCallback(service, listener);
verifyJournalCallback(cluster.getFileSystem(), service, listener);
@@ -73,6 +74,10 @@ public class TestJournalService {
}
}
+ /**
+ * Test journal service fence with a combination of epoch, nsid and clusterid
+ * @throws Exception
+ */
@Test
public void testFence() throws Exception {
final Configuration conf = TestJournal.newConf("testFence");
@@ -86,7 +91,8 @@ public class TestJournalService {
cluster.waitActive(0);
NameNode nn = cluster.getNameNode(0);
nnAddress = nn.getNameNodeAddress();
- service = newJournalService(nnAddress, listener, conf);
+ Journal j1 = new Journal(conf);
+ service = newJournalService(nnAddress, listener, j1, conf);
service.start();
String cid = nn.getNamesystem().getClusterId();
int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
@@ -101,17 +107,29 @@ public class TestJournalService {
StorageInfo before = service.getJournal().getStorage();
LOG.info("before: " + before);
service.stop();
- service = newJournalService(nnAddress, listener, conf);
+ Journal j2 = new Journal(conf);
+ service = newJournalService(nnAddress, listener, j2, conf);
StorageInfo after = service.getJournal().getStorage();
LOG.info("after : " + after);
Assert.assertEquals(before.toString(), after.toString());
}
+ /**
+ * Create additional journal service
+ * @param nnAddr namenode rpc address
+ * @param listener the listener serving journal requests from namenode
+ * @param journal local journal
+ * @param conf local configuration
+ * @return journal service
+ * @throws Exception
+ */
private JournalService newJournalService(InetSocketAddress nnAddr,
- JournalListener listener, Configuration conf) throws IOException {
- return new JournalService(conf, nnAddr, RPC_ADDR, listener);
+ JournalListener listener, Journal journal, Configuration conf)
+ throws Exception {
+ // port 0: let the OS pick an ephemeral port for the journal http server
+ return new JournalService(conf, nnAddr, RPC_ADDR, new InetSocketAddress(0),
+ listener, journal);
}
-
+
/**
* Starting {@link JournalService} should result in Namenode calling
* {@link JournalService#startLogSegment}, resulting in callback
@@ -135,6 +153,16 @@ public class TestJournalService {
Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any());
}
+ /**
+ * Verify the fence with different epoch, clusterid and nsid combinations
+ *
+ * @param s The Journal Service to write to
+ * @param listener the listener to serve journal request
+ * @param cid cluster id
+ * @param nsId namespace id
+ * @param lv layoutVersion
+ * @throws Exception
+ */
void verifyFence(JournalService s, JournalListener listener,
String cid, int nsId, int lv) throws Exception {