Repository: tez Updated Branches: refs/heads/TEZ-3334 [created] 077dd88e0
http://git-wip-us.apache.org/repos/asf/tez/blob/077dd88e/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java new file mode 100644 index 0000000..ffab7dd --- /dev/null +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -0,0 +1,1127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.auxservices; + +//import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +//import static org.apache.hadoop.test.MetricsAsserts.assertGauge; +//import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.Assert.assertTrue; +import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.SocketException; +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.zip.CheckedOutputStream; +import java.util.zip.Checksum; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.MapTask; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.tez.runtime.library.common.security.SecureShuffleUtils; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; +import 
org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.util.PureJavaCrc32; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.records.Version; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.socket.SocketChannel; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.AbstractChannel; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.Mockito; +import org.mortbay.jetty.HttpHeaders; + +public class TestShuffleHandler { + static final long MiB = 1024 * 1024; + private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class); + + class MockShuffleHandler extends org.apache.tez.auxservices.ShuffleHandler { + @Override + protected Shuffle getShuffle(final Configuration conf) { + return new Shuffle(conf) { + @Override 
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + } + @Override + protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, + String jobId, String user) throws IOException { + // Do nothing. + return null; + } + @Override + protected void populateHeaders(List<String> mapIds, String jobId, + String user, int reduce, HttpRequest request, + HttpResponse response, boolean keepAliveParam, + Map<String, MapOutputInfo> infoMap) throws IOException { + // Do nothing. + } + @Override + protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, + Channel ch, String user, String mapId, int reduce, + MapOutputInfo info) throws IOException { + + ShuffleHeader header = + new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1); + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + dob = new DataOutputBuffer(); + for (int i = 0; i < 100; ++i) { + header.write(dob); + } + return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + } + }; + } + } + + private static class MockShuffleHandler2 extends org.apache.tez.auxservices.ShuffleHandler { + boolean socketKeepAlive = false; + + @Override + protected Shuffle getShuffle(final Configuration conf) { + return new Shuffle(conf) { + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + SocketChannel channel = (SocketChannel)(ctx.getChannel()); + socketKeepAlive = channel.getConfig().isKeepAlive(); + } + }; + } + + protected boolean isSocketKeepAlive() { + return socketKeepAlive; + } + } + + /** + * Test the validation of ShuffleHandler's meta-data's serialization and + * de-serialization. 
+   *
+   * @throws Exception exception
+   */
+  @Test (timeout = 10000)
+  public void testSerializeMeta() throws Exception {
+    assertEquals(1, ShuffleHandler.deserializeMetaData(
+        ShuffleHandler.serializeMetaData(1)));
+    assertEquals(-1, ShuffleHandler.deserializeMetaData(
+        ShuffleHandler.serializeMetaData(-1)));
+    assertEquals(8080, ShuffleHandler.deserializeMetaData(
+        ShuffleHandler.serializeMetaData(8080)));
+  }
+
+  /**
+   * Validate shuffle connection and input/output metrics.
+   *
+   * @throws Exception exception
+   */
+  @Test (timeout = 10000)
+  public void testShuffleMetrics() throws Exception {
+    MetricsSystem ms = new MetricsSystemImpl();
+    ShuffleHandler sh = new ShuffleHandler(ms);
+    ChannelFuture cf = mock(ChannelFuture.class);
+    when(cf.isSuccess()).thenReturn(true, false);
+
+    sh.metrics.shuffleConnections.incr();
+    sh.metrics.shuffleOutputBytes.incr(1*MiB);
+    sh.metrics.shuffleConnections.incr();
+    sh.metrics.shuffleOutputBytes.incr(2*MiB);
+
+    checkShuffleMetrics(ms, 3*MiB, 0 , 0, 2);
+
+    sh.metrics.operationComplete(cf);
+    sh.metrics.operationComplete(cf);
+
+    checkShuffleMetrics(ms, 3*MiB, 1, 1, 0);
+  }
+
+  static void checkShuffleMetrics(MetricsSystem ms, long bytes, int failed,
+      int succeeded, int connections) {
+    /* TODO
+    MetricsSource source = ms.getSource("ShuffleMetrics");
+    MetricsRecordBuilder rb = getMetrics(source);
+    assertCounter("ShuffleOutputBytes", bytes, rb);
+    assertCounter("ShuffleOutputsFailed", failed, rb);
+    assertCounter("ShuffleOutputsOK", succeeded, rb);
+    assertGauge("ShuffleConnections", connections, rb);
+    */
+  }
+
+  /**
+   * Verify client prematurely closing a connection.
+   *
+   * @throws Exception exception.
+ */ + @Test (timeout = 10000) + public void testClientClosesConnection() throws Exception { + final ArrayList<Throwable> failures = new ArrayList<Throwable>(1); + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + ShuffleHandler shuffleHandler = new ShuffleHandler() { + @Override + protected Shuffle getShuffle(Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + @Override + protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, + String jobId, String user) throws IOException { + return null; + } + @Override + protected void populateHeaders(List<String> mapIds, String jobId, + String user, int reduce, HttpRequest request, + HttpResponse response, boolean keepAliveParam, + Map<String, MapOutputInfo> infoMap) throws IOException { + // Only set response headers and skip everything else + // send some dummy value for content-length + super.setResponseHeaders(response, keepAliveParam, 100); + } + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + } + @Override + protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, + Channel ch, String user, String mapId, int reduce, + MapOutputInfo info) + throws IOException { + // send a shuffle header and a lot of data down the channel + // to trigger a broken pipe + ShuffleHeader header = + new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1); + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + dob = new DataOutputBuffer(); + for (int i = 0; i < 100000; ++i) { + header.write(dob); + } + return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + } + @Override + protected void sendError(ChannelHandlerContext ctx, + HttpResponseStatus status) { + if (failures.size() == 0) { + 
failures.add(new Error()); + ctx.getChannel().close(); + } + } + @Override + protected void sendError(ChannelHandlerContext ctx, String message, + HttpResponseStatus status) { + if (failures.size() == 0) { + failures.add(new Error()); + ctx.getChannel().close(); + } + } + }; + } + }; + shuffleHandler.init(conf); + shuffleHandler.start(); + + // simulate a reducer that closes early by reading a single shuffle header + // then closing the connection + URL url = new URL("http://127.0.0.1:" + + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0"); + HttpURLConnection conn = (HttpURLConnection)url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + DataInputStream input = new DataInputStream(conn.getInputStream()); + Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + Assert.assertEquals("close", conn.getHeaderField(HttpHeaders.CONNECTION)); + ShuffleHeader header = new ShuffleHeader(); + header.readFields(input); + input.close(); + + shuffleHandler.stop(); + Assert.assertTrue("sendError called when client closed connection", + failures.size() == 0); + } + + @Test(timeout = 10000) + public void testKeepAlive() throws Exception { + final ArrayList<Throwable> failures = new ArrayList<Throwable>(1); + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true); + // try setting to -ve keep alive timeout. 
+    conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+      @Override
+      protected Shuffle getShuffle(final Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+          @Override
+          protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
+              String jobId, String user) throws IOException {
+            return null;
+          }
+          @Override
+          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+              HttpRequest request, HttpResponse response, URL requestUri)
+              throws IOException {
+          }
+
+          @Override
+          protected void populateHeaders(List<String> mapIds, String jobId,
+              String user, int reduce, HttpRequest request,
+              HttpResponse response, boolean keepAliveParam,
+              Map<String, MapOutputInfo> infoMap) throws IOException {
+            // Send some dummy data (populate content length details)
+            ShuffleHeader header =
+                new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+            DataOutputBuffer dob = new DataOutputBuffer();
+            header.write(dob);
+            dob = new DataOutputBuffer();
+            for (int i = 0; i < 100000; ++i) {
+              header.write(dob);
+            }
+
+            long contentLength = dob.getLength();
+            // for testing purposes;
+            // disable connectionKeepAliveEnabled if keepAliveParam is available
+            if (keepAliveParam) {
+              connectionKeepAliveEnabled = false;
+            }
+
+            super.setResponseHeaders(response, keepAliveParam, contentLength);
+          }
+
+          @Override
+          protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+              Channel ch, String user, String mapId, int reduce,
+              MapOutputInfo info) throws IOException {
+            HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+
+            // send a shuffle header and a lot of data down the channel
+            // to trigger a broken pipe
+            ShuffleHeader header =
+                new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+            DataOutputBuffer dob = new DataOutputBuffer();
+            header.write(dob);
+            ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ dob = new DataOutputBuffer(); + for (int i = 0; i < 100000; ++i) { + header.write(dob); + } + return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + } + + @Override + protected void sendError(ChannelHandlerContext ctx, + HttpResponseStatus status) { + if (failures.size() == 0) { + failures.add(new Error()); + ctx.getChannel().close(); + } + } + + @Override + protected void sendError(ChannelHandlerContext ctx, String message, + HttpResponseStatus status) { + if (failures.size() == 0) { + failures.add(new Error()); + ctx.getChannel().close(); + } + } + }; + } + }; + shuffleHandler.init(conf); + shuffleHandler.start(); + + String shuffleBaseURL = "http://127.0.0.1:" + + shuffleHandler.getConfig().get( + ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); + URL url = + new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&" + + "map=attempt_12345_1_m_1_0"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + DataInputStream input = new DataInputStream(conn.getInputStream()); + Assert.assertEquals(HttpHeaders.KEEP_ALIVE, + conn.getHeaderField(HttpHeaders.CONNECTION)); + Assert.assertEquals("timeout=1", + conn.getHeaderField(HttpHeaders.KEEP_ALIVE)); + Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + ShuffleHeader header = new ShuffleHeader(); + header.readFields(input); + input.close(); + + // For keepAlive via URL + url = + new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&" + + "map=attempt_12345_1_m_1_0&keepAlive=true"); + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + 
conn.connect(); + input = new DataInputStream(conn.getInputStream()); + Assert.assertEquals(HttpHeaders.KEEP_ALIVE, + conn.getHeaderField(HttpHeaders.CONNECTION)); + Assert.assertEquals("timeout=1", + conn.getHeaderField(HttpHeaders.KEEP_ALIVE)); + Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + header = new ShuffleHeader(); + header.readFields(input); + input.close(); + } + + @Test + public void testSocketKeepAlive() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true); + // try setting to -ve keep alive timeout. + conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100); + HttpURLConnection conn = null; + MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2(); + try { + shuffleHandler.init(conf); + shuffleHandler.start(); + + String shuffleBaseURL = "http://127.0.0.1:" + + shuffleHandler.getConfig().get( + ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); + URL url = + new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&" + + "map=attempt_12345_1_m_1_0"); + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + conn.getInputStream(); + Assert.assertTrue("socket should be set KEEP_ALIVE", + shuffleHandler.isSocketKeepAlive()); + } finally { + if (conn != null) { + conn.disconnect(); + } + shuffleHandler.stop(); + } + } + + /** + * simulate a reducer that sends an invalid shuffle-header - sometimes a wrong + * header_name and sometimes a wrong version + * + * @throws Exception exception + */ + @Test (timeout = 10000) + public void testIncompatibleShuffleVersion() throws Exception { + final int failureNum = 3; + Configuration conf = new Configuration(); + 
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + ShuffleHandler shuffleHandler = new ShuffleHandler(); + shuffleHandler.init(conf); + shuffleHandler.start(); + + // simulate a reducer that closes early by reading a single shuffle header + // then closing the connection + URL url = new URL("http://127.0.0.1:" + + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0"); + for (int i = 0; i < failureNum; ++i) { + HttpURLConnection conn = (HttpURLConnection)url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + i == 0 ? "mapreduce" : "other"); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + i == 1 ? "1.0.0" : "1.0.1"); + conn.connect(); + Assert.assertEquals( + HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode()); + } + + shuffleHandler.stop(); + shuffleHandler.close(); + } + + /** + * Validate the limit on number of shuffle connections. + * + * @throws Exception exception + */ + @Test (timeout = 10000) + public void testMaxConnections() throws Exception { + + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + ShuffleHandler shuffleHandler = new ShuffleHandler() { + @Override + protected Shuffle getShuffle(Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + @Override + protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, + String jobId, String user) throws IOException { + // Do nothing. + return null; + } + @Override + protected void populateHeaders(List<String> mapIds, String jobId, + String user, int reduce, HttpRequest request, + HttpResponse response, boolean keepAliveParam, + Map<String, MapOutputInfo> infoMap) throws IOException { + // Do nothing. 
+ } + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + // Do nothing. + } + @Override + protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, + Channel ch, String user, String mapId, int reduce, + MapOutputInfo info) + throws IOException { + // send a shuffle header and a lot of data down the channel + // to trigger a broken pipe + ShuffleHeader header = + new ShuffleHeader("dummy_header", 5678, 5678, 1); + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + dob = new DataOutputBuffer(); + for (int i=0; i<100000; ++i) { + header.write(dob); + } + return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + } + }; + } + }; + shuffleHandler.init(conf); + shuffleHandler.start(); + + // setup connections + int connAttempts = 3; + HttpURLConnection conns[] = new HttpURLConnection[connAttempts]; + + for (int i = 0; i < connAttempts; i++) { + String URLstring = "http://127.0.0.1:" + + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_" + + i + "_0"; + URL url = new URL(URLstring); + conns[i] = (HttpURLConnection)url.openConnection(); + conns[i].setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conns[i].setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + } + + // Try to open numerous connections + for (int i = 0; i < connAttempts; i++) { + conns[i].connect(); + } + + //Ensure first connections are okay + conns[0].getInputStream(); + int rc = conns[0].getResponseCode(); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + conns[1].getInputStream(); + rc = conns[1].getResponseCode(); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + // This connection should be closed 
because it is above the limit
+    try {
+      conns[2].getInputStream();
+      rc = conns[2].getResponseCode();
+      Assert.fail("Expected a SocketException");
+    } catch (SocketException se) {
+      LOG.info("Expected - connection should not be open");
+    } catch (Exception e) {
+      Assert.fail("Expected a SocketException");
+    }
+
+    shuffleHandler.stop();
+  }
+
+  /**
+   * Validate the ownership of the map-output files being pulled in. The
+   * local-file-system owner of the file should match the user component in the
+   * URL.
+   *
+   * @throws Exception exception
+   */
+  @Test(timeout = 100000)
+  public void testMapFileAccess() throws IOException {
+    // This will run only if NativeIO is enabled as SecureIOUtils needs it
+    assumeTrue(NativeIO.isAvailable());
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    File absLogDir = new File("target",
+        TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+    ApplicationId appId = ApplicationId.newInstance(12345, 1);
+    LOG.info(appId.toString());
+    String appAttemptId = "attempt_12345_1_m_1_0";
+    String user = "randomUser";
+    String reducerId = "0";
+    List<File> fileMap = new ArrayList<File>();
+    createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
+        conf, fileMap);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+
+      @Override
+      protected Shuffle getShuffle(Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+
+          @Override
+          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+              HttpRequest request, HttpResponse response, URL requestUri)
+              throws IOException {
+            // Do nothing.
+ } + + }; + } + }; + shuffleHandler.init(conf); + try { + shuffleHandler.start(); + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + outputBuffer.reset(); + Token<JobTokenIdentifier> jt = + new Token<JobTokenIdentifier>("identifier".getBytes(), + "password".getBytes(), new Text(user), new Text("shuffleService")); + jt.write(outputBuffer); + shuffleHandler + .initializeApplication(new ApplicationInitializationContext(user, + appId, ByteBuffer.wrap(outputBuffer.getData(), 0, + outputBuffer.getLength()))); + URL url = + new URL( + "http://127.0.0.1:" + + shuffleHandler.getConfig().get( + ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?job=job_12345_0001&reduce=" + reducerId + + "&map=attempt_12345_1_m_1_0"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + byte[] byteArr = new byte[10000]; + try { + DataInputStream is = new DataInputStream(conn.getInputStream()); + is.readFully(byteArr); + } catch (EOFException e) { + // ignore + } + // Retrieve file owner name + FileInputStream is = new FileInputStream(fileMap.get(0)); + String owner = NativeIO.POSIX.getFstat(is.getFD()).getOwner(); + is.close(); + + String message = + "Owner '" + owner + "' for path " + fileMap.get(0).getAbsolutePath() + + " did not match expected owner '" + user + "'"; + Assert.assertTrue((new String(byteArr)).contains(message)); + } finally { + shuffleHandler.stop(); + FileUtil.fullyDelete(absLogDir); + } + } + + private static void createShuffleHandlerFiles(File logDir, String user, + String appId, String appAttemptId, Configuration conf, + List<File> fileMap) throws IOException { + String attemptDir = + StringUtils.join(Path.SEPARATOR, + new String[] { logDir.getAbsolutePath(), + ShuffleHandler.USERCACHE, user, + 
ShuffleHandler.APPCACHE, appId, "output", appAttemptId }); + File appAttemptDir = new File(attemptDir); + appAttemptDir.mkdirs(); + System.out.println(appAttemptDir.getAbsolutePath()); + File indexFile = new File(appAttemptDir, "file.out.index"); + fileMap.add(indexFile); + createIndexFile(indexFile, conf); + File mapOutputFile = new File(appAttemptDir, "file.out"); + fileMap.add(mapOutputFile); + createMapOutputFile(mapOutputFile, conf); + } + + private static void + createMapOutputFile(File mapOutputFile, Configuration conf) + throws IOException { + FileOutputStream out = new FileOutputStream(mapOutputFile); + out.write("Creating new dummy map output file. Used only for testing" + .getBytes()); + out.flush(); + out.close(); + } + + private static void createIndexFile(File indexFile, Configuration conf) + throws IOException { + if (indexFile.exists()) { + System.out.println("Deleting existing file"); + indexFile.delete(); + } + indexFile.createNewFile(); + FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append( + new Path(indexFile.getAbsolutePath())); + Checksum crc = new PureJavaCrc32(); + crc.reset(); + CheckedOutputStream chk = new CheckedOutputStream(output, crc); + String msg = "Writing new index file. 
This file will be used only " + + "for the testing."; + chk.write(Arrays.copyOf(msg.getBytes(), + MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH)); + output.writeLong(chk.getChecksum().getValue()); + output.close(); + } + + @Test + public void testRecovery() throws IOException { + final String user = "someuser"; + final ApplicationId appId = ApplicationId.newInstance(12345, 1); + final JobID jobId = JobID.downgrade(TypeConverter.fromYarn(appId)); + final File tmpDir = new File(System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestShuffleHandler.class.getName()); + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + ShuffleHandler shuffle = new ShuffleHandler(); + // emulate aux services startup with recovery enabled + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + tmpDir.mkdirs(); + try { + shuffle.init(conf); + shuffle.start(); + + // setup a shuffle token for an application + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + outputBuffer.reset(); + Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>( + "identifier".getBytes(), "password".getBytes(), new Text(user), + new Text("shuffleService")); + jt.write(outputBuffer); + shuffle.initializeApplication(new ApplicationInitializationContext(user, + appId, ByteBuffer.wrap(outputBuffer.getData(), 0, + outputBuffer.getLength()))); + + // verify we are authorized to shuffle + int rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + // emulate shuffle handler restart + shuffle.close(); + shuffle = new ShuffleHandler(); + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + shuffle.init(conf); + shuffle.start(); + + // verify we are still authorized to shuffle to the old application + rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + // shutdown app and verify access 
is lost + shuffle.stopApplication(new ApplicationTerminationContext(appId)); + rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc); + + // emulate shuffle handler restart + shuffle.close(); + shuffle = new ShuffleHandler(); + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + shuffle.init(conf); + shuffle.start(); + + // verify we still don't have access + rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc); + } finally { + if (shuffle != null) { + shuffle.close(); + } + FileUtil.fullyDelete(tmpDir); + } + } + + @Test + public void testRecoveryFromOtherVersions() throws IOException { + final String user = "someuser"; + final ApplicationId appId = ApplicationId.newInstance(12345, 1); + final File tmpDir = new File(System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestShuffleHandler.class.getName()); + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + ShuffleHandler shuffle = new ShuffleHandler(); + // emulate aux services startup with recovery enabled + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + tmpDir.mkdirs(); + try { + shuffle.init(conf); + shuffle.start(); + + // setup a shuffle token for an application + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + outputBuffer.reset(); + Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>( + "identifier".getBytes(), "password".getBytes(), new Text(user), + new Text("shuffleService")); + jt.write(outputBuffer); + shuffle.initializeApplication(new ApplicationInitializationContext(user, + appId, ByteBuffer.wrap(outputBuffer.getData(), 0, + outputBuffer.getLength()))); + + // verify we are authorized to shuffle + int rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + // emulate shuffle handler restart 
+ shuffle.close(); + shuffle = new ShuffleHandler(); + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + shuffle.init(conf); + shuffle.start(); + + // verify we are still authorized to shuffle to the old application + rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + Version version = Version.newInstance(1, 0); + Assert.assertEquals(version, shuffle.getCurrentVersion()); + + // emulate shuffle handler restart with compatible version + Version version11 = Version.newInstance(1, 1); + // update version info before close shuffle + shuffle.storeVersion(version11); + Assert.assertEquals(version11, shuffle.loadVersion()); + shuffle.close(); + shuffle = new ShuffleHandler(); + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + shuffle.init(conf); + shuffle.start(); + // shuffle version will be override by CURRENT_VERSION_INFO after restart + // successfully. + Assert.assertEquals(version, shuffle.loadVersion()); + // verify we are still authorized to shuffle to the old application + rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + // emulate shuffle handler restart with incompatible version + Version version21 = Version.newInstance(2, 1); + shuffle.storeVersion(version21); + Assert.assertEquals(version21, shuffle.loadVersion()); + shuffle.close(); + shuffle = new ShuffleHandler(); + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + shuffle.init(conf); + + try { + shuffle.start(); + Assert.fail("Incompatible version, should expect fail here."); + } catch (ServiceStateException e) { + Assert.assertTrue("Exception message mismatch", + e.getMessage().contains("Incompatible version for state DB schema:")); + } + + } finally { + if (shuffle != null) { + shuffle.close(); + } + FileUtil.fullyDelete(tmpDir); + } + } + + private static int getShuffleResponseCode(ShuffleHandler shuffle, + Token<JobTokenIdentifier> jt) throws IOException { + URL url = new 
// (continuation of getShuffleResponseCode; its signature is in the previous span)
        URL("http://127.0.0.1:" +
        shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) +
        "/mapOutput?job=job_12345_0001&reduce=0&map=attempt_12345_1_m_1_0");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    // sign the request URL with the job-token secret so the handler can
    // authorize the request
    String encHash = SecureShuffleUtils.hashFromString(
        SecureShuffleUtils.buildMsgFrom(url),
        new JobTokenSecretManager(JobTokenSecretManager.createSecretKey(jt.getPassword())));
    conn.addRequestProperty(
        SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
    conn.connect();
    int rc = conn.getResponseCode();
    conn.disconnect();
    return rc;
  }

  /**
   * Exercises the header-population path: a stubbed Shuffle only sets the
   * response headers (with a dummy content-length of 100), skips request
   * verification, and answers with a single ShuffleHeader. The test fails if
   * sendError is ever invoked while serving the request.
   */
  @Test(timeout = 100000)
  public void testGetMapOutputInfo() throws Exception {
    // collects the first error reported through sendError, if any
    final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
    Configuration conf = new Configuration();
    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
        "simple");
    UserGroupInformation.setConfiguration(conf);
    File absLogDir = new File("target", TestShuffleHandler.class.
        getSimpleName() + "LocDir").getAbsoluteFile();
    conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
    ApplicationId appId = ApplicationId.newInstance(12345, 1);
    String appAttemptId = "attempt_12345_1_m_1_0";
    String user = "randomUser";
    String reducerId = "0";
    List<File> fileMap = new ArrayList<File>();
    // lay down fake map-output files for the handler to serve
    createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
        conf, fileMap);
    ShuffleHandler shuffleHandler = new ShuffleHandler() {
      @Override
      protected Shuffle getShuffle(Configuration conf) {
        // replace the shuffle handler with one stubbed for testing
        return new Shuffle(conf) {
          @Override
          protected void populateHeaders(List<String> mapIds,
              String outputBaseStr, String user, int reduce,
              HttpRequest request, HttpResponse response,
              boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)
              throws IOException {
            // Only set response headers and skip everything else
            // send some dummy value for content-length
            super.setResponseHeaders(response, keepAliveParam, 100);
          }
          @Override
          protected void verifyRequest(String appid,
              ChannelHandlerContext ctx, HttpRequest request,
              HttpResponse response, URL requestUri) throws IOException {
            // Do nothing. Request verification is not under test here.
          }
          @Override
          protected void sendError(ChannelHandlerContext ctx, String message,
              HttpResponseStatus status) {
            // record only the first failure, then close the channel
            if (failures.size() == 0) {
              failures.add(new Error(message));
              ctx.getChannel().close();
            }
          }
          @Override
          protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
              Channel ch, String user, String mapId, int reduce,
              MapOutputInfo info) throws IOException {
            // send a shuffle header instead of real map output
            ShuffleHeader header =
                new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
            DataOutputBuffer dob = new DataOutputBuffer();
            header.write(dob);
            return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
          }
        };
      }
    };
    shuffleHandler.init(conf);
    try {
      shuffleHandler.start();
      // register a shuffle token for the application
      DataOutputBuffer outputBuffer = new DataOutputBuffer();
      outputBuffer.reset();
      Token<JobTokenIdentifier> jt =
          new Token<JobTokenIdentifier>("identifier".getBytes(),
          "password".getBytes(), new Text(user), new Text("shuffleService"));
      jt.write(outputBuffer);
      shuffleHandler
          .initializeApplication(new ApplicationInitializationContext(user,
              appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
              outputBuffer.getLength())));
      URL url =
          new URL(
              "http://127.0.0.1:"
                  + shuffleHandler.getConfig().get(
                      ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
                  + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
                  + "&map=attempt_12345_1_m_1_0");
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
      conn.connect();
      try {
        DataInputStream is = new DataInputStream(conn.getInputStream());
        ShuffleHeader header = new ShuffleHeader();
        header.readFields(is);
        is.close();
      } catch (EOFException e) {
        // ignore — the stubbed sendMapOutput writes only a single header
      }
      Assert.assertEquals("sendError called due to shuffle error",
          0, failures.size());
    } finally {
// (continuation: cleanup of testGetMapOutputInfo, whose body precedes this span)
      shuffleHandler.stop();
      FileUtil.fullyDelete(absLogDir);
    }
  }

  /**
   * Verifies that the number of pending ReduceMapFileCount listeners (one per
   * concurrently open map-output file in a session) never exceeds the
   * configured SHUFFLE_MAX_SESSION_OPEN_FILES limit while a 100-map request is
   * served and then drained to completion.
   */
  @Test(timeout = 4000)
  public void testSendMapCount() throws Exception {
    final List<ShuffleHandler.ReduceMapFileCount> listenerList =
        new ArrayList<ShuffleHandler.ReduceMapFileCount>();

    final ChannelHandlerContext mockCtx =
        mock(ChannelHandlerContext.class);
    final MessageEvent mockEvt = mock(MessageEvent.class);
    final Channel mockCh = mock(AbstractChannel.class);

    // Mock HttpRequest and ChannelFuture
    final HttpRequest mockHttpRequest = createMockHttpRequest();
    final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
        listenerList);

    // Mock Netty Channel Context and Channel behavior
    Mockito.doReturn(mockCh).when(mockCtx).getChannel();
    // NOTE(review): re-stubs getChannel(), already stubbed by the
    // doReturn(...) line above — one of the two is redundant.
    when(mockCtx.getChannel()).thenReturn(mockCh);
    Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
    // NOTE(review): this stubs write() for the literal argument Object.class
    // (no matcher), so it likely never matches a real call; the doReturn(...)
    // stub with Mockito.any(...) above is the effective one.
    when(mockCh.write(Object.class)).thenReturn(mockFuture);

    //Mock MessageEvent behavior
    Mockito.doReturn(mockCh).when(mockEvt).getChannel();
    when(mockEvt.getChannel()).thenReturn(mockCh);
    Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();

    final ShuffleHandler sh = new MockShuffleHandler();
    Configuration conf = new Configuration();
    sh.init(conf);
    sh.start();
    int maxOpenFiles = conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
        ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
    // fire the mocked 100-map request at the shuffle pipeline
    sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
    assertTrue("Number of Open files should not exceed the configured " +
        "value!-Not Expected",
        listenerList.size() <= maxOpenFiles);
    // drain completed sends one by one; the open-file count must stay
    // bounded at every step
    while(!listenerList.isEmpty()) {
      listenerList.remove(0).operationComplete(mockFuture);
      assertTrue("Number of Open files should not exceed the configured " +
          "value!-Not Expected",
          listenerList.size() <= maxOpenFiles);
    }
    sh.close();
  }

  /**
   * Builds a mock ChannelFuture that reports success and records every
   * ReduceMapFileCount listener added to it into {@code listenerList}.
   * (The method body continues past the end of this span.)
   */
  public ChannelFuture createMockChannelFuture(Channel mockCh,
      final List<ShuffleHandler.ReduceMapFileCount> listenerList) {
    final ChannelFuture
// (continuation of createMockChannelFuture; its signature is in the previous span)
        mockFuture = mock(ChannelFuture.class);
    when(mockFuture.getChannel()).thenReturn(mockCh);
    Mockito.doReturn(true).when(mockFuture).isSuccess();
    Mockito.doAnswer(new Answer() {
      @Override
      public Object answer(InvocationOnMock invocation) throws Throwable {
        //Add ReduceMapFileCount listener to a list
        if (invocation.getArguments()[0].getClass() ==
            ShuffleHandler.ReduceMapFileCount.class)
          listenerList.add((ShuffleHandler.ReduceMapFileCount)
              invocation.getArguments()[0]);
        return null;
      }
    }).when(mockFuture).addListener(Mockito.any(
        ShuffleHandler.ReduceMapFileCount.class));
    return mockFuture;
  }

  /**
   * Builds a mock GET HttpRequest whose URI requests reduce 1 of job
   * job_12345_1 with 100 map attempt ids appended, used to drive the
   * open-file-limit test above.
   */
  public HttpRequest createMockHttpRequest() {
    HttpRequest mockHttpRequest = mock(HttpRequest.class);
    Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).getMethod();
    Mockito.doAnswer(new Answer() {
      @Override
      public Object answer(InvocationOnMock invocation) throws Throwable {
        String uri = "/mapOutput?job=job_12345_1&reduce=1";
        for (int i = 0; i < 100; i++)
          uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0");
        return uri;
      }
    }).when(mockHttpRequest).getUri();
    return mockHttpRequest;
  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/077dd88e/tez-plugins/tez-history-parser/pom.xml ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/pom.xml b/tez-plugins/tez-history-parser/pom.xml index e151b07..3a6185a 100644 --- a/tez-plugins/tez-history-parser/pom.xml +++ b/tez-plugins/tez-history-parser/pom.xml @@ -70,11 +70,6 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-api</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/tez/blob/077dd88e/tez-plugins/tez-yarn-timeline-history-with-acls/pom.xml ----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/pom.xml b/tez-plugins/tez-yarn-timeline-history-with-acls/pom.xml index 5dcfb99..c85adca 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/pom.xml +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/pom.xml @@ -68,11 +68,6 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-api</artifactId> </dependency> <dependency>
