This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit f847983f2c621d3de2c2ffbcfc1f1e40d83efa4e
Author: Erik Krogen <xkro...@apache.org>
AuthorDate: Tue May 22 16:45:26 2018 -0700

    HDFS-13609. [SBN read] Edit Tail Fast Path Part 3: NameNode-side changes to support tailing edits via RPC. Contributed by Erik Krogen.
---
 .../hadoop/hdfs/qjournal/client/AsyncLogger.java   |   7 ++
 .../hdfs/qjournal/client/AsyncLoggerSet.java       |  14 +++
 .../hdfs/qjournal/client/IPCLoggerChannel.java     |  14 +++
 .../hdfs/qjournal/client/QuorumJournalManager.java | 111 +++++++++++++++++-
 .../server/namenode/EditLogFileInputStream.java    |  44 +++++++
 .../hdfs/server/namenode/ha/EditLogTailer.java     |   6 +-
 .../src/main/resources/hdfs-default.xml            |   4 +-
 .../qjournal/client/TestQuorumJournalManager.java  | 130 +++++++++++++++++++++
 .../client/TestQuorumJournalManagerUnit.java       | 101 +++++++++++++++-
 .../namenode/TestEditLogFileInputStream.java       |  18 +++
 10 files changed, 439 insertions(+), 10 deletions(-)
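
For context, here is a minimal consumer-side sketch (not part of this commit) of the flow the patch enables: with in-progress tailing turned on, the standby's tailer requests streams with inProgressOk=true, and the QuorumJournalManager now tries the low-latency RPC path before falling back to streaming. The class and method names below are illustrative only.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
    import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;

    class TailOnceSketch {
      // Ask the QJM for edit streams starting at fromTxnId. inProgressOk=true
      // lets the QJM attempt the new RPC fast path; onlyDurableTxns=true
      // restricts the result to txns committed on a quorum of JournalNodes.
      static List<EditLogInputStream> tailOnce(QuorumJournalManager qjm,
          long fromTxnId) throws IOException {
        List<EditLogInputStream> streams = new ArrayList<>();
        qjm.selectInputStreams(streams, fromTxnId, true, true);
        return streams;
      }
    }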

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
index d2b48cc..7230ebc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
@@ -22,6 +22,7 @@ import java.net.URL;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -106,6 +107,12 @@ interface AsyncLogger {
    * Begin a new epoch on the target node.
    */
   public ListenableFuture<NewEpochResponseProto> newEpoch(long epoch);
+
+  /**
+   * Fetch up to {@code maxTransactions} journaled edits from the cache, starting at {@code fromTxnId}.
+   */
+  public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits(
+      long fromTxnId, int maxTransactions);
   
   /**
    * Fetch the list of edit logs available on the remote node.
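
As a hedged illustration of the new contract (not part of the patch), a caller could hang a Guava callback off the returned future; in practice, AsyncLoggerSet below fans these futures into a QuorumCall instead. The txn-ID and max-transactions arguments are illustrative.

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;

    class JournaledEditsCallbackSketch { // assumes same package as AsyncLogger
      static void fetch(AsyncLogger logger) {
        ListenableFuture<GetJournaledEditsResponseProto> future =
            logger.getJournaledEdits(1, 5000);
        Futures.addCallback(future,
            new FutureCallback<GetJournaledEditsResponseProto>() {
              @Override
              public void onSuccess(GetJournaledEditsResponseProto resp) {
                // resp.getTxnCount() txns, serialized in resp.getEditLog()
              }
              @Override
              public void onFailure(Throwable t) {
                // e.g. a cache miss on the JournalNode
              }
            });
      }
    }
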
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
index d46c2cf..15e1df6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -261,6 +262,19 @@ class AsyncLoggerSet {
     return QuorumCall.create(calls);
   }
 
+  public QuorumCall<AsyncLogger, GetJournaledEditsResponseProto>
+  getJournaledEdits(long fromTxnId, int maxTransactions) {
+    Map<AsyncLogger,
+        ListenableFuture<GetJournaledEditsResponseProto>> calls
+        = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<GetJournaledEditsResponseProto> future =
+          logger.getJournaledEdits(fromTxnId, maxTransactions);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
   public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
       long fromTxnId, boolean inProgressOk) {
     Map<AsyncLogger,
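
A hedged sketch of how this quorum call is consumed (mirroring what QuorumJournalManager.selectRpcInputStreams does further below); the 20-second timeout is illustrative, the real code uses its configured select timeout.

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;

    class QuorumFetchSketch { // assumes same package as AsyncLoggerSet
      // Fan out getJournaledEdits to every logger, then block until a
      // majority respond (or throw on timeout / too many failures).
      static Map<AsyncLogger, GetJournaledEditsResponseProto> fetchQuorum(
          AsyncLoggerSet loggers, long fromTxnId, int maxTxns)
          throws IOException {
        QuorumCall<AsyncLogger, GetJournaledEditsResponseProto> q =
            loggers.getJournaledEdits(fromTxnId, maxTxns);
        return loggers.waitForWriteQuorum(q, 20000, "getJournaledEdits");
      }
    }
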
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
index 53141ea..8d3dc42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -559,6 +560,19 @@ public class IPCLoggerChannel implements AsyncLogger {
   }
 
   @Override
+  public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits(
+      long fromTxnId, int maxTransactions) {
+    return parallelExecutor.submit(
+        new Callable<GetJournaledEditsResponseProto>() {
+          @Override
+          public GetJournaledEditsResponseProto call() throws IOException {
+            return getProxy().getJournaledEdits(journalId, nameServiceId,
+                fromTxnId, maxTransactions);
+          }
+        });
+  }
+
+  @Override
   public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
       final long fromTxnId, final boolean inProgressOk) {
     return parallelExecutor.submit(new Callable<RemoteEditLogManifest>() {
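
The anonymous Callable matches the file's Java 7-era style; on Java 8 the same submission could be written as a lambda. A hedged equivalent, not the committed form:

    @Override
    public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits(
        long fromTxnId, int maxTransactions) {
      // Same behavior: run the proxy call on the channel's executor and
      // surface the result as a ListenableFuture.
      return parallelExecutor.submit(() -> getProxy().getJournaledEdits(
          journalId, nameServiceId, fromTxnId, maxTransactions));
    }
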
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index f06eec1..f9d96b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -67,6 +69,14 @@ import com.google.protobuf.TextFormat;
 public class QuorumJournalManager implements JournalManager {
   static final Log LOG = LogFactory.getLog(QuorumJournalManager.class);
 
+  // This config is not publicly exposed
+  static final String QJM_RPC_MAX_TXNS_KEY =
+      "dfs.ha.tail-edits.qjm.rpc.max-txns";
+  static final int QJM_RPC_MAX_TXNS_DEFAULT = 5000;
+
+  // Maximum number of transactions to fetch at a time when using the
+  // RPC edit fetch mechanism
+  private final int maxTxnsPerRpc;
   // Timeouts for which the QJM will wait for each of the following actions.
   private final int startSegmentTimeoutMs;
   private final int prepareRecoveryTimeoutMs;
@@ -125,6 +135,10 @@ public class QuorumJournalManager implements JournalManager {
     this.nameServiceId = nameServiceId;
     this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
 
+    this.maxTxnsPerRpc =
+        conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT);
+    Preconditions.checkArgument(maxTxnsPerRpc > 0,
+        "Must specify %s greater than 0!", QJM_RPC_MAX_TXNS_KEY);
     // Configure timeouts.
     this.startSegmentTimeoutMs = conf.getInt(
         DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
@@ -479,17 +493,104 @@ public class QuorumJournalManager implements JournalManager {
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxnId, boolean inProgressOk,
       boolean onlyDurableTxns) throws IOException {
+    if (inProgressOk) {
+      LOG.info("Tailing edits starting from txn ID " + fromTxnId +
+          " via RPC mechanism");
+      try {
+        Collection<EditLogInputStream> rpcStreams = new ArrayList<>();
+        selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns);
+        streams.addAll(rpcStreams);
+        return;
+      } catch (IOException ioe) {
+        LOG.warn("Encountered exception while tailing edits >= " + fromTxnId +
+            " via RPC; falling back to streaming.", ioe);
+      }
+    }
+    selectStreamingInputStreams(streams, fromTxnId, inProgressOk,
+        onlyDurableTxns);
+  }
+
+  /**
+   * Select input streams from the journals, specifically using the RPC
+   * mechanism optimized for low latency.
+   *
+   * @param streams The collection to store the return streams into.
+   * @param fromTxnId Select edits starting from this transaction ID
+   * @param onlyDurableTxns Iff true, only include transactions which have been
+   *                        committed to a quorum of the journals.
+   * @throws IOException Upon issues, including cache misses on the journals.
+   */
+  private void selectRpcInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean onlyDurableTxns) throws IOException {
+    QuorumCall<AsyncLogger, GetJournaledEditsResponseProto> q =
+        loggers.getJournaledEdits(fromTxnId, maxTxnsPerRpc);
+    Map<AsyncLogger, GetJournaledEditsResponseProto> responseMap =
+        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
+            "selectRpcInputStreams");
+    assert responseMap.size() >= loggers.getMajoritySize() :
+        "Quorum call returned without a majority";
 
+    List<Integer> responseCounts = new ArrayList<>();
+    for (GetJournaledEditsResponseProto resp : responseMap.values()) {
+      responseCounts.add(resp.getTxnCount());
+    }
+    Collections.sort(responseCounts);
+    int highestTxnCount = responseCounts.get(responseCounts.size() - 1);
+    if (LOG.isDebugEnabled() || highestTxnCount < 0) {
+      StringBuilder msg = new StringBuilder("Requested edits starting from ");
+      msg.append(fromTxnId).append("; got ").append(responseMap.size())
+          .append(" responses: <");
+      for (Map.Entry<AsyncLogger, GetJournaledEditsResponseProto> ent :
+          responseMap.entrySet()) {
+        msg.append("[").append(ent.getKey()).append(", ")
+            .append(ent.getValue().getTxnCount()).append("],");
+      }
+      msg.append(">");
+      if (highestTxnCount < 0) {
+        throw new IOException("Did not get any valid JournaledEdits " +
+            "responses: " + msg);
+      } else {
+        LOG.debug(msg.toString());
+      }
+    }
+
+    int maxAllowedTxns = !onlyDurableTxns ? highestTxnCount :
+        responseCounts.get(responseCounts.size() - loggers.getMajoritySize());
+    if (maxAllowedTxns == 0) {
+      LOG.debug("No new edits available in logs; requested starting from " +
+          "ID " + fromTxnId);
+      return;
+    }
+    LOG.info("Selected loggers with >= " + maxAllowedTxns +
+        " transactions starting from " + fromTxnId);
+    PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
+        JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    for (GetJournaledEditsResponseProto resp : responseMap.values()) {
+      long endTxnId = fromTxnId - 1 +
+          Math.min(maxAllowedTxns, resp.getTxnCount());
+      allStreams.add(EditLogFileInputStream.fromByteString(
+          resp.getEditLog(), fromTxnId, endTxnId, true));
+    }
+    JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
+  }
+
+  /**
+   * Select input streams from the journals, specifically using the streaming
+   * mechanism optimized for resiliency / bulk load.
+   */
+  private void selectStreamingInputStreams(
+      Collection<EditLogInputStream> streams, long fromTxnId,
+      boolean inProgressOk, boolean onlyDurableTxns) throws IOException {
     QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
         loggers.getEditLogManifest(fromTxnId, inProgressOk);
     Map<AsyncLogger, RemoteEditLogManifest> resps =
         loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
-            "selectInputStreams");
-    
-    LOG.debug("selectInputStream manifests:\n" +
+            "selectStreamingInputStreams");
+
+    LOG.debug("selectStreamingInputStream manifests:\n" +
         Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
-    
-    final PriorityQueue<EditLogInputStream> allStreams = 
+
+    final PriorityQueue<EditLogInputStream> allStreams =
         new PriorityQueue<EditLogInputStream>(64,
             JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
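
To make the durability arithmetic in selectRpcInputStreams concrete: with three loggers (majority size 2) whose sorted txn counts are [1, 2, 3], the onlyDurableTxns case reads index size - majority = 1 and allows 2 transactions, since at least a quorum of journals hold the first two. A hedged, self-contained rendering of that selection:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class DurableTxnCountSketch {
      public static void main(String[] args) {
        List<Integer> responseCounts = new ArrayList<>(Arrays.asList(3, 1, 2));
        int majoritySize = 2;             // majority of 3 loggers
        boolean onlyDurableTxns = true;
        Collections.sort(responseCounts); // -> [1, 2, 3]
        int highestTxnCount = responseCounts.get(responseCounts.size() - 1);
        int maxAllowedTxns = !onlyDurableTxns ? highestTxnCount
            : responseCounts.get(responseCounts.size() - majoritySize);
        System.out.println(maxAllowedTxns); // 2: only two txns are durable
      }
    }
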
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
index 36c2232..4e9b174 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import com.google.protobuf.ByteString;
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.EOFException;
@@ -119,6 +120,23 @@ public class EditLogFileInputStream extends EditLogInputStream {
     return new EditLogFileInputStream(new URLLog(connectionFactory, url),
         startTxId, endTxId, inProgress);
   }
+
+  /**
+   * Create an EditLogInputStream from a {@link ByteString}, i.e. an in-memory
+   * collection of bytes.
+   *
+   * @param bytes The byte string to read from
+   * @param startTxId the expected starting transaction ID
+   * @param endTxId the expected ending transaction ID
+   * @param inProgress whether the log is in-progress
+   * @return An edit stream to read from
+   */
+  public static EditLogInputStream fromByteString(ByteString bytes,
+      long startTxId, long endTxId, boolean inProgress) {
+    return new EditLogFileInputStream(new ByteStringLog(bytes,
+        String.format("ByteStringEditLog[%d, %d]", startTxId, endTxId)),
+        startTxId, endTxId, inProgress);
+  }
   
   private EditLogFileInputStream(LogSource log,
       long firstTxId, long lastTxId,
@@ -376,6 +394,32 @@ public class EditLogFileInputStream extends EditLogInputStream {
     public long length();
     public String getName();
   }
+
+  private static class ByteStringLog implements LogSource {
+    private final ByteString bytes;
+    private final String name;
+
+    public ByteStringLog(ByteString bytes, String name) {
+      this.bytes = bytes;
+      this.name = name;
+    }
+
+    @Override
+    public InputStream getInputStream() {
+      return bytes.newInput();
+    }
+
+    @Override
+    public long length() {
+      return bytes.size();
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+  }
   
   private static class FileLog implements LogSource {
     private final File file;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
index d88a95f..cd797fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
@@ -151,7 +151,11 @@ public class EditLogTailer {
   private int maxRetries;
 
   /**
-   * Whether the tailer should tail the in-progress edit log segments.
+   * Whether the tailer should tail the in-progress edit log segments. If true,
+   * this will also attempt to optimize for latency when tailing the edit logs
+   * (if using the
+   * {@link org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager}, this
+   * implies using the RPC-based mechanism to tail edits).
    */
   private final boolean inProgressOk;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index edd240a..ba77e1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3117,7 +3117,9 @@
   <description>
     Whether enable standby namenode to tail in-progress edit logs.
     Clients might want to turn it on when they want Standby NN to have
-    more up-to-date data.
+    more up-to-date data. When using the QuorumJournalManager, this enables
+    tailing of edit logs via the RPC-based mechanism, rather than streaming,
+    which allows for much fresher data.
   </description>
 </property>
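
For reference, a hedged sketch of enabling this programmatically, as the tests below do. The max-txns key is deliberately not public (see QuorumJournalManager above) and is shown here for illustration only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    class EnableInProgressTailingSketch {
      static Configuration tailingConf() {
        Configuration conf = new Configuration();
        // Enables in-progress tailing; with QJM this selects the RPC fast path.
        conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
        // Unofficial cap on transactions fetched per RPC (default 5000).
        conf.setInt("dfs.ha.tail-edits.qjm.rpc.max-txns", 5000);
        return conf;
      }
    }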
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index 69856ae..cac8ba5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -44,6 +44,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -91,6 +92,10 @@ public class TestQuorumJournalManager {
     conf = new Configuration();
     // Don't retry connections - it just slows down the tests.
     conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    // Turn off IPC client caching to handle daemon restarts.
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
     
     cluster = new MiniJournalCluster.Builder(conf)
         .baseDir(GenericTestUtils.getRandomizedTestDir().getAbsolutePath())
@@ -959,6 +964,131 @@ public class TestQuorumJournalManager {
     qjm2.selectInputStreams(streams, 1, true, true);
     verifyEdits(streams, 1, 8);
   }
+
+  @Test
+  public void testSelectViaRpcWithDurableTransactions() throws Exception {
+    // Two loggers will have up to ID 5, one will have up to ID 6
+    failLoggerAtTxn(spies.get(0), 6);
+    failLoggerAtTxn(spies.get(1), 6);
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 5);
+    try {
+      writeTxns(stm, 6, 1);
+      fail("Did not fail to write when only a minority succeeded");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains(
+          "too many exceptions to achieve quorum size 2/3", qe);
+    }
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, true);
+    verifyEdits(streams, 1, 5);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+    for (AsyncLogger logger : spies) {
+      Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+  }
+
+  @Test
+  public void testSelectViaRpcWithoutDurableTransactions() throws Exception {
+    setupLoggers345();
+    futureThrows(new IOException()).when(spies.get(1)).getJournaledEdits(1,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, false);
+    verifyEdits(streams, 1, 5);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+    for (AsyncLogger logger : spies) {
+      Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+  }
+
+  @Test
+  public void testSelectViaRpcOneDeadJN() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+
+    cluster.getJournalNode(0).stopAndJoin(0);
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, false);
+    verifyEdits(streams, 1, 10);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+  }
+
+  @Test
+  public void testSelectViaRpcTwoDeadJNs() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+
+    cluster.getJournalNode(0).stopAndJoin(0);
+    cluster.getJournalNode(1).stopAndJoin(0);
+
+    try {
+      qjm.selectInputStreams(new ArrayList<>(), 1, true, false);
+      fail("Did not fail to select input streams with two JNs down");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains(
+          "too many exceptions to achieve quorum size 2/3", qe);
+    }
+  }
+
+  @Test
+  public void testSelectViaRpcTwoJNsError() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+    writeTxns(stm, 11, 1);
+
+    futureThrows(new IOException()).when(spies.get(0)).getJournaledEdits(1,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    futureThrows(new IOException()).when(spies.get(1)).getJournaledEdits(1,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, true);
+    // This should still succeed as the QJM should fall back to the streaming
+    // mechanism for fetching edits
+    verifyEdits(streams, 1, 11);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+
+    for (AsyncLogger logger : spies) {
+      Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true);
+    }
+  }
+
+  @Test
+  public void testSelectViaRpcAfterJNRestart() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+    qjm.finalizeLogSegment(1, 10);
+
+    // Close to avoid connections hanging around after the JNs are restarted
+    for (int i = 0; i < cluster.getNumNodes(); i++) {
+      cluster.restartJournalNode(i);
+    }
+    cluster.waitActive();
+
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, true);
+    // This should still succeed as the QJM should fall back to the streaming
+    // mechanism for fetching edits
+    verifyEdits(streams, 1, 10);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+
+    for (AsyncLogger logger : spies) {
+      Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true);
+    }
+  }
   
   private QuorumJournalManager createSpyingQJM()
       throws IOException, URISyntaxException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
index 75dcf2f..0158eb1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
@@ -28,11 +31,11 @@ import java.util.List;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -44,11 +47,15 @@ import org.mockito.Mockito;
 import org.mockito.stubbing.Stubber;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.ByteString;
 
 import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
 
 /**
  * True unit tests for QuorumJournalManager
@@ -215,6 +222,94 @@ public class TestQuorumJournalManagerUnit {
     Mockito.verify(spyLoggers.get(0)).setCommittedTxId(1L);
   }
 
+  @Test
+  public void testReadRpcInputStreams() throws Exception {
+    for (int jn = 0; jn < 3; jn++) {
+      futureReturns(getJournaledEditsResponse(1, 3))
+          .when(spyLoggers.get(jn)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    verifyEdits(streams, 1, 3);
+  }
+
+  @Test
+  public void testReadRpcMismatchedInputStreams() throws Exception {
+    for (int jn = 0; jn < 3; jn++) {
+      futureReturns(getJournaledEditsResponse(1, jn + 1))
+          .when(spyLoggers.get(jn)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    verifyEdits(streams, 1, 2);
+  }
+
+  @Test
+  public void testReadRpcInputStreamsOneSlow() throws Exception {
+    for (int jn = 0; jn < 2; jn++) {
+      futureReturns(getJournaledEditsResponse(1, jn + 1))
+          .when(spyLoggers.get(jn)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+    Mockito.doReturn(SettableFuture.create())
+        .when(spyLoggers.get(2)).getJournaledEdits(1,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    verifyEdits(streams, 1, 1);
+  }
+
+  @Test
+  public void testReadRpcInputStreamsOneException() throws Exception {
+    for (int jn = 0; jn < 2; jn++) {
+      futureReturns(getJournaledEditsResponse(1, jn + 1))
+          .when(spyLoggers.get(jn)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+    futureThrows(new IOException()).when(spyLoggers.get(2))
+        .getJournaledEdits(1, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    verifyEdits(streams, 1, 1);
+  }
+
+  @Test
+  public void testReadRpcInputStreamsNoNewEdits() throws Exception {
+    for (int jn = 0; jn < 3; jn++) {
+      futureReturns(GetJournaledEditsResponseProto.newBuilder()
+          .setTxnCount(0).setEditLog(ByteString.EMPTY).build())
+          .when(spyLoggers.get(jn))
+          .getJournaledEdits(1, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(0, streams.size());
+  }
+
+  private GetJournaledEditsResponseProto getJournaledEditsResponse(
+      int startTxn, int numTxns) throws Exception {
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    EditLogFileOutputStream.writeHeader(
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
+        new DataOutputStream(byteStream));
+    byteStream.write(createTxnData(startTxn, numTxns));
+    return GetJournaledEditsResponseProto.newBuilder()
+        .setTxnCount(numTxns)
+        .setEditLog(ByteString.copyFrom(byteStream.toByteArray()))
+        .build();
+  }
+
   private EditLogOutputStream createLogSegment() throws IOException {
     futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong(),
         Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
index cd329a6..7b682b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
@@ -34,6 +34,7 @@ import java.util.EnumMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -80,6 +81,23 @@ public class TestEditLogFileInputStream {
     elis.close();
   }
 
+  @Test
+  public void testByteStringLog() throws Exception {
+    ByteString bytes = ByteString.copyFrom(FAKE_LOG_DATA);
+    EditLogInputStream elis = EditLogFileInputStream.fromByteString(bytes,
+        HdfsServerConstants.INVALID_TXID, HdfsServerConstants.INVALID_TXID,
+        true);
+    // Read the edit log and verify that all of the data is present
+    EnumMap<FSEditLogOpCodes, Holder<Integer>> counts = FSImageTestUtil
+        .countEditLogOpTypes(elis);
+    assertThat(counts.get(FSEditLogOpCodes.OP_ADD).held, is(1));
+    assertThat(counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP_V1).held, is(1));
+    assertThat(counts.get(FSEditLogOpCodes.OP_CLOSE).held, is(1));
+
+    assertEquals(FAKE_LOG_DATA.length, elis.length());
+    elis.close();
+  }
+
   /**
    * Regression test for HDFS-8965 which verifies that
    * FSEditLogFileInputStream#scanOp verifies Op checksums.

