Author: todd
Date: Thu May 17 17:49:48 2012
New Revision: 1339745
URL: http://svn.apache.org/viewvc?rev=1339745&view=rev
Log:
HDFS-2800. Fix cancellation of checkpoints in the standby node to be more
reliable. Contributed by Todd Lipcon.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/Canceler.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1339745&r1=1339744&r2=1339745&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu May 17 17:49:48 2012
@@ -217,6 +217,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3434. InvalidProtocolBufferException when visiting DN
browseDirectory.jsp (eli)
+ HDFS-2800. Fix cancellation of checkpoints in the standby node to be more
+ reliable. (todd)
+
Release 2.0.0-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1339745&r1=1339744&r2=1339745&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
Thu May 17 17:49:48 2012
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -89,9 +90,6 @@ public class FSImage implements Closeabl
private final NNStorageRetentionManager archivalManager;
- private SaveNamespaceContext curSaveNamespaceContext = null;
-
-
/**
* Construct an FSImage
* @param conf Configuration
@@ -804,17 +802,28 @@ public class FSImage implements Closeabl
try {
thread.join();
} catch (InterruptedException iex) {
- LOG.error("Caught exception while waiting for thread " +
+ LOG.error("Caught interrupted exception while waiting for thread " +
thread.getName() + " to finish. Retrying join");
}
}
}
}
+
+ /**
+ * @see #saveNamespace(FSNamesystem, Canceler)
+ */
+ public synchronized void saveNamespace(FSNamesystem source)
+ throws IOException {
+ saveNamespace(source, null);
+ }
+
/**
* Save the contents of the FS image to a new image file in each of the
* current storage directories.
+ * @param canceler
*/
- public synchronized void saveNamespace(FSNamesystem source) throws IOException {
+ public synchronized void saveNamespace(FSNamesystem source,
+ Canceler canceler) throws IOException {
assert editLog != null : "editLog must be initialized";
storage.attemptRestoreRemovedStorage();
@@ -825,7 +834,7 @@ public class FSImage implements Closeabl
}
long imageTxId = getLastAppliedOrWrittenTxId();
try {
- saveFSImageInAllDirs(source, imageTxId);
+ saveFSImageInAllDirs(source, imageTxId, canceler);
storage.writeAll();
} finally {
if (editLogWasOpen) {
@@ -837,27 +846,27 @@ public class FSImage implements Closeabl
storage.writeTransactionIdFileToStorage(imageTxId + 1);
}
}
-
- }
-
- public void cancelSaveNamespace(String reason)
- throws InterruptedException {
- SaveNamespaceContext ctx = curSaveNamespaceContext;
- if (ctx != null) {
- ctx.cancel(reason); // waits until complete
- }
}
-
+ /**
+ * @see #saveFSImageInAllDirs(FSNamesystem, long, Canceler)
+ */
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
+ throws IOException {
+ saveFSImageInAllDirs(source, txid, null);
+ }
+
+ protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid,
+ Canceler canceler)
throws IOException {
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
throw new IOException("No image directories available!");
}
-
+ if (canceler == null) {
+ canceler = new Canceler();
+ }
SaveNamespaceContext ctx = new SaveNamespaceContext(
- source, txid);
- curSaveNamespaceContext = ctx;
+ source, txid, canceler);
try {
List<Thread> saveThreads = new ArrayList<Thread>();
@@ -878,7 +887,7 @@ public class FSImage implements Closeabl
throw new IOException(
"Failed to save in any storage directories while saving namespace.");
}
- if (ctx.isCancelled()) {
+ if (canceler.isCancelled()) {
deleteCancelledCheckpoint(txid);
ctx.checkCancelled(); // throws
assert false : "should have thrown above!";
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1339745&r1=1339744&r2=1339745&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
Thu May 17 17:49:48 2012
@@ -540,7 +540,6 @@ class FSImageFormat {
private void saveImage(ByteBuffer currentDirName,
INodeDirectory current,
DataOutputStream out) throws IOException {
- context.checkCancelled();
List<INode> children = current.getChildrenRaw();
if (children == null || children.isEmpty())
return;
@@ -554,9 +553,13 @@ class FSImageFormat {
out.write(currentDirName.array(), 0, prefixLen);
}
out.writeInt(children.size());
+ int i = 0;
for(INode child : children) {
// print all children first
FSImageSerialization.saveINode2Image(child, out);
+ if (i++ % 50 == 0) {
+ context.checkCancelled();
+ }
}
for(INode child : children) {
if(!child.isDirectory())
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1339745&r1=1339744&r2=1339745&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
Thu May 17 17:49:48 2012
@@ -702,7 +702,8 @@ public class FSNamesystem implements Nam
*/
void prepareToStopStandbyServices() throws ServiceFailedException {
if (standbyCheckpointer != null) {
- standbyCheckpointer.cancelAndPreventCheckpoints();
+ standbyCheckpointer.cancelAndPreventCheckpoints(
+ "About to leave standby state");
}
}
@@ -3373,27 +3374,6 @@ public class FSNamesystem implements Nam
}
/**
- * Cancel an ongoing saveNamespace operation and wait for its
- * threads to exit, if one is currently in progress.
- *
- * If no such operation is in progress, this call does nothing.
- *
- * @param reason a reason to be communicated to the caller saveNamespace
- * @throws IOException
- */
- void cancelSaveNamespace(String reason) throws IOException {
- readLock();
- try {
- checkSuperuserPrivilege();
- getFSImage().cancelSaveNamespace(reason);
- } catch (InterruptedException e) {
- throw new IOException(e);
- } finally {
- readUnlock();
- }
- }
-
- /**
* Enables/Disables/Checks restoring failed storage replicas if the storage becomes available again.
* Requires superuser privilege.
*
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java?rev=1339745&r1=1339744&r2=1339745&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
Thu May 17 17:49:48 2012
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.util.Canceler;
import com.google.common.base.Preconditions;
@@ -36,20 +37,17 @@ class SaveNamespaceContext {
private final long txid;
private final List<StorageDirectory> errorSDs =
Collections.synchronizedList(new ArrayList<StorageDirectory>());
-
- /**
- * If the operation has been canceled, set to the reason why
- * it has been canceled (eg standby moving to active)
- */
- private volatile String cancelReason = null;
+ private final Canceler canceller;
private CountDownLatch completionLatch = new CountDownLatch(1);
-
+
SaveNamespaceContext(
FSNamesystem sourceNamesystem,
- long txid) {
+ long txid,
+ Canceler canceller) {
this.sourceNamesystem = sourceNamesystem;
this.txid = txid;
+ this.canceller = canceller;
}
FSNamesystem getSourceNamesystem() {
@@ -68,17 +66,6 @@ class SaveNamespaceContext {
return errorSDs;
}
- /**
- * Requests that the current saveNamespace operation be
- * canceled if it is still running.
- * @param reason the reason why cancellation is requested
- * @throws InterruptedException
- */
- void cancel(String reason) throws InterruptedException {
- this.cancelReason = reason;
- completionLatch.await();
- }
-
void markComplete() {
Preconditions.checkState(completionLatch.getCount() == 1,
"Context already completed!");
@@ -86,13 +73,9 @@ class SaveNamespaceContext {
}
void checkCancelled() throws SaveNamespaceCancelledException {
- if (cancelReason != null) {
+ if (canceller.isCancelled()) {
throw new SaveNamespaceCancelledException(
- cancelReason);
+ canceller.getCancellationReason());
}
}
-
- boolean isCancelled() {
- return cancelReason != null;
- }
}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java?rev=1339745&r1=1339744&r2=1339745&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
Thu May 17 17:49:48 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -58,6 +59,9 @@ public class StandbyCheckpointer {
private final CheckpointerThread thread;
private String activeNNAddress;
private InetSocketAddress myNNAddress;
+
+ private Object cancelLock = new Object();
+ private Canceler canceler;
// Keep track of how many checkpoints were canceled.
// This is for use in tests.
@@ -123,6 +127,7 @@ public class StandbyCheckpointer {
}
public void stop() throws IOException {
+ cancelAndPreventCheckpoints("Stopping checkpointer");
thread.setShouldRun(false);
thread.interrupt();
try {
@@ -134,6 +139,7 @@ public class StandbyCheckpointer {
}
private void doCheckpoint() throws InterruptedException, IOException {
+ assert canceler != null;
long txid;
namesystem.writeLockInterruptibly();
@@ -153,8 +159,8 @@ public class StandbyCheckpointer {
thisCheckpointTxId + ". Skipping...");
return;
}
-
- img.saveNamespace(namesystem);
+
+ img.saveNamespace(namesystem, canceler);
txid = img.getStorage().getMostRecentCheckpointTxId();
assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
thisCheckpointTxId + " but instead saved at txid=" + txid;
@@ -173,16 +179,18 @@ public class StandbyCheckpointer {
* and prevent any new checkpoints from starting for the next
* minute or so.
*/
- public void cancelAndPreventCheckpoints() throws ServiceFailedException {
- try {
- thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
- // TODO(HA): there is a really narrow race here if we are just
- // about to start a checkpoint - this won't cancel it!
- namesystem.getFSImage().cancelSaveNamespace(
- "About to exit standby state");
- } catch (InterruptedException e) {
- throw new ServiceFailedException(
- "Interrupted while trying to cancel checkpoint");
+ public void cancelAndPreventCheckpoints(String msg) throws ServiceFailedException {
+ thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
+ synchronized (cancelLock) {
+ // Before beginning a checkpoint, the checkpointer thread
+ // takes this lock, and creates a canceler object.
+ // If the canceler is non-null, then a checkpoint is in
+ // progress and we need to cancel it. If it's null, then
+ // the operation has not started, meaning that the above
+ // time-based prevention will take effect.
+ if (canceler != null) {
+ canceler.cancel(msg);
+ }
}
}
@@ -272,10 +280,18 @@ public class StandbyCheckpointer {
"exceeds the configured interval " +
checkpointConf.getPeriod());
needCheckpoint = true;
}
- if (needCheckpoint && now < preventCheckpointsUntil) {
- LOG.info("But skipping this checkpoint since we are about to failover!");
- canceledCount++;
- } else if (needCheckpoint) {
+
+ synchronized (cancelLock) {
+ if (now < preventCheckpointsUntil) {
+ LOG.info("But skipping this checkpoint since we are about to failover!");
+ canceledCount++;
+ continue;
+ }
+ assert canceler == null;
+ canceler = new Canceler();
+ }
+
+ if (needCheckpoint) {
doCheckpoint();
lastCheckpointTime = now;
}
@@ -287,6 +303,10 @@ public class StandbyCheckpointer {
continue;
} catch (Throwable t) {
LOG.error("Exception in doCheckpoint", t);
+ } finally {
+ synchronized (cancelLock) {
+ canceler = null;
+ }
}
}
}
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/Canceler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/Canceler.java?rev=1339745&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/Canceler.java
(added)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/Canceler.java
Thu May 17 17:49:48 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Provides a simple interface where one thread can mark an operation
+ * for cancellation, and another thread can poll for whether the
+ * cancellation has occurred.
+ */
[email protected]
+public class Canceler {
+ /**
+ * If the operation has been canceled, set to the reason why
+ * it has been canceled (eg standby moving to active)
+ */
+ private volatile String cancelReason = null;
+
+ /**
+ * Requests that the current operation be canceled if it is still running.
+ * This does not block until the cancellation is successful.
+ * @param reason the reason why cancellation is requested
+ */
+ public void cancel(String reason) {
+ this.cancelReason = reason;
+ }
+
+ public boolean isCancelled() {
+ return cancelReason != null;
+ }
+
+ public String getCancellationReason() {
+ return cancelReason;
+ }
+}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java?rev=1339745&r1=1339744&r2=1339745&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
Thu May 17 17:49:48 2012
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
@@ -517,14 +518,15 @@ public class TestSaveNamespace {
try {
doAnEdit(fsn, 1);
-
+ final Canceler canceler = new Canceler();
+
// Save namespace
fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
try {
Future<Void> saverFuture = pool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
- image.saveNamespace(finalFsn);
+ image.saveNamespace(finalFsn, canceler);
return null;
}
});
@@ -534,7 +536,7 @@ public class TestSaveNamespace {
// then cancel the saveNamespace
Future<Void> cancelFuture = pool.submit(new Callable<Void>() {
public Void call() throws Exception {
- image.cancelSaveNamespace("cancelled");
+ canceler.cancel("cancelled");
return null;
}
});
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java?rev=1339745&r1=1339744&r2=1339745&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
Thu May 17 17:49:48 2012
@@ -21,6 +21,7 @@ import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.URI;
import java.util.List;
@@ -36,6 +37,11 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -52,12 +58,18 @@ public class TestStandbyCheckpoints {
private NameNode nn0, nn1;
private FileSystem fs;
+ @SuppressWarnings("rawtypes")
@Before
public void setupCluster() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
+ conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
+ SlowCodec.class.getCanonicalName());
+ CompressionCodecFactory.setCodecClasses(conf,
+ ImmutableList.<Class>of(SlowCodec.class));
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
@@ -159,14 +171,15 @@ public class TestStandbyCheckpoints {
// We should make exactly one checkpoint at this new txid.
Mockito.verify(spyImage1, Mockito.times(1))
- .saveNamespace((FSNamesystem) Mockito.anyObject());
+ .saveNamespace((FSNamesystem) Mockito.anyObject(),
+ (Canceler)Mockito.anyObject());
}
/**
* Test cancellation of ongoing checkpoints when failover happens
* mid-checkpoint.
*/
- @Test
+ @Test(timeout=120000)
public void testCheckpointCancellation() throws Exception {
cluster.transitionToStandby(0);
@@ -191,16 +204,18 @@ public class TestStandbyCheckpoints {
cluster.transitionToActive(0);
- for (int i = 0; i < 10; i++) {
+ boolean canceledOne = false;
+ for (int i = 0; i < 10 && !canceledOne; i++) {
doEdits(i*10, i*10 + 10);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
+ canceledOne = StandbyCheckpointer.getCanceledCount() > 0;
}
- assertTrue(StandbyCheckpointer.getCanceledCount() > 0);
+ assertTrue(canceledOne);
}
private void doEdits(int start, int stop) throws IOException {
@@ -209,5 +224,22 @@ public class TestStandbyCheckpoints {
fs.mkdirs(p);
}
}
+
+ /**
+ * A codec which just slows down the saving of the image significantly
+ * by sleeping a few milliseconds on every write. This makes it easy to
+ * catch the standby in the middle of saving a checkpoint.
+ */
+ public static class SlowCodec extends GzipCodec {
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out)
+ throws IOException {
+ CompressionOutputStream ret = super.createOutputStream(out);
+ CompressionOutputStream spy = Mockito.spy(ret);
+ Mockito.doAnswer(new GenericTestUtils.SleepAnswer(2))
+ .when(spy).write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt());
+ return spy;
+ }
+ }
}