Repository: tajo Updated Branches: refs/heads/master 487a0e51a -> 968633ff0
http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 0744b9f..6894cc5 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -19,10 +19,20 @@ package org.apache.tajo.pullserver; import com.google.common.collect.Lists; - +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; import io.netty.channel.*; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.*; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -32,8 +42,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.metrics2.MetricsSystem; @@ -58,18 +66,6 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.index.bst.BSTIndex; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.DefaultChannelGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.ssl.SslHandler; -import io.netty.handler.stream.ChunkedWriteHandler; -import io.netty.util.CharsetUtil; -import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.GlobalEventExecutor; - import java.io.*; import java.net.InetSocketAddress; import java.net.URI; @@ -79,7 +75,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; public class TajoPullServerService extends AbstractService { @@ -122,7 +118,14 @@ public class TajoPullServerService extends AbstractService { private static boolean STANDALONE = false; + private static final AtomicIntegerFieldUpdater<ProcessingStatus> SLOW_FILE_UPDATER; + private static final AtomicIntegerFieldUpdater<ProcessingStatus> REMAIN_FILE_UPDATER; + static { + /* AtomicIntegerFieldUpdater can save the memory usage instead of AtomicInteger instance */ + SLOW_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "numSlowFile"); + REMAIN_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "remainFiles"); + String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE"); if (!StringUtils.isEmpty(standalone)) { STANDALONE = standalone.equalsIgnoreCase("true"); @@ -163,30 +166,6 @@ public class TajoPullServerService extends AbstractService { this(DefaultMetricsSystem.instance()); } - /** - * Serialize the shuffle port into a ByteBuffer for use later on. - * @param port the port to be sent to the ApplciationMaster - * @return the serialized form of the port. - */ - public static ByteBuffer serializeMetaData(int port) throws IOException { - //TODO these bytes should be versioned - DataOutputBuffer port_dob = new DataOutputBuffer(); - port_dob.writeInt(port); - return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength()); - } - - /** - * A helper function to deserialize the metadata returned by PullServerAuxService. - * @param meta the metadata returned by the PullServerAuxService - * @return the port the PullServer Handler is listening on to serve shuffle data. - */ - public static int deserializeMetaData(ByteBuffer meta) throws IOException { - //TODO this should be returning a class not just an int - DataInputByteBuffer in = new DataInputByteBuffer(); - in.reset(meta); - return in.readInt(); - } - public void initApp(String user, ApplicationId appId, ByteBuffer secret) { // TODO these bytes should be versioned // TODO: Once SHuffle is out of NM, this can use MR APIs @@ -211,7 +190,7 @@ public class TajoPullServerService extends AbstractService { int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num", Runtime.getRuntime().availableProcessors() * 2); - selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum) + selector = RpcChannelFactory.createServerChannelFactory("TajoPullServerService", workerNum) .option(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.TCP_NODELAY, true); @@ -229,11 +208,15 @@ public class TajoPullServerService extends AbstractService { // TODO change AbstractService to throw InterruptedException @Override - public synchronized void serviceInit(Configuration conf) throws Exception { - ServerBootstrap bootstrap = selector.clone(); + public void serviceInit(Configuration conf) throws Exception { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("Configuration must be a TajoConf instance"); + } + ServerBootstrap bootstrap = selector.clone(); + TajoConf tajoConf = (TajoConf)conf; try { - channelInitializer = new HttpChannelInitializer(conf); + channelInitializer = new HttpChannelInitializer(tajoConf); } catch (Exception ex) { throw new RuntimeException(ex); } @@ -249,7 +232,6 @@ public class TajoPullServerService extends AbstractService { accepted.add(future.channel()); port = ((InetSocketAddress)future.channel().localAddress()).getPort(); conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port)); - channelInitializer.PullServer.setPort(port); LOG.info(getName() + " listening on port " + port); sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, @@ -317,7 +299,7 @@ public class TajoPullServerService extends AbstractService { } @Override - public synchronized void stop() { + public void stop() { try { accepted.close(); if (selector != null) { @@ -341,22 +323,12 @@ public class TajoPullServerService extends AbstractService { } } - public synchronized ByteBuffer getMeta() { - try { - return serializeMetaData(port); - } catch (IOException e) { - LOG.error("Error during getMeta", e); - // TODO add API to AuxiliaryServices to report failures - return null; - } - } - class HttpChannelInitializer extends ChannelInitializer<SocketChannel> { final PullServer PullServer; private SSLFactory sslFactory; - public HttpChannelInitializer(Configuration conf) throws Exception { + public HttpChannelInitializer(TajoConf conf) throws Exception { PullServer = new PullServer(conf); if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname, ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) { @@ -405,12 +377,12 @@ public class TajoPullServerService extends AbstractService { class ProcessingStatus { String requestUri; int numFiles; - AtomicInteger remainFiles; long startTime; long makeFileListTime; long minTime = Long.MAX_VALUE; long maxTime; - int numSlowFile; + volatile int numSlowFile; + volatile int remainFiles; public ProcessingStatus(String requestUri) { this.requestUri = requestUri; @@ -419,14 +391,14 @@ public class TajoPullServerService extends AbstractService { public void setNumFiles(int numFiles) { this.numFiles = numFiles; - this.remainFiles = new AtomicInteger(numFiles); + this.remainFiles = numFiles; } - public synchronized void decrementRemainFiles(FileRegion filePart, long fileStartTime) { + public void decrementRemainFiles(FileRegion filePart, long fileStartTime) { long fileSendTime = System.currentTimeMillis() - fileStartTime; if (fileSendTime > 20 * 1000) { LOG.info("PullServer send too long time: filePos=" + filePart.position() + ", fileLen=" + filePart.count()); - numSlowFile++; + SLOW_FILE_UPDATER.compareAndSet(this, numSlowFile, numSlowFile+ 1); } if (fileSendTime > maxTime) { maxTime = fileSendTime; @@ -434,8 +406,9 @@ public class TajoPullServerService extends AbstractService { if (fileSendTime < minTime) { minTime = fileSendTime; } - int remain = remainFiles.decrementAndGet(); - if (remain <= 0) { + + REMAIN_FILE_UPDATER.compareAndSet(this, remainFiles, remainFiles - 1); + if (REMAIN_FILE_UPDATER.get(this) <= 0) { processingStatusMap.remove(requestUri); LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " @@ -447,25 +420,17 @@ public class TajoPullServerService extends AbstractService { @ChannelHandler.Sharable class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> { - private final Configuration conf; + private final TajoConf conf; // private final IndexCache indexCache; private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); - private int port; - public PullServer(Configuration conf) throws IOException { + public PullServer(TajoConf conf) throws IOException { this.conf = conf; -// indexCache = new IndexCache(new JobConf(conf)); - this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, - ConfVars.PULLSERVER_PORT.defaultIntVal); // init local temporal dir lDirAlloc.getAllLocalPathsToRead(".", conf); } - - public void setPort(int port) { - this.port = port; - } private List<String> splitMaps(List<String> mapq) { if (null == mapq) { @@ -567,7 +532,7 @@ public class TajoPullServerService extends AbstractService { // if a stage requires a hash shuffle or a scattered hash shuffle } else if (shuffleType.equals("h") || shuffleType.equals("s")) { - int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); + int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf); String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId; if (!lDirAlloc.ifExists(partPath, conf)) { LOG.warn("Partition shuffle file not exists: " + partPath); http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java index e26bcd6..c4091f1 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java @@ -40,7 +40,7 @@ public class DirectoryRetriever implements DataRetriever { throws IOException { final String path = HttpDataServerHandler.sanitizeUri(request.getUri()); if (path == null) { - throw new IllegalArgumentException("Wrong path: " +path); + throw new IllegalArgumentException("Wrong uri: " +request.getUri()); } File file = new File(baseDir, path);
