Author: atm
Date: Fri May 16 21:23:03 2014
New Revision: 1595351
URL: http://svn.apache.org/r1595351
Log:
HDFS-6406. Add capability for NFS gateway to reject connections from
unprivileged ports. Contributed by Aaron T. Myers.
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/resources/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1595351&r1=1595350&r2=1595351&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
(original)
+++
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
Fri May 16 21:23:03 2014
@@ -19,11 +19,14 @@ package org.apache.hadoop.oncrpc;
import java.io.IOException;
import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
import org.apache.hadoop.oncrpc.security.Verifier;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -37,7 +40,7 @@ import org.jboss.netty.channel.SimpleCha
* and implement {@link #handleInternal} to handle the requests received.
*/
public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
- private static final Log LOG = LogFactory.getLog(RpcProgram.class);
+ static final Log LOG = LogFactory.getLog(RpcProgram.class);
public static final int RPCB_PORT = 111;
private final String program;
private final String host;
@@ -45,6 +48,7 @@ public abstract class RpcProgram extends
private final int progNumber;
private final int lowProgVersion;
private final int highProgVersion;
+ private final boolean allowInsecurePorts;
/**
* If not null, this will be used as the socket to use to connect to the
@@ -61,10 +65,14 @@ public abstract class RpcProgram extends
* @param progNumber program number as defined in RFC 1050
* @param lowProgVersion lowest version of the specification supported
* @param highProgVersion highest version of the specification supported
+ * @param DatagramSocket registrationSocket if not null, use this socket to
+ * register with portmap daemon
+ * @param allowInsecurePorts true to allow client connections from
+ * unprivileged ports, false otherwise
*/
protected RpcProgram(String program, String host, int port, int progNumber,
int lowProgVersion, int highProgVersion,
- DatagramSocket registrationSocket) {
+ DatagramSocket registrationSocket, boolean allowInsecurePorts) {
this.program = program;
this.host = host;
this.port = port;
@@ -72,6 +80,9 @@ public abstract class RpcProgram extends
this.lowProgVersion = lowProgVersion;
this.highProgVersion = highProgVersion;
this.registrationSocket = registrationSocket;
+ this.allowInsecurePorts = allowInsecurePorts;
+ LOG.info("Will " + (allowInsecurePorts ? "" : "not ") + "accept client "
+ + "connections from unprivileged ports");
}
/**
@@ -133,43 +144,82 @@ public abstract class RpcProgram extends
throws Exception {
RpcInfo info = (RpcInfo) e.getMessage();
RpcCall call = (RpcCall) info.header();
+
+ SocketAddress remoteAddress = info.remoteAddress();
+ if (!allowInsecurePorts) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Will not allow connections from unprivileged ports. " +
+ "Checking for valid client port...");
+ }
+ if (remoteAddress instanceof InetSocketAddress) {
+ InetSocketAddress inetRemoteAddress = (InetSocketAddress)
remoteAddress;
+ if (inetRemoteAddress.getPort() > 1023) {
+ LOG.warn("Connection attempted from '" + inetRemoteAddress + "' "
+ + "which is an unprivileged port. Rejecting connection.");
+ sendRejectedReply(call, remoteAddress, ctx);
+ return;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Accepting connection from '" + remoteAddress + "'");
+ }
+ }
+ } else {
+ LOG.warn("Could not determine remote port of socket address '" +
+ remoteAddress + "'. Rejecting connection.");
+ sendRejectedReply(call, remoteAddress, ctx);
+ return;
+ }
+ }
+
if (LOG.isTraceEnabled()) {
LOG.trace(program + " procedure #" + call.getProcedure());
}
if (this.progNumber != call.getProgram()) {
LOG.warn("Invalid RPC call program " + call.getProgram());
- RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
- AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE);
-
- XDR out = new XDR();
- reply.write(out);
- ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
- .buffer());
- RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
- RpcUtil.sendRpcResponse(ctx, rsp);
+ sendAcceptedReply(call, remoteAddress, AcceptState.PROG_UNAVAIL, ctx);
return;
}
int ver = call.getVersion();
if (ver < lowProgVersion || ver > highProgVersion) {
LOG.warn("Invalid RPC call version " + ver);
- RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
- AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE);
-
- XDR out = new XDR();
- reply.write(out);
- out.writeInt(lowProgVersion);
- out.writeInt(highProgVersion);
- ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
- .buffer());
- RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
- RpcUtil.sendRpcResponse(ctx, rsp);
+ sendAcceptedReply(call, remoteAddress, AcceptState.PROG_MISMATCH, ctx);
return;
}
handleInternal(ctx, info);
}
+
+ private void sendAcceptedReply(RpcCall call, SocketAddress remoteAddress,
+ AcceptState acceptState, ChannelHandlerContext ctx) {
+ RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+ acceptState, Verifier.VERIFIER_NONE);
+
+ XDR out = new XDR();
+ reply.write(out);
+ if (acceptState == AcceptState.PROG_MISMATCH) {
+ out.writeInt(lowProgVersion);
+ out.writeInt(highProgVersion);
+ }
+ ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ .buffer());
+ RpcResponse rsp = new RpcResponse(b, remoteAddress);
+ RpcUtil.sendRpcResponse(ctx, rsp);
+ }
+
+ private static void sendRejectedReply(RpcCall call,
+ SocketAddress remoteAddress, ChannelHandlerContext ctx) {
+ XDR out = new XDR();
+ RpcDeniedReply reply = new RpcDeniedReply(call.getXid(),
+ RpcReply.ReplyState.MSG_DENIED,
+ RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
+ reply.write(out);
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ .buffer());
+ RpcResponse rsp = new RpcResponse(buf, remoteAddress);
+ RpcUtil.sendRpcResponse(ctx, rsp);
+ }
protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo
info);
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java?rev=1595351&r1=1595350&r2=1595351&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
(original)
+++
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
Fri May 16 21:23:03 2014
@@ -28,6 +28,8 @@ import java.util.Random;
import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.log4j.Level;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -38,10 +40,16 @@ import org.junit.Test;
import org.mockito.Mockito;
public class TestFrameDecoder {
+
+ static {
+ ((Log4JLogger) RpcProgram.LOG).getLogger().setLevel(Level.ALL);
+ }
private static int resultSize;
static void testRequest(XDR request, int serverPort) {
+ // Reset resultSize so as to avoid interference from other tests in this
class.
+ resultSize = 0;
SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", serverPort,
request,
true);
tcpClient.run();
@@ -50,9 +58,10 @@ public class TestFrameDecoder {
static class TestRpcProgram extends RpcProgram {
protected TestRpcProgram(String program, String host, int port,
- int progNumber, int lowProgVersion, int highProgVersion) {
+ int progNumber, int lowProgVersion, int highProgVersion,
+ boolean allowInsecurePorts) {
super(program, host, port, progNumber, lowProgVersion, highProgVersion,
- null);
+ null, allowInsecurePorts);
}
@Override
@@ -149,7 +158,41 @@ public class TestFrameDecoder {
@Test
public void testFrames() {
+ int serverPort = startRpcServer(true);
+ XDR xdrOut = createGetportMount();
+ int headerSize = xdrOut.size();
+ int bufsize = 2 * 1024 * 1024;
+ byte[] buffer = new byte[bufsize];
+ xdrOut.writeFixedOpaque(buffer);
+ int requestSize = xdrOut.size() - headerSize;
+
+ // Send the request to the server
+ testRequest(xdrOut, serverPort);
+
+ // Verify the server got the request with right size
+ assertEquals(requestSize, resultSize);
+ }
+
+ @Test
+ public void testUnprivilegedPort() {
+ // Don't allow connections from unprivileged ports. Given that this test is
+ // presumably not being run by root, this will be the case.
+ int serverPort = startRpcServer(false);
+
+ XDR xdrOut = createGetportMount();
+ int bufsize = 2 * 1024 * 1024;
+ byte[] buffer = new byte[bufsize];
+ xdrOut.writeFixedOpaque(buffer);
+
+ // Send the request to the server
+ testRequest(xdrOut, serverPort);
+
+ // Verify the server rejected the request.
+ assertEquals(0, resultSize);
+ }
+
+ private static int startRpcServer(boolean allowInsecurePorts) {
Random rand = new Random();
int serverPort = 30000 + rand.nextInt(10000);
int retries = 10; // A few retries in case initial choice is in use.
@@ -157,7 +200,7 @@ public class TestFrameDecoder {
while (true) {
try {
RpcProgram program = new
TestFrameDecoder.TestRpcProgram("TestRpcProgram",
- "localhost", serverPort, 100000, 1, 2);
+ "localhost", serverPort, 100000, 1, 2, allowInsecurePorts);
SimpleTcpServer tcpServer = new SimpleTcpServer(serverPort, program,
1);
tcpServer.run();
break; // Successfully bound a port, break out.
@@ -169,19 +212,7 @@ public class TestFrameDecoder {
}
}
}
-
- XDR xdrOut = createGetportMount();
- int headerSize = xdrOut.size();
- int bufsize = 2 * 1024 * 1024;
- byte[] buffer = new byte[bufsize];
- xdrOut.writeFixedOpaque(buffer);
- int requestSize = xdrOut.size() - headerSize;
-
- // Send the request to the server
- testRequest(xdrOut, serverPort);
-
- // Verify the server got the request with right size
- assertEquals(requestSize, resultSize);
+ return serverPort;
}
static void createPortmapXDRheader(XDR xdr_out, int procedure) {
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties?rev=1595351&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties
(added)
+++
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties
Fri May 16 21:23:03 2014
@@ -0,0 +1,18 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshhold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2}
(%F:%M(%L)) - %m%n