Author: todd
Date: Wed Jul 25 21:44:26 2012
New Revision: 1365792
URL: http://svn.apache.org/viewvc?rev=1365792&view=rev
Log:
HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs.
Contributed by Todd Lipcon.
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
Wed Jul 25 21:44:26 2012
@@ -4,3 +4,5 @@ This will be merged into the main CHANGE
HDFS-3077. Quorum-based protocol for reading and writing edit logs.
Contributed by Todd Lipcon based on initial work from Brandon Li and Hari
Mankude.
HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary (todd)
+
+HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs (todd)
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
Wed Jul 25 21:44:26 2012
@@ -69,6 +69,12 @@ interface AsyncLogger {
long startTxId, long endTxId);
/**
+ * Allow the remote node to purge edit logs older than the given txid.
+ * @param minTxIdToKeep the min txid which must be retained
+ */
+ public ListenableFuture<Void> purgeLogsOlderThan(long minTxIdToKeep);
+
+ /**
* @return the state of the last epoch on the target node.
*/
public ListenableFuture<GetJournalStateResponseProto> getJournalState();
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
Wed Jul 25 21:44:26 2012
@@ -113,6 +113,12 @@ class AsyncLoggerSet {
logger.close();
}
}
+
+ void purgeLogsOlderThan(long minTxIdToKeep) {
+ for (AsyncLogger logger : loggers) {
+ logger.purgeLogsOlderThan(minTxIdToKeep);
+ }
+ }
/**
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
Wed Jul 25 21:44:26 2012
@@ -292,6 +292,17 @@ public class IPCLoggerChannel implements
}
@Override
+ public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) {
+ return executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
+ return null;
+ }
+ });
+ }
+
+ @Override
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
final long fromTxnId) {
return executor.submit(new Callable<RemoteEditLogManifest>() {
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
Wed Jul 25 21:44:26 2012
@@ -298,7 +298,10 @@ public class QuorumJournalManager implem
@Override
public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
- // TODO Auto-generated method stub
+ // This purges asynchronously -- there's no need to wait for a quorum
+ // here, because it's always OK to fail.
+ LOG.info("Purging remote journals older than txid " + minTxIdToKeep);
+ loggers.purgeLogsOlderThan(minTxIdToKeep);
}
@Override
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
Wed Jul 25 21:44:26 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.qjournal.p
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.security.KerberosInfo;
@@ -83,7 +84,7 @@ public interface QJournalProtocol {
* Finalize the given log segment on the JournalNode. The segment
* is expected to be in-progress and starting at the given startTxId.
*
- * @param startTxId the starting transaction ID of teh log
+ * @param startTxId the starting transaction ID of the log
* @param endTxId the expected last transaction in the given log
* @throws IOException if no such segment exists
*/
@@ -91,6 +92,13 @@ public interface QJournalProtocol {
long startTxId, long endTxId) throws IOException;
/**
+ * @throws IOException if the purge request fails
+ * @see JournalManager#purgeLogsOlderThan(long)
+ */
+ public void purgeLogsOlderThan(RequestInfo requestInfo, long minTxIdToKeep)
+ throws IOException;
+
+ /**
* @param jid the journal from which to enumerate edits
* @param sinceTxId the first transaction which the client cares about
* @return a list of edit log segments since the given transaction ID.
@@ -110,5 +118,4 @@ public interface QJournalProtocol {
*/
public void acceptRecovery(RequestInfo reqInfo,
SegmentStateProto stateToAccept, URL fromUrl) throws IOException;
-
}
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
Wed Jul 25 21:44:26 2012
@@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.qjournal.p
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
+import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsResponseProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
@@ -127,6 +129,18 @@ public class QJournalProtocolServerSideT
}
return FinalizeLogSegmentResponseProto.newBuilder().build();
}
+
+ @Override
+ public PurgeLogsResponseProto purgeLogs(RpcController controller,
+ PurgeLogsRequestProto req) throws ServiceException {
+ try {
+ impl.purgeLogsOlderThan(convert(req.getReqInfo()),
+ req.getMinTxIdToKeep());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return PurgeLogsResponseProto.getDefaultInstance();
+ }
@Override
public GetEditLogManifestResponseProto getEditLogManifest(
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
Wed Jul 25 21:44:26 2012
@@ -23,7 +23,6 @@ import java.net.URL;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
@@ -39,18 +38,17 @@ import org.apache.hadoop.hdfs.qjournal.p
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
-import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -165,6 +163,20 @@ public class QJournalProtocolTranslatorP
throw ProtobufHelper.getRemoteException(e);
}
}
+
+ @Override
+ public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
+ throws IOException {
+ PurgeLogsRequestProto req = PurgeLogsRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .setMinTxIdToKeep(minTxIdToKeep)
+ .build();
+ try {
+ rpcProxy.purgeLogs(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
@Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
@@ -214,4 +226,5 @@ public class QJournalProtocolTranslatorP
QJournalProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(QJournalProtocolPB.class), methodName);
}
+
}
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
Wed Jul 25 21:44:26 2012
@@ -92,7 +92,7 @@ class JNStorage extends Storage {
return new File(getPaxosDir(), String.valueOf(segmentTxId));
}
- private File getPaxosDir() {
+ File getPaxosDir() {
return new File(sd.getCurrentDir(), "paxos");
}
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
Wed Jul 25 21:44:26 2012
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@@ -277,6 +279,41 @@ class Journal implements Closeable {
}
/**
+ * @see JournalManager#purgeLogsOlderThan(long)
+ */
+ public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
+ long minTxIdToKeep) throws IOException {
+ checkRequest(reqInfo);
+
+ fjm.purgeLogsOlderThan(minTxIdToKeep);
+ purgePaxosDecisionsOlderThan(minTxIdToKeep);
+ }
+
+ private void purgePaxosDecisionsOlderThan(long minTxIdToKeep)
+ throws IOException {
+ File dir = storage.getPaxosDir();
+ for (File f : FileUtil.listFiles(dir)) {
+ if (!f.isFile()) continue;
+
+ long txid;
+ try {
+ txid = Long.valueOf(f.getName());
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Unexpected non-numeric file name for " +
f.getAbsolutePath());
+ continue;
+ }
+
+ if (txid < minTxIdToKeep) {
+ if (!f.delete()) {
+ LOG.warn("Unable to delete no-longer-needed paxos decision record " +
+ f);
+ }
+ }
+ }
+ }
+
+
+ /**
* @see QJournalProtocol#getEditLogManifest(String, long)
*/
public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
Wed Jul 25 21:44:26 2012
@@ -131,6 +131,13 @@ class JournalNodeRpcServer implements QJ
}
@Override
+ public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
+ throws IOException {
+ jn.getOrCreateJournal(reqInfo.getJournalId())
+ .purgeLogsOlderThan(reqInfo, minTxIdToKeep);
+ }
+
+ @Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId) throws IOException {
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
Wed Jul 25 21:44:26 2012
@@ -87,6 +87,17 @@ message FinalizeLogSegmentResponseProto
}
/**
+ * purgeLogs()
+ */
+message PurgeLogsRequestProto {
+ required RequestInfoProto reqInfo = 1;
+ required uint64 minTxIdToKeep = 2;
+}
+
+message PurgeLogsResponseProto {
+}
+
+/**
* getJournalState()
*/
message GetJournalStateRequestProto {
@@ -175,6 +186,9 @@ service QJournalProtocolService {
rpc finalizeLogSegment(FinalizeLogSegmentRequestProto)
returns (FinalizeLogSegmentResponseProto);
+ rpc purgeLogs(PurgeLogsRequestProto)
+ returns (PurgeLogsResponseProto);
+
rpc getEditLogManifest(GetEditLogManifestRequestProto)
returns (GetEditLogManifestResponseProto);
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
(original)
+++
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
Wed Jul 25 21:44:26 2012
@@ -346,6 +346,45 @@ public class TestQuorumJournalManager {
checkRecovery(cluster, 1, 4);
}
+ @Test
+ public void testPurgeLogs() throws Exception {
+ for (int txid = 1; txid <= 5; txid++) {
+ writeSegment(qjm, txid, 1, true);
+ }
+ File curDir = cluster.getCurrentDir(0, JID);
+ GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
+ NNStorage.getFinalizedEditsFileName(1, 1),
+ NNStorage.getFinalizedEditsFileName(2, 2),
+ NNStorage.getFinalizedEditsFileName(3, 3),
+ NNStorage.getFinalizedEditsFileName(4, 4),
+ NNStorage.getFinalizedEditsFileName(5, 5));
+ File paxosDir = new File(curDir, "paxos");
+ GenericTestUtils.assertExists(paxosDir);
+
+ // Create new files in the paxos directory; those below the purge txid should get purged too.
+ assertTrue(new File(paxosDir, "1").createNewFile());
+ assertTrue(new File(paxosDir, "3").createNewFile());
+
+ GenericTestUtils.assertGlobEquals(paxosDir, "\\d+",
+ "1", "3");
+
+ qjm.purgeLogsOlderThan(3);
+
+ // Log purging is asynchronous, so we have to wait for the calls
+ // to be sent and responded to before verifying.
+ waitForAllPendingCalls(qjm.getLoggerSetForTests());
+
+ // Older edits should be purged
+ GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
+ NNStorage.getFinalizedEditsFileName(3, 3),
+ NNStorage.getFinalizedEditsFileName(4, 4),
+ NNStorage.getFinalizedEditsFileName(5, 5));
+
+ // Older paxos files should be purged
+ GenericTestUtils.assertGlobEquals(paxosDir, "\\d+",
+ "3");
+ }
+
private QuorumJournalManager createSpyingQJM()
throws IOException, URISyntaxException {
@@ -385,6 +424,14 @@ public class TestQuorumJournalManager {
stm.setReadyToFlush();
stm.flush();
}
+
+ private static void waitForAllPendingCalls(AsyncLoggerSet als)
+ throws InterruptedException {
+ for (AsyncLogger l : als.getLoggersForTests()) {
+ IPCLoggerChannel ch = (IPCLoggerChannel)l;
+ ch.waitForAllPendingCalls();
+ }
+ }
private void assertExistsInQuorum(MiniJournalCluster cluster,
String fname) {