Author: todd
Date: Wed Jul 25 21:40:17 2012
New Revision: 1365788
URL: http://svn.apache.org/viewvc?rev=1365788&view=rev
Log:
HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary. Contributed
by Todd Lipcon.
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1365788&r1=1365787&r2=1365788&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
Wed Jul 25 21:40:17 2012
@@ -2,3 +2,5 @@ Changes for HDFS-3077 branch.
This will be merged into the main CHANGES.txt when the branch is merged.
HDFS-3077. Quorum-based protocol for reading and writing edit logs.
Contributed by Todd Lipcon based on initial work from Brandon Li and Hari
Mankude.
+
+HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary (todd)
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java?rev=1365788&r1=1365787&r2=1365788&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
Wed Jul 25 21:40:17 2012
@@ -20,12 +20,12 @@ package org.apache.hadoop.hdfs.qjournal.
import java.net.URL;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
-import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import com.google.common.util.concurrent.ListenableFuture;
@@ -81,7 +81,7 @@ interface AsyncLogger {
/**
* Fetch the list of edit logs available on the remote node.
*/
- public ListenableFuture<GetEditLogManifestResponseProto> getEditLogManifest(
+ public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
long fromTxnId);
/**
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1365788&r1=1365787&r2=1365788&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
Wed Jul 25 21:40:17 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.qjournal.p
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -232,13 +233,13 @@ class AsyncLoggerSet {
return QuorumCall.create(calls);
}
- public QuorumCall<AsyncLogger,GetEditLogManifestResponseProto>
+ public QuorumCall<AsyncLogger, RemoteEditLogManifest>
getEditLogManifest(long fromTxnId) {
Map<AsyncLogger,
- ListenableFuture<GetEditLogManifestResponseProto>> calls
+ ListenableFuture<RemoteEditLogManifest>> calls
= Maps.newHashMap();
for (AsyncLogger logger : loggers) {
- ListenableFuture<GetEditLogManifestResponseProto> future =
+ ListenableFuture<RemoteEditLogManifest> future =
logger.getEditLogManifest(fromTxnId);
calls.put(logger, future);
}
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1365788&r1=1365787&r2=1365788&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
Wed Jul 25 21:40:17 2012
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.qjournal.p
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.qjournal.server.GetJournalEditServlet;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
@@ -290,12 +292,17 @@ public class IPCLoggerChannel implements
}
@Override
- public ListenableFuture<GetEditLogManifestResponseProto> getEditLogManifest(
+ public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
final long fromTxnId) {
- return executor.submit(new Callable<GetEditLogManifestResponseProto>() {
+ return executor.submit(new Callable<RemoteEditLogManifest>() {
@Override
- public GetEditLogManifestResponseProto call() throws IOException {
- return getProxy().getEditLogManifest(journalId, fromTxnId);
+ public RemoteEditLogManifest call() throws IOException {
+ GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
+ journalId, fromTxnId);
+ // Update the http port, since we need this to build URLs to any of the
+ // returned logs.
+ httpPort = ret.getHttpPort();
+ return PBHelper.convert(ret.getManifest());
}
});
}
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1365788&r1=1365787&r2=1365788&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
Wed Jul 25 21:40:17 2012
@@ -51,6 +51,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
@@ -334,9 +335,9 @@ public class QuorumJournalManager implem
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk) {
- QuorumCall<AsyncLogger,GetEditLogManifestResponseProto> q =
+ QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
loggers.getEditLogManifest(fromTxnId);
- Map<AsyncLogger, GetEditLogManifestResponseProto> resps;
+ Map<AsyncLogger, RemoteEditLogManifest> resps;
try {
resps = loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs);
} catch (IOException ioe) {
@@ -344,16 +345,15 @@ public class QuorumJournalManager implem
throw new RuntimeException(ioe);
}
- LOG.info("selectInputStream manifests:\n" +
- QuorumCall.mapToString(resps));
+ LOG.debug("selectInputStream manifests:\n" +
+ Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
final PriorityQueue<EditLogInputStream> allStreams =
new PriorityQueue<EditLogInputStream>(64,
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
- for (Map.Entry<AsyncLogger, GetEditLogManifestResponseProto> e :
resps.entrySet()) {
+ for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
AsyncLogger logger = e.getKey();
- GetEditLogManifestResponseProto response = e.getValue();
- RemoteEditLogManifest manifest =
PBHelper.convert(response.getManifest());
+ RemoteEditLogManifest manifest = e.getValue();
for (RemoteEditLog remoteLog : manifest.getLogs()) {
URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1365788&r1=1365787&r2=1365788&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
Wed Jul 25 21:40:17 2012
@@ -281,7 +281,8 @@ class Journal implements Closeable {
*/
public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
throws IOException {
- // TODO: check fencing info?
+ // No need to checkRequest() here - anyone may ask for the list
+ // of segments.
RemoteEditLogManifest manifest = new RemoteEditLogManifest(
fjm.getRemoteEditLogs(sinceTxId));
return manifest;
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1365788&r1=1365787&r2=1365788&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
Wed Jul 25 21:40:17 2012
@@ -17,10 +17,9 @@
*/
package org.apache.hadoop.hdfs.qjournal.client;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -35,11 +34,15 @@ import org.apache.hadoop.hdfs.qjournal.c
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
@@ -98,6 +101,69 @@ public class TestQuorumJournalManager {
}
@Test
+ public void testReaderWhileAnotherWrites() throws Exception {
+
+ QuorumJournalManager readerQjm = createSpyingQJM();
+ List<EditLogInputStream> streams = Lists.newArrayList();
+ readerQjm.selectInputStreams(streams, 0, false);
+ assertEquals(0, streams.size());
+ writeSegment(qjm, 1, 3, true);
+
+ readerQjm.selectInputStreams(streams, 0, false);
+ try {
+ assertEquals(1, streams.size());
+ // Validate the actual stream contents.
+ EditLogInputStream stream = streams.get(0);
+ assertEquals(1, stream.getFirstTxId());
+ assertEquals(3, stream.getLastTxId());
+
+ for (int i = 1; i <= 3; i++) {
+ FSEditLogOp op = stream.readOp();
+ assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
+ assertEquals(i, op.getTransactionId());
+ }
+ assertNull(stream.readOp());
+ } finally {
+ IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+ streams.clear();
+ }
+
+ // Ensure correct results when there is a stream in-progress, but we don't
+ // ask for in-progress.
+ writeSegment(qjm, 4, 3, false);
+ readerQjm.selectInputStreams(streams, 0, false);
+ try {
+ assertEquals(1, streams.size());
+ EditLogInputStream stream = streams.get(0);
+ assertEquals(1, stream.getFirstTxId());
+ assertEquals(3, stream.getLastTxId());
+ } finally {
+ IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+ streams.clear();
+ }
+
+ // TODO: check results for selectInputStreams with inProgressOK = true.
+ // This doesn't currently work, due to a bug where RedundantEditInputStream
+ // throws an exception if there are any unvalidated in-progress edits in
the list!
+ // But, it shouldn't be necessary for current use cases.
+
+ qjm.finalizeLogSegment(4, 6);
+ readerQjm.selectInputStreams(streams, 0, false);
+ try {
+ assertEquals(2, streams.size());
+ assertEquals(4, streams.get(1).getFirstTxId());
+ assertEquals(6, streams.get(1).getLastTxId());
+ } finally {
+ IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+ streams.clear();
+ }
+ }
+
+ /**
+ * TODO: this test needs to be fleshed out to be an exhaustive failure test
+ * @throws Exception
+ */
+ @Test
public void testOrchestratedFailures() throws Exception {
writeSegment(qjm, 1, 3, true);
writeSegment(qjm, 4, 3, true);
@@ -287,7 +353,6 @@ public class TestQuorumJournalManager {
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO) {
@Override
protected List<AsyncLogger> createLoggers() throws IOException {
- LOG.info("===> make spies");
List<AsyncLogger> realLoggers = super.createLoggers();
List<AsyncLogger> spies = Lists.newArrayList();
for (AsyncLogger logger : realLoggers) {