This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 72b37e8  HDFS-12979. StandbyNode should upload FsImage to ObserverNode after checkpointing. Contributed by Chen Liang.
72b37e8 is described below

commit 72b37e8833e22ee72dc441b1fac49067faf7e267
Author: Chen Liang <cli...@apache.org>
AuthorDate: Thu Oct 3 12:23:25 2019 -0700

    HDFS-12979. StandbyNode should upload FsImage to ObserverNode after checkpointing. Contributed by Chen Liang.
---
 .../hadoop/hdfs/server/namenode/ImageServlet.java  |  63 ++++++++-
 .../hadoop/hdfs/server/namenode/NameNode.java      |   8 +-
 .../server/namenode/ha/StandbyCheckpointer.java    | 141 +++++++++++++--------
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java     |   8 ++
 .../hdfs/server/namenode/TestCheckpoint.java       |  55 ++++++++
 .../server/namenode/ha/TestStandbyCheckpoints.java |  37 +++++-
 6 files changed, 258 insertions(+), 54 deletions(-)

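At its core, the ImageServlet change below accepts an uploaded fsImage only when the image is far enough ahead of the last checkpoint in transaction count, or when enough time has passed since the last checkpoint; otherwise it answers 409 Conflict. A minimal sketch of that rule, using simplified, hypothetical names rather than the exact servlet code:

    // Reject an upload only when BOTH deltas are small; accept otherwise.
    static boolean shouldAcceptImage(boolean checkEnabled, long txid,
        long lastCheckpointTxid, long secsSinceLastCheckpoint,
        long checkpointPeriodSecs, long checkpointTxnCount) {
      boolean tooRecent = secsSinceLastCheckpoint < checkpointPeriodSecs;
      boolean tooFewTxns = (txid - lastCheckpointTxid) < checkpointTxnCount;
      return !(checkEnabled && tooRecent && tooFewTxns);
    }
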
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
index fdd8d70..ad8b159 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
@@ -17,7 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.hdfs.server.common.Util;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.net.HttpURLConnection;
@@ -88,6 +94,10 @@ public class ImageServlet extends HttpServlet {
   private SortedSet<ImageUploadRequest> currentlyDownloadingCheckpoints = Collections
       .<ImageUploadRequest> synchronizedSortedSet(new TreeSet<ImageUploadRequest>());
 
+  public static final String RECENT_IMAGE_CHECK_ENABLED =
+      "recent.image.check.enabled";
+  public static final boolean RECENT_IMAGE_CHECK_ENABLED_DEFAULT = true;
+
   @Override
   public void doGet(final HttpServletRequest request,
       final HttpServletResponse response) throws ServletException, IOException {
@@ -481,6 +491,23 @@ public class ImageServlet extends HttpServlet {
       final PutImageParams parsedParams = new PutImageParams(request, response,
           conf);
       final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+      final boolean checkRecentImageEnable;
+      Object checkRecentImageEnableObj =
+          context.getAttribute(RECENT_IMAGE_CHECK_ENABLED);
+      if (checkRecentImageEnableObj != null) {
+        if (checkRecentImageEnableObj instanceof Boolean) {
+          checkRecentImageEnable = (boolean) checkRecentImageEnableObj;
+        } else {
+          // This is an error case, but crashing the NN because of it
+          // seems worse. Only log the error and fall back to the default.
+          LOG.error("Expecting boolean obj for setting checking recent image, "
+              + "but got " + checkRecentImageEnableObj.getClass() + ". This is "
+              + "unexpected! Setting to default.");
+          checkRecentImageEnable = RECENT_IMAGE_CHECK_ENABLED_DEFAULT;
+        }
+      } else {
+        checkRecentImageEnable = RECENT_IMAGE_CHECK_ENABLED_DEFAULT;
+      }
 
       validateRequest(context, conf, request, response, nnImage,
           parsedParams.getStorageInfoString());
@@ -494,7 +521,8 @@ public class ImageServlet extends HttpServlet {
               // target (regardless of the fact that we got the image)
               HAServiceProtocol.HAServiceState state = NameNodeHttpServer
                   .getNameNodeStateFromContext(getServletContext());
-              if (state != HAServiceProtocol.HAServiceState.ACTIVE) {
+              if (state != HAServiceProtocol.HAServiceState.ACTIVE &&
+                  state != HAServiceProtocol.HAServiceState.OBSERVER) {
                 // we need a different response type here so the client can differentiate this
                 // from the failure to upload due to (1) security, or (2) other checkpoints already
                 // present
@@ -528,6 +556,39 @@ public class ImageServlet extends HttpServlet {
                         + txid);
                 return null;
               }
+
+              long now = System.currentTimeMillis();
+              long lastCheckpointTime =
+                  nnImage.getStorage().getMostRecentCheckpointTime();
+              long lastCheckpointTxid =
+                  nnImage.getStorage().getMostRecentCheckpointTxId();
+
+              long checkpointPeriod =
+                  conf.getTimeDuration(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
+                      DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT, TimeUnit.SECONDS);
+              long checkpointTxnCount =
+                  conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
+                      DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
+
+              long timeDelta = TimeUnit.MILLISECONDS.toSeconds(
+                  now - lastCheckpointTime);
+
+              if (checkRecentImageEnable &&
+                  timeDelta < checkpointPeriod &&
+                  txid - lastCheckpointTxid < checkpointTxnCount) {
+                // A new fsImage is accepted only when at least one of the
+                // two conditions is met:
+                // 1. the most recent image's txid is too far behind
+                // 2. the last checkpoint time is too old
+                response.sendError(HttpServletResponse.SC_CONFLICT,
+                    "Most recent checkpoint is neither too far behind in "
+                        + "txid, nor too old. New txnid cnt is "
+                        + (txid - lastCheckpointTxid)
+                        + ", expecting at least " + checkpointTxnCount
+                        + " unless too long since last upload.");
+                return null;
+              }
+
               try {
                 if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
                   response.sendError(HttpServletResponse.SC_CONFLICT,
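
Note that the check above is toggled through a servlet-context attribute (RECENT_IMAGE_CHECK_ENABLED) rather than a configuration key, so a test harness can switch it off at runtime; the MiniDFSCluster change further down in this patch does exactly that. A hedged usage sketch, assuming a NameNode handle named nn:

    // Disable the recent-image check for tests via the NameNode's
    // HTTP server servlet context attribute.
    nn.getHttpServer()
        .setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false);
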
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index b44d104..ccc0af6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.ipc.ExternalCall;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.ipc.RetriableException;
@@ -506,7 +507,12 @@ public class NameNode extends ReconfigurableBase implements
     LOG.info("Setting ADDRESS {}", address);
     conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, address);
   }
-  
+
+  @VisibleForTesting
+  public HttpServer2 getHttpServer() {
+    return httpServer.getHttpServer();
+  }
+
   /**
    * Fetches the address for services to use when connecting to namenode
    * based on the value of fallback returns null if the special
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
index 753447b..c05a0da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
@@ -19,12 +19,15 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import static org.apache.hadoop.util.Time.monotonicNow;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URL;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -65,7 +68,6 @@ public class StandbyCheckpointer {
   private final Configuration conf;
   private final FSNamesystem namesystem;
   private long lastCheckpointTime;
-  private long lastUploadTime;
   private final CheckpointerThread thread;
   private final ThreadFactory uploadThreadFactory;
   private List<URL> activeNNAddresses;
@@ -73,12 +75,14 @@ public class StandbyCheckpointer {
 
   private final Object cancelLock = new Object();
   private Canceler canceler;
-  private boolean isPrimaryCheckPointer = true;
 
   // Keep track of how many checkpoints were canceled.
   // This is for use in tests.
   private static int canceledCount = 0;
-  
+
+  // A map from NN url to the most recent image upload time.
+  private final HashMap<String, CheckpointReceiverEntry> checkpointReceivers;
+
   public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
       throws IOException {
     this.namesystem = ns;
@@ -89,6 +93,37 @@ public class StandbyCheckpointer {
         .setNameFormat("TransferFsImageUpload-%d").build();
 
     setNameNodeAddresses(conf);
+    this.checkpointReceivers = new HashMap<>();
+    for (URL address : activeNNAddresses) {
+      this.checkpointReceivers.put(address.toString(),
+          new CheckpointReceiverEntry());
+    }
+  }
+
+  private static final class CheckpointReceiverEntry {
+    private long lastUploadTime;
+    private boolean isPrimary;
+
+    CheckpointReceiverEntry() {
+      this.lastUploadTime = 0L;
+      this.isPrimary = true;
+    }
+
+    void setLastUploadTime(long lastUploadTime) {
+      this.lastUploadTime = lastUploadTime;
+    }
+
+    void setIsPrimary(boolean isPrimaryFor) {
+      this.isPrimary = isPrimaryFor;
+    }
+
+    long getLastUploadTime() {
+      return lastUploadTime;
+    }
+
+    boolean isPrimary() {
+      return isPrimary;
+    }
   }
 
   /**
@@ -156,7 +191,7 @@ public class StandbyCheckpointer {
     thread.interrupt();
   }
 
-  private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {
+  private void doCheckpoint() throws InterruptedException, IOException {
     assert canceler != null;
     final long txid;
     final NameNodeFile imageType;
@@ -208,11 +243,6 @@ public class StandbyCheckpointer {
       namesystem.cpUnlock();
     }
 
-    //early exit if we shouldn't actually send the checkpoint to the ANN
-    if(!sendCheckpoint){
-      return;
-    }
-
     // Upload the saved checkpoint back to the active
     // Do this in a separate thread to avoid blocking transition to active, but don't allow more
     // than the expected number of tasks to run or queue up
@@ -222,54 +252,67 @@ public class StandbyCheckpointer {
         uploadThreadFactory);
     // for right now, just match the upload to the nn address by convention. There is no need to
     // directly tie them together by adding a pair class.
-    List<Future<TransferFsImage.TransferResult>> uploads =
-        new ArrayList<Future<TransferFsImage.TransferResult>>();
+    HashMap<String, Future<TransferFsImage.TransferResult>> uploads =
+        new HashMap<>();
     for (final URL activeNNAddress : activeNNAddresses) {
-      Future<TransferFsImage.TransferResult> upload =
-          executor.submit(new Callable<TransferFsImage.TransferResult>() {
-            @Override
-            public TransferFsImage.TransferResult call() throws IOException {
-              return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
-                  .getFSImage().getStorage(), imageType, txid, canceler);
-            }
-          });
-      uploads.add(upload);
+      // Upload the image if at least one of the two following conditions is met:
+      // 1. it has been quiet for long enough, so try to contact the node.
+      // 2. this standby IS the primary checkpointer of the target NN.
+      String addressString = activeNNAddress.toString();
+      assert checkpointReceivers.containsKey(addressString);
+      CheckpointReceiverEntry receiverEntry =
+          checkpointReceivers.get(addressString);
+      long secsSinceLastUpload =
+          TimeUnit.MILLISECONDS.toSeconds(
+              monotonicNow() - receiverEntry.getLastUploadTime());
+      boolean shouldUpload = receiverEntry.isPrimary() ||
+          secsSinceLastUpload >= checkpointConf.getQuietPeriod();
+      if (shouldUpload) {
+        Future<TransferFsImage.TransferResult> upload =
+            executor.submit(new Callable<TransferFsImage.TransferResult>() {
+              @Override
+              public TransferFsImage.TransferResult call() throws IOException {
+                return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
+                    .getFSImage().getStorage(), imageType, txid, canceler);
+              }
+            });
+        uploads.put(addressString, upload);
+      }
     }
     InterruptedException ie = null;
-    IOException ioe= null;
-    int i = 0;
-    boolean success = false;
-    for (; i < uploads.size(); i++) {
-      Future<TransferFsImage.TransferResult> upload = uploads.get(i);
+    List<IOException> ioes = Lists.newArrayList();
+    for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
+        uploads.entrySet()) {
+      String url = entry.getKey();
+      Future<TransferFsImage.TransferResult> upload = entry.getValue();
       try {
-        // TODO should there be some smarts here about retries nodes that are not the active NN?
+        // TODO should there be some smarts here about retrying nodes that
+        //  are not the active NN?
+        CheckpointReceiverEntry receiverEntry = checkpointReceivers.get(url);
         if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {
-          success = true;
-          //avoid getting the rest of the results - we don't care since we had a successful upload
-          break;
+          receiverEntry.setLastUploadTime(monotonicNow());
+          receiverEntry.setIsPrimary(true);
+        } else {
+          receiverEntry.setIsPrimary(false);
         }
-
       } catch (ExecutionException e) {
-        ioe = new IOException("Exception during image upload: " + e.getMessage(),
-            e.getCause());
-        break;
+        // Even if an exception happens, still proceed to the next NN url,
+        // so that a failed upload to one NN does not prevent the
+        // remaining NNs from getting the fsImage.
+        ioes.add(new IOException("Exception during image upload", e));
       } catch (InterruptedException e) {
         ie = e;
         break;
       }
     }
-    lastUploadTime = monotonicNow();
-
-    // we are primary if we successfully updated the ANN
-    this.isPrimaryCheckPointer = success;
-
     // cleaner than copying code for multiple catch statements and better than catching all
     // exceptions, so we just handle the ones we expect.
-    if (ie != null || ioe != null) {
+    if (ie != null) {
 
       // cancel the rest of the tasks, and close the pool
-      for (; i < uploads.size(); i++) {
-        Future<TransferFsImage.TransferResult> upload = uploads.get(i);
+      for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
+          uploads.entrySet()) {
+        Future<TransferFsImage.TransferResult> upload = entry.getValue();
         // The background thread may be blocked waiting in the throttler, so
         // interrupt it.
         upload.cancel(true);
@@ -282,11 +325,11 @@ public class StandbyCheckpointer {
       executor.awaitTermination(500, TimeUnit.MILLISECONDS);
 
       // re-throw the exception we got, since one of these two must be non-null
-      if (ie != null) {
-        throw ie;
-      } else if (ioe != null) {
-        throw ioe;
-      }
+      throw ie;
+    }
+
+    if (!ioes.isEmpty()) {
+      throw MultipleIOException.createIOException(ioes);
     }
   }
   
@@ -369,7 +412,6 @@ public class StandbyCheckpointer {
       // Reset checkpoint time so that we don't always checkpoint
       // on startup.
       lastCheckpointTime = monotonicNow();
-      lastUploadTime = monotonicNow();
       while (shouldRun) {
         boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
         if (!needRollbackCheckpoint) {
@@ -422,10 +464,7 @@ public class StandbyCheckpointer {
 
             // on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
             // rollback request, are the checkpointer, are outside the quiet period.
-            final long secsSinceLastUpload = (now - lastUploadTime) / 1000;
-            boolean sendRequest = isPrimaryCheckPointer
-                || secsSinceLastUpload >= checkpointConf.getQuietPeriod();
-            doCheckpoint(sendRequest);
+            doCheckpoint();
 
             // reset needRollbackCheckpoint to false only when we finish a ckpt
             // for rollback image
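
The StandbyCheckpointer change above replaces the single lastUploadTime / isPrimaryCheckPointer pair with one CheckpointReceiverEntry per target NameNode, so the upload decision is made independently for each receiver. A simplified sketch of the decision loop in doCheckpoint(), mirroring the added lines above:

    for (URL nn : activeNNAddresses) {
      CheckpointReceiverEntry entry = checkpointReceivers.get(nn.toString());
      long quietSecs = TimeUnit.MILLISECONDS.toSeconds(
          monotonicNow() - entry.getLastUploadTime());
      // Upload if this standby is the primary checkpointer for that NN,
      // or if the quiet period has elapsed since the last upload to it.
      if (entry.isPrimary() || quietSecs >= checkpointConf.getQuietPeriod()) {
        // submit the TransferFsImage upload task for this NN ...
      }
    }
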
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index ca28874..b0586d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -109,6 +109,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -988,6 +989,11 @@ public class MiniDFSCluster implements AutoCloseable {
           format, operation, clusterId, nnCounter);
       nnCounter += nameservice.getNNs().size();
     }
+
+    for (NameNodeInfo nn : namenodes.values()) {
+      nn.nameNode.getHttpServer()
+          .setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false);
+    }
   }
 
   /**
@@ -2127,6 +2133,8 @@ public class MiniDFSCluster implements AutoCloseable {
     }
 
     NameNode nn = NameNode.createNameNode(args, info.conf);
+    nn.getHttpServer()
+        .setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false);
     info.nameNode = nn;
     info.setStartOpt(startOpt);
     if (waitActive) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
index 9a4f6db..2fc1cd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.assertNNHasCheckpoints;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.getNameNodeCurrentDirs;
+import static org.apache.hadoop.hdfs.server.namenode.ImageServlet.RECENT_IMAGE_CHECK_ENABLED;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
 import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -2475,6 +2476,60 @@ public class TestCheckpoint {
     }
   }
 
+  @Test(timeout = 300000)
+  public void testActiveRejectSmallerDeltaImage() throws Exception {
+    MiniDFSCluster cluster = null;
+    Configuration conf = new HdfsConfiguration();
+    // Set the delta txid threshold to 10
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 10);
+    // Set the delta time threshold to some arbitrarily large value, so
+    // it does not trigger a checkpoint during this test.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 900000);
+
+    SecondaryNameNode secondary = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+          .format(true).build();
+      // enable small delta rejection
+      NameNode active = cluster.getNameNode();
+      active.httpServer.getHttpServer()
+          .setAttribute(RECENT_IMAGE_CHECK_ENABLED, true);
+
+      secondary = startSecondaryNameNode(conf);
+
+      FileSystem fs = cluster.getFileSystem();
+      assertEquals(0, active.getNamesystem().getFSImage()
+          .getMostRecentCheckpointTxId());
+
+      // create 5 dirs.
+      for (int i = 0; i < 5; i++) {
+        fs.mkdirs(new Path("dir-" + i));
+      }
+
+      // Checkpoint 1st
+      secondary.doCheckpoint();
+      // At this point, the txid delta is smaller than the threshold of 10,
+      // so the active does not accept this image.
+      assertEquals(0, active.getNamesystem().getFSImage()
+          .getMostRecentCheckpointTxId());
+
+      // create another 10 dirs.
+      for (int i = 0; i < 10; i++) {
+        fs.mkdirs(new Path("dir2-" + i));
+      }
+
+      // Checkpoint 2nd
+      secondary.doCheckpoint();
+      // Here the delta is large enough, so the active accepts this image.
+      assertEquals(21, active.getNamesystem().getFSImage()
+          .getMostRecentCheckpointTxId());
+    } finally {
+      cleanup(secondary);
+      cleanup(cluster);
+    }
+  }
+
   private static void cleanup(SecondaryNameNode snn) {
     if (snn != null) {
       try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index 75b2412..da130c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -21,6 +21,7 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import java.util.Collections;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -252,7 +253,41 @@ public class TestStandbyCheckpoints {
     dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
     FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.<String>of());
   }
-  
+
+  /**
+   * Test that when there are observer NameNodes, the Standby node is
+   * able to upload the fsImage to the Observer nodes as well.
+   */
+  @Test(timeout = 300000)
+  public void testStandbyAndObserverState() throws Exception {
+    // Transition 2 to observer
+    cluster.transitionToObserver(2);
+    doEdits(0, 10);
+    // After a rollEditLog, Standby(nn1)'s next checkpoint would be
+    // ahead of observer(nn2).
+    nns[0].getRpcServer().rollEditLog();
+
+    // After the standby creates a checkpoint, it will try to push the image
+    // to the active and all observers, updating its own txid to the most recent.
+    HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
+    HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
+    HATestUtil.waitForCheckpoint(cluster, 2, ImmutableList.of(12));
+
+    assertEquals(12, nns[2].getNamesystem().getFSImage()
+        .getMostRecentCheckpointTxId());
+    assertEquals(12, nns[1].getNamesystem().getFSImage()
+        .getMostRecentCheckpointTxId());
+
+    List<File> dirs = Lists.newArrayList();
+    // Observer and standby both have the same image.
+    dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 2));
+    dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
+    FSImageTestUtil.assertParallelFilesAreIdentical(
+        dirs, Collections.<String>emptySet());
+    // Restore 2 back to standby
+    cluster.transitionToStandby(2);
+  }
+
   /**
    * Test for the case when the SBN is configured to checkpoint based
    * on a time period, but no transactions are happening on the


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
