http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git 
a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
 
b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 0744b9f..6894cc5 100644
--- 
a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ 
b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -19,10 +19,20 @@
 package org.apache.tajo.pullserver;
 
 import com.google.common.collect.Lists;
-
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.*;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.*;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.GlobalEventExecutor;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -32,8 +42,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.metrics2.MetricsSystem;
@@ -58,18 +66,6 @@ import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.stream.ChunkedWriteHandler;
-import io.netty.util.CharsetUtil;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.GlobalEventExecutor;
-
 import java.io.*;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -79,7 +75,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 public class TajoPullServerService extends AbstractService {
 
@@ -122,7 +118,14 @@ public class TajoPullServerService extends AbstractService 
{
 
   private static boolean STANDALONE = false;
 
+  private static final AtomicIntegerFieldUpdater<ProcessingStatus> 
SLOW_FILE_UPDATER;
+  private static final AtomicIntegerFieldUpdater<ProcessingStatus> 
REMAIN_FILE_UPDATER;
+
   static {
+    /* An AtomicIntegerFieldUpdater uses less memory than keeping a separate 
AtomicInteger instance in every ProcessingStatus */
+    SLOW_FILE_UPDATER = 
AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "numSlowFile");
+    REMAIN_FILE_UPDATER = 
AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "remainFiles");
+
     String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
     if (!StringUtils.isEmpty(standalone)) {
       STANDALONE = standalone.equalsIgnoreCase("true");
@@ -163,30 +166,6 @@ public class TajoPullServerService extends AbstractService 
{
     this(DefaultMetricsSystem.instance());
   }
 
-  /**
-   * Serialize the shuffle port into a ByteBuffer for use later on.
-   * @param port the port to be sent to the ApplciationMaster
-   * @return the serialized form of the port.
-   */
-  public static ByteBuffer serializeMetaData(int port) throws IOException {
-    //TODO these bytes should be versioned
-    DataOutputBuffer port_dob = new DataOutputBuffer();
-    port_dob.writeInt(port);
-    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
-  }
-
-  /**
-   * A helper function to deserialize the metadata returned by 
PullServerAuxService.
-   * @param meta the metadata returned by the PullServerAuxService
-   * @return the port the PullServer Handler is listening on to serve shuffle 
data.
-   */
-  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
-    //TODO this should be returning a class not just an int
-    DataInputByteBuffer in = new DataInputByteBuffer();
-    in.reset(meta);
-    return in.readInt();
-  }
-
   public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
     // TODO these bytes should be versioned
     // TODO: Once SHuffle is out of NM, this can use MR APIs
@@ -211,7 +190,7 @@ public class TajoPullServerService extends AbstractService {
       int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
           Runtime.getRuntime().availableProcessors() * 2);
 
-      selector = 
RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum)
+      selector = 
RpcChannelFactory.createServerChannelFactory("TajoPullServerService", workerNum)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.TCP_NODELAY, true);
@@ -229,11 +208,15 @@ public class TajoPullServerService extends 
AbstractService {
 
   // TODO change AbstractService to throw InterruptedException
   @Override
-  public synchronized void serviceInit(Configuration conf) throws Exception {
-    ServerBootstrap bootstrap = selector.clone();
+  public void serviceInit(Configuration conf) throws Exception {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf 
instance");
+    }
 
+    ServerBootstrap bootstrap = selector.clone();
+    TajoConf tajoConf = (TajoConf)conf;
     try {
-      channelInitializer = new HttpChannelInitializer(conf);
+      channelInitializer = new HttpChannelInitializer(tajoConf);
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
@@ -249,7 +232,6 @@ public class TajoPullServerService extends AbstractService {
     accepted.add(future.channel());
     port = ((InetSocketAddress)future.channel().localAddress()).getPort();
     conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
-    channelInitializer.PullServer.setPort(port);
     LOG.info(getName() + " listening on port " + port);
 
     sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
@@ -317,7 +299,7 @@ public class TajoPullServerService extends AbstractService {
   }
 
   @Override
-  public synchronized void stop() {
+  public void stop() {
     try {
       accepted.close();
       if (selector != null) {
@@ -341,22 +323,12 @@ public class TajoPullServerService extends 
AbstractService {
     }
   }
 
-  public synchronized ByteBuffer getMeta() {
-    try {
-      return serializeMetaData(port); 
-    } catch (IOException e) {
-      LOG.error("Error during getMeta", e);
-      // TODO add API to AuxiliaryServices to report failures
-      return null;
-    }
-  }
-
   class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
 
     final PullServer PullServer;
     private SSLFactory sslFactory;
 
-    public HttpChannelInitializer(Configuration conf) throws Exception {
+    public HttpChannelInitializer(TajoConf conf) throws Exception {
       PullServer = new PullServer(conf);
       if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
           ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
@@ -405,12 +377,12 @@ public class TajoPullServerService extends 
AbstractService {
   class ProcessingStatus {
     String requestUri;
     int numFiles;
-    AtomicInteger remainFiles;
     long startTime;
     long makeFileListTime;
     long minTime = Long.MAX_VALUE;
     long maxTime;
-    int numSlowFile;
+    volatile int numSlowFile;
+    volatile int remainFiles;
 
     public ProcessingStatus(String requestUri) {
       this.requestUri = requestUri;
@@ -419,14 +391,14 @@ public class TajoPullServerService extends 
AbstractService {
 
     public void setNumFiles(int numFiles) {
       this.numFiles = numFiles;
-      this.remainFiles = new AtomicInteger(numFiles);
+      this.remainFiles = numFiles;
     }
 
-    public synchronized void decrementRemainFiles(FileRegion filePart, long 
fileStartTime) {
+    public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
       long fileSendTime = System.currentTimeMillis() - fileStartTime;
       if (fileSendTime > 20 * 1000) {
         LOG.info("PullServer send too long time: filePos=" + 
filePart.position() + ", fileLen=" + filePart.count());
-        numSlowFile++;
+         SLOW_FILE_UPDATER.compareAndSet(this, numSlowFile, numSlowFile+ 1);
       }
       if (fileSendTime > maxTime) {
         maxTime = fileSendTime;
@@ -434,8 +406,9 @@ public class TajoPullServerService extends AbstractService {
       if (fileSendTime < minTime) {
         minTime = fileSendTime;
       }
-      int remain = remainFiles.decrementAndGet();
-      if (remain <= 0) {
+
+      REMAIN_FILE_UPDATER.compareAndSet(this, remainFiles, remainFiles - 1);
+      if (REMAIN_FILE_UPDATER.get(this) <= 0) {
         processingStatusMap.remove(requestUri);
         LOG.info("PullServer processing status: totalTime=" + 
(System.currentTimeMillis() - startTime) + " ms, "
             + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + 
minTime + " ms, maxTime=" + maxTime + " ms, "
@@ -447,25 +420,17 @@ public class TajoPullServerService extends 
AbstractService {
   @ChannelHandler.Sharable
   class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
 
-    private final Configuration conf;
+    private final TajoConf conf;
 //    private final IndexCache indexCache;
     private final LocalDirAllocator lDirAlloc =
       new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
-    private int port;
 
-    public PullServer(Configuration conf) throws IOException {
+    public PullServer(TajoConf conf) throws IOException {
       this.conf = conf;
-//      indexCache = new IndexCache(new JobConf(conf));
-      this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
-          ConfVars.PULLSERVER_PORT.defaultIntVal);
 
       // init local temporal dir
       lDirAlloc.getAllLocalPathsToRead(".", conf);
     }
-    
-    public void setPort(int port) {
-      this.port = port;
-    }
 
     private List<String> splitMaps(List<String> mapq) {
       if (null == mapq) {
@@ -567,7 +532,7 @@ public class TajoPullServerService extends AbstractService {
 
         // if a stage requires a hash shuffle or a scattered hash shuffle
       } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
-        int partParentId = 
HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) 
conf);
+        int partParentId = 
HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
         String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + 
partParentId + "/" + partId;
         if (!lDirAlloc.ifExists(partPath, conf)) {
           LOG.warn("Partition shuffle file not exists: " + partPath);

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git 
a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
 
b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
index e26bcd6..c4091f1 100644
--- 
a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
+++ 
b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
@@ -40,7 +40,7 @@ public class DirectoryRetriever implements DataRetriever {
       throws IOException {
     final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
     if (path == null) {
-      throw new IllegalArgumentException("Wrong path: " +path);
+      throw new IllegalArgumentException("Wrong uri: " +request.getUri());
     }
 
     File file = new File(baseDir, path);

Reply via email to