Author: szetszwo
Date: Tue Apr 17 22:57:53 2012
New Revision: 1327314
URL: http://svn.apache.org/viewvc?rev=1327314&view=rev
Log:
HDFS-3196. Add Journal and JournalDiskWriter for journal service.
Added:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt?rev=1327314&r1=1327313&r2=1327314&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
Tue Apr 17 22:57:53 2012
@@ -17,6 +17,9 @@ HDFS-3092 branch changes
HDFS-3283. Add http server to journal service. (Brandon Li via szetszwo)
+ HDFS-3196. Add Journal and JournalDiskWriter for journal service.
+ (szetszwo)
+
IMPROVEMENTS
OPTIMIZATIONS
Added:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java?rev=1327314&view=auto
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
(added)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
Tue Apr 17 22:57:53 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+/** The journal stored in local directories. */
+class Journal {
+ static final Log LOG = LogFactory.getLog(Journal.class);
+
+ private final FSImage image;
+ private volatile boolean isFormatted;
+
+ /**
+ * Constructor. It is possible that the directory is not formatted.
+ */
+ Journal(Configuration conf) throws IOException {
+ this.image = new FSImage(conf, Collections.<URI>emptyList(),
getEditDirs(conf));
+
+ final Map<StorageDirectory, StorageState> states
+ = new HashMap<StorageDirectory, StorageState>();
+ isFormatted = image.recoverStorageDirs(StartupOption.REGULAR, states);
+ for(Map.Entry<StorageDirectory, StorageState> e : states.entrySet()) {
+ LOG.info(e.getKey() + ": " + e.getValue());
+ }
+ LOG.info("The journal is " + (isFormatted? "already": "not yet")
+ + " formatted: " + image.getStorage());
+ }
+
+ /**
+ * Format the local edit directory.
+ */
+ synchronized void format(int namespaceId, String clusterId) throws
IOException {
+ if (isFormatted) {
+ throw new IllegalStateException("The journal is already formatted.");
+ }
+ final NNStorage s = image.getStorage();
+ s.format(new NamespaceInfo(namespaceId, clusterId, "dummy-bpid", 0, 0));
+ isFormatted = true;
+ LOG.info("Formatted journal: " + s);
+ }
+
+ boolean isFormatted() {
+ return isFormatted;
+ }
+
+ StorageInfo getStorageInfo() {
+ return image.getStorage();
+ }
+
+ synchronized void verifyVersion(JournalService service, NamespaceInfo info
+ ) throws IOException {
+ if (!isFormatted) {
+ return;
+ }
+
+ final StorageInfo stored = image.getStorage();
+ if (!stored.getClusterID().equals(info.getClusterID())) {
+ throw new IOException("Cluster IDs not matched: stored = "
+ + stored.getClusterID() + " != passed = " + info.getClusterID());
+ }
+ if (stored.getNamespaceID() != info.getNamespaceID()) {
+ throw new IOException("Namespace IDs not matched: stored = "
+ + stored.getNamespaceID() + " != passed = " + info.getNamespaceID());
+ }
+ if (stored.getLayoutVersion() != info.getLayoutVersion()) {
+ throw new IOException("Layout versions not matched: stored = "
+ + stored.getLayoutVersion() + " != passed = " +
info.getLayoutVersion());
+ }
+ if (stored.getCTime() != info.getCTime()) {
+ throw new IOException("CTimes not matched: stored = "
+ + stored.getCTime() + " != passed = " + info.getCTime());
+ }
+ }
+
+ void close() throws IOException {
+ image.close();
+ }
+
+ FSEditLog getEditLog() {
+ return image.getEditLog();
+ }
+
+ static List<URI> getEditDirs(Configuration conf) throws IOException {
+ final Collection<String> dirs = conf.getTrimmedStringCollection(
+ DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY);
+ LOG.info(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY + " = " + dirs);
+ return Util.stringCollectionAsURIs(dirs);
+ }
+}
\ No newline at end of file
Added:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java?rev=1327314&view=auto
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java
(added)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java
Tue Apr 17 22:57:53 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+/** A JournalListener for writing journal to an edit log. */
+class JournalDiskWriter implements JournalListener {
+ static final Log LOG = LogFactory.getLog(JournalDiskWriter.class);
+
+ private final Journal journal;
+
+ /**
+ * Constructor. The given journal may not yet be formatted; formatting,
+ * if needed, is performed later through the journal itself.
+ */
+ JournalDiskWriter(Journal journal) throws IOException {
+ this.journal = journal;
+ }
+
+ Journal getJournal() {
+ return journal;
+ }
+
+ @Override
+ public synchronized void verifyVersion(JournalService service,
+ NamespaceInfo info) throws IOException {
+ journal.verifyVersion(service, info);
+ }
+
+ @Override
+ public synchronized void journal(JournalService service, long firstTxId,
+ int numTxns, byte[] records) throws IOException {
+ journal.getEditLog().journal(firstTxId, numTxns, records);
+ }
+
+ @Override
+ public synchronized void startLogSegment(JournalService service, long txid
+ ) throws IOException {
+ journal.getEditLog().startLogSegment(txid, false);
+ }
+}
\ No newline at end of file
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java?rev=1327314&r1=1327313&r2=1327314&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
Tue Apr 17 22:57:53 2012
@@ -35,8 +35,10 @@ public interface JournalListener {
*
* The application using {@link JournalService} can stop the service if
* {@code info} validation fails.
+ * @throws IOException
*/
- public void verifyVersion(JournalService service, NamespaceInfo info);
+ public void verifyVersion(JournalService service, NamespaceInfo info
+ ) throws IOException;
/**
* Process the received Journal record
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java?rev=1327314&r1=1327313&r2=1327314&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
Tue Apr 17 22:57:53 2012
@@ -17,39 +17,18 @@
*/
package org.apache.hadoop.hdfs.server.journalservice;
-import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
-
-import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import
org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.FSImage;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.FencedException;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
@@ -65,8 +44,6 @@ import org.apache.hadoop.security.UserGr
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY;
-
/**
* This class interfaces with the namenode using {@link JournalProtocol} over
* RPC. It has two modes: <br>
@@ -85,7 +62,6 @@ import static org.apache.hadoop.hdfs.DFS
public class JournalService implements JournalProtocol {
public static final Log LOG =
LogFactory.getLog(JournalService.class.getName());
- private final JournalListener listener;
private final InetSocketAddress nnAddress;
private NamenodeRegistration registration;
private final NamenodeProtocol namenode;
@@ -93,11 +69,10 @@ public class JournalService implements J
private final RPC.Server rpcServer;
private long epoch = 0;
private String fencerInfo;
- private StorageInfo storageInfo;
- private Configuration conf;
- private FSEditLog editLog;
- private FSImage image;
-
+
+ private final Journal journal;
+ private final JournalListener listener;
+
enum State {
/** The service is initialized and ready to start. */
INIT(false, false),
@@ -193,66 +168,30 @@ public class JournalService implements J
throws IOException {
this.nnAddress = nnAddr;
this.listener = listener;
+ this.journal = new Journal(conf);
this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
.getProxy();
this.rpcServer = createRpcServer(conf, serverAddress, this);
- this.conf = conf;
-
- try {
- initializeJournalStorage(conf);
- } catch (IOException ioe) {
- LOG.info("Exception in initialize: " + ioe.getMessage());
- throw ioe;
- }
}
- /** This routine initializes the storage directory. It is possible that
- * the directory is not formatted. In this case, it creates dummy entries
- * and storage is later formatted.
- */
- private void initializeJournalStorage(Configuration conf) throws IOException
{
-
- boolean isFormatted = false;
- Collection<URI> dirsToFormat = new ArrayList<URI>();
- List<URI> editUrisToFormat = getJournalEditsDirs(conf);
-
- // Load the newly formatted image, using all of the directories
- image = new FSImage(conf, dirsToFormat, editUrisToFormat);
- Map<StorageDirectory, StorageState> dataDirStates =
- new HashMap<StorageDirectory, StorageState>();
- isFormatted = image
- .recoverStorageDirs(StartupOption.REGULAR, dataDirStates);
-
- if (isFormatted == true) {
- // Directory has been formatted. So, it should have a versionfile.
- this.editLog = image.getEditLog();
- Iterator<StorageDirectory> sdit = image.getStorage().dirIterator(
- NNStorage.NameNodeDirType.IMAGE);
- StorageDirectory sd = sdit.next();
-
- Properties props = Storage.readPropertiesFile(sd.getVersionFile());
- String cid = props.getProperty("clusterID");
- String nsid = props.getProperty("namespaceID");
- String layout = props.getProperty("layoutVersion");
- storageInfo = new StorageInfo(Integer.parseInt(layout),
- Integer.parseInt(nsid), cid, 0);
-
- LOG.info("JournalService constructor, nsid "
- + storageInfo.getNamespaceID() + " cluster id "
- + storageInfo.getClusterID());
-
- String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
- registration = new NamenodeRegistration(addr, "", storageInfo,
- NamenodeRole.BACKUP);
- } else {
- // Storage directory has not been formatted. So create dummy entries for
now.
- image = new FSImage(conf);
- storageInfo = image.getStorage();
- editLog = null;
+ Journal getJournal() {
+ return journal;
+ }
+
+ synchronized NamenodeRegistration getRegistration() {
+ if (!journal.isFormatted()) {
+ throw new IllegalStateException("Journal is not formatted.");
+ }
+
+ if (registration == null) {
+ registration = new NamenodeRegistration(
+ NetUtils.getHostPortString(rpcServer.getListenerAddress()), "",
+ journal.getStorageInfo(), NamenodeRole.BACKUP);
}
+ return registration;
}
-
+
/**
* Start the service.
*/
@@ -304,9 +243,10 @@ public class JournalService implements J
* Stop the service. For application with RPC Server managed outside, the
* RPC Server must be stopped by the application.
*/
- public void stop() {
+ public void stop() throws IOException {
if (!stateHandler.isStopped()) {
rpcServer.stop();
+ journal.close();
}
}
@@ -332,47 +272,15 @@ public class JournalService implements J
listener.startLogSegment(this, txid);
stateHandler.startLogSegment();
}
-
- private void setupStorage(JournalInfo jinfo) throws IOException {
- formatStorage(jinfo.getNamespaceId(), jinfo.getClusterId());
- }
-
- private void setupStorage(NamespaceInfo nsinfo) throws IOException {
- formatStorage(nsinfo.getNamespaceID(), nsinfo.getClusterID());
- }
-
- /**
- * Will format the edits dir and save the nsid + cluster id.
- * @param nsId
- * @param clusterId
- * @throws IOException
- */
- private void formatStorage(int nsId, String clusterId) throws IOException {
- Collection<URI> dirsToFormat = new ArrayList<URI>();
- List<URI> editUrisToFormat = getJournalEditsDirs(conf);
- NNStorage nnStorage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
- LOG.info("Setting up storage for nsid " + nsId + " clusterid " +
clusterId);
- nnStorage.format(new NamespaceInfo(nsId, clusterId, "journalservice", 0,
0));
- image = new FSImage(conf, dirsToFormat, editUrisToFormat);
- this.editLog = image.getEditLog();
-
- storageInfo = new StorageInfo(LayoutVersion.getCurrentLayoutVersion(),
- nsId, clusterId, 0);
- registration = new NamenodeRegistration(
- NetUtils.getHostPortString(rpcServer.getListenerAddress()), "",
- storageInfo, NamenodeRole.BACKUP);
- }
@Override
public FenceResponse fence(JournalInfo journalInfo, long epoch,
String fencerInfo) throws IOException {
LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
- // This implies that this is the first fence on the journal service
- // It does not have any nsid or cluster id info.
- if ((storageInfo.getClusterID() == null)
- || (storageInfo.getNamespaceID() == 0)) {
- setupStorage(journalInfo);
+ // It is the first fence if the journal is not formatted.
+ if (!journal.isFormatted()) {
+ journal.format(journalInfo.getNamespaceId(), journalInfo.getClusterId());
}
verifyFence(epoch, fencerInfo);
verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
@@ -422,19 +330,18 @@ public class JournalService implements J
*/
private void verify(int nsid, String clusid) throws IOException {
String errorMsg = null;
- int expectedNamespaceID = registration.getNamespaceID();
+ final NamenodeRegistration reg = getRegistration();
- if (nsid != expectedNamespaceID) {
+ if (nsid != reg.getNamespaceID()) {
errorMsg = "Invalid namespaceID in journal request - expected "
- + expectedNamespaceID + " actual " + nsid;
+ + reg.getNamespaceID() + " actual " + nsid;
LOG.warn(errorMsg);
throw new UnregisteredNodeException(errorMsg);
}
if ((clusid == null)
- || (!clusid.equals(registration.getClusterID()))) {
+ || (!clusid.equals(reg.getClusterID()))) {
errorMsg = "Invalid clusterId in journal request - incoming "
- + clusid + " expected "
- + registration.getClusterID();
+ + clusid + " expected " + reg.getClusterID();
LOG.warn(errorMsg);
throw new UnregisteredNodeException(errorMsg);
}
@@ -452,7 +359,7 @@ public class JournalService implements J
* Register this service with the active namenode.
*/
private void registerWithNamenode() throws IOException {
- NamenodeRegistration nnReg = namenode.register(registration);
+ NamenodeRegistration nnReg = namenode.register(getRegistration());
String msg = null;
if(nnReg == null) { // consider as a rejection
msg = "Registration rejected by " + nnAddress;
@@ -472,9 +379,8 @@ public class JournalService implements J
// If this is the first initialization of journal service, then storage
// directory will be setup. Otherwise, nsid and clusterid has to match with
// the info saved in the edits dir.
- if ((storageInfo.getClusterID() == null)
- || (storageInfo.getNamespaceID() == 0)) {
- setupStorage(nsInfo);
+ if (!journal.isFormatted()) {
+ journal.format(nsInfo.getNamespaceID(), nsInfo.getClusterID());
} else {
verify(nsInfo.getNamespaceID(), nsInfo.getClusterID());
}
@@ -484,17 +390,4 @@ public class JournalService implements J
long getEpoch() {
return epoch;
}
-
- /**
- * Returns edit directories that are shared between primary and secondary.
- * @param conf
- * @return Collection of edit directories.
- */
- public List<URI> getJournalEditsDirs(Configuration conf) {
- // don't use getStorageDirs here, because we want an empty default
- // rather than the dir in /tmp
- Collection<String> dirNames = conf.getTrimmedStringCollection(
- DFS_JOURNAL_EDITS_DIR_KEY);
- return Util.stringCollectionAsURIs(dirNames);
- }
}
Added:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java?rev=1327314&view=auto
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
(added)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
Tue Apr 17 22:57:53 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJournal {
+ static final Log LOG = LogFactory.getLog(TestJournal.class);
+
+ static Configuration newConf(String name) {
+ Configuration conf = new HdfsConfiguration();
+ File dir = new File(MiniDFSCluster.getBaseDirectory(), name + "-edits");
+ Assert.assertTrue(dir.mkdirs());
+ conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, dir.toURI().toString());
+ return conf;
+ }
+
+ @Test
+ public void testFormat() throws Exception {
+ final Configuration conf = newConf("testFormat");
+ final Journal j = new Journal(conf);
+ LOG.info("Initial : " + j.getStorageInfo());
+ Assert.assertFalse(j.isFormatted());
+
+ //format
+ final int namespaceId = 123;
+ final String clusterId = "my-cluster-id";
+ j.format(namespaceId, clusterId);
+ Assert.assertTrue(j.isFormatted());
+
+ final StorageInfo info = j.getStorageInfo();
+ LOG.info("Formatted: " + info);
+
+ Assert.assertEquals(namespaceId, info.getNamespaceID());
+ Assert.assertEquals(clusterId, info.getClusterID());
+ j.close();
+
+ //create another Journal object
+ final StorageInfo another = new Journal(conf).getStorageInfo();
+ Assert.assertEquals(info.toString(), another.toString());
+ }
+}
\ No newline at end of file
Modified:
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java?rev=1327314&r1=1327313&r2=1327314&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
(original)
+++
hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
Tue Apr 17 22:57:53 2012
@@ -17,12 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.journalservice;
-import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.URI;
-
-import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,15 +26,14 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.FencedException;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -46,10 +41,9 @@ import org.mockito.Mockito;
* Tests for {@link JournalService}
*/
public class TestJournalService {
- private MiniDFSCluster cluster;
- private Configuration conf = new HdfsConfiguration();
static final Log LOG = LogFactory.getLog(TestJournalService.class);
-
+ static final InetSocketAddress RPC_ADDR = new InetSocketAddress(0);
+
/**
* Test calls backs {@link JournalListener#startLogSegment(JournalService,
long)} and
* {@link JournalListener#journal(JournalService, long, int, byte[])} are
@@ -57,47 +51,65 @@ public class TestJournalService {
*/
@Test
public void testCallBacks() throws Exception {
+ Configuration conf = TestJournal.newConf("testCallBacks");
JournalListener listener = Mockito.mock(JournalListener.class);
JournalService service = null;
+ MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive(0);
- FileSystem fs = FileSystem.getLocal(conf);
- File journalEditsDir = cluster.getJournalEditsDir();
- boolean result = fs.mkdirs(new Path(journalEditsDir.toString()));
- conf.set(DFS_JOURNAL_EDITS_DIR_KEY, journalEditsDir.toString());
- service = startJournalService(listener);
+ InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
+ service = newJournalService(nnAddr, listener, conf);
+ service.start();
verifyRollLogsCallback(service, listener);
- verifyJournalCallback(service, listener);
- verifyFence(service, listener, cluster.getNameNode(0));
+ verifyJournalCallback(cluster.getFileSystem(), service, listener);
} finally {
- if (service != null) {
- stopJournalService(service);
- }
if (cluster != null) {
cluster.shutdown();
}
+ if (service != null) {
+ service.stop();
+ }
}
}
-
- private JournalService restartJournalService(JournalService service,
- JournalListener listener) throws IOException {
- stopJournalService(service);
- return (startJournalService(listener));
- }
- private JournalService startJournalService(JournalListener listener)
- throws IOException {
- InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
- InetSocketAddress serverAddr = new InetSocketAddress(0);
- JournalService service = new JournalService(conf, nnAddr, serverAddr,
- listener);
- service.start();
- return service;
- }
-
- private void stopJournalService(JournalService service) throws IOException {
+ @Test
+ public void testFence() throws Exception {
+ final Configuration conf = TestJournal.newConf("testFence");
+ final JournalListener listener = Mockito.mock(JournalListener.class);
+ final InetSocketAddress nnAddress;
+
+ JournalService service = null;
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive(0);
+ NameNode nn = cluster.getNameNode(0);
+ nnAddress = nn.getNameNodeAddress();
+ service = newJournalService(nnAddress, listener, conf);
+ service.start();
+ String cid = nn.getNamesystem().getClusterId();
+ int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
+ int lv = nn.getNamesystem().getFSImage().getLayoutVersion();
+
+ verifyFence(service, listener, cid, nsId, lv);
+ } finally {
+ cluster.shutdown();
+ }
+
+ //test restart journal service
+ StorageInfo before = service.getJournal().getStorageInfo();
+ LOG.info("before: " + before);
service.stop();
+ service = newJournalService(nnAddress, listener, conf);
+ StorageInfo after = service.getJournal().getStorageInfo();
+ LOG.info("after : " + after);
+ Assert.assertEquals(before.toString(), after.toString());
+ }
+
+ private JournalService newJournalService(InetSocketAddress nnAddr,
+ JournalListener listener, Configuration conf) throws IOException {
+ return new JournalService(conf, nnAddr, RPC_ADDR, listener);
}
/**
@@ -114,19 +126,17 @@ public class TestJournalService {
* File system write operations should result in JournalListener call
* backs.
*/
- private void verifyJournalCallback(JournalService s, JournalListener l)
throws IOException {
+ private void verifyJournalCallback(FileSystem fs, JournalService s,
+ JournalListener l) throws IOException {
Path fileName = new Path("/tmp/verifyJournalCallback");
- FileSystem fs = cluster.getFileSystem();
FileSystemTestHelper.createFile(fs, fileName);
fs.delete(fileName, true);
Mockito.verify(l, Mockito.atLeastOnce()).journal(Mockito.eq(s),
Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any());
}
- public void verifyFence(JournalService s, JournalListener listener, NameNode
nn) throws Exception {
- String cid = nn.getNamesystem().getClusterId();
- int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
- int lv = nn.getNamesystem().getFSImage().getLayoutVersion();
+ void verifyFence(JournalService s, JournalListener listener,
+ String cid, int nsId, int lv) throws Exception {
// Fence the journal service
JournalInfo info = new JournalInfo(lv, cid, nsId);
@@ -171,7 +181,6 @@ public class TestJournalService {
LOG.info(ignore.getMessage());
}
- s = restartJournalService(s, listener);
// New epoch higher than the current epoch is successful
resp = s.fence(info, currentEpoch+1, "fencer");
Assert.assertNotNull(resp);