Repository: tajo
Updated Branches:
  refs/heads/master 0f76a8ba0 -> 016507dd6


TAJO-1029: TAJO_PULLSERVER_STANDALONE should be false in default tajo-env.sh. (jinho)

Closes #134


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/016507dd
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/016507dd
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/016507dd

Branch: refs/heads/master
Commit: 016507dd69e85f35134d8b2fe5e4054f28e762f3
Parents: 0f76a8b
Author: jhkim <[email protected]>
Authored: Thu Sep 11 10:41:49 2014 +0900
Committer: jhkim <[email protected]>
Committed: Thu Sep 11 10:41:49 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../java/org/apache/tajo/worker/TajoWorker.java |  2 +-
 .../org/apache/tajo/TajoTestingCluster.java     |  1 -
 tajo-dist/src/main/bin/tajo-daemon.sh           |  1 +
 tajo-dist/src/main/conf/tajo-env.sh             |  2 +-
 .../apache/tajo/pullserver/TajoPullServer.java  |  2 +-
 .../tajo/pullserver/TajoPullServerService.java  | 51 ++++++++++----------
 7 files changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/016507dd/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 358b5ed..285e45d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -135,6 +135,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1029: TAJO_PULLSERVER_STANDALONE should be false in default tajo-env.sh
+    (jinho)
+
     TAJO-1017: TajoConf misuses read & write locks in some functions. 
     (Mai Hai Thanh via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/016507dd/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 8e6118d..584c60e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -207,7 +207,7 @@ public class TajoWorker extends CompositeService {
     addService(tajoWorkerManagerService);
 
     if(!yarnContainerMode) {
-      if(taskRunnerMode && !TajoPullServerService.isStandaloneMode()) {
+      if(taskRunnerMode && !TajoPullServerService.isStandalone()) {
         pullService = new TajoPullServerService();
         addService(pullService);
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/016507dd/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index b07ba96..346fa69 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -103,7 +103,6 @@ public class TajoTestingCluster {
   }
 
   void initPropertiesAndConfigs() {
-    System.setProperty("TAJO_PULLSERVER_STANDALONE", "false");
     if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) {
       String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname);
       Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/016507dd/tajo-dist/src/main/bin/tajo-daemon.sh
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/bin/tajo-daemon.sh b/tajo-dist/src/main/bin/tajo-daemon.sh
index ff3a764..48790e9 100755
--- a/tajo-dist/src/main/bin/tajo-daemon.sh
+++ b/tajo-dist/src/main/bin/tajo-daemon.sh
@@ -94,6 +94,7 @@ fi
 # some variables
 export TAJO_LOGFILE=tajo-$TAJO_IDENT_STRING-$command-$HOSTNAME.log
 export TAJO_ROOT_LOGGER_APPENDER="${TAJO_ROOT_LOGGER_APPENDER:-DRFA}"
+export TAJO_PULLSERVER_STANDALONE="${TAJO_PULLSERVER_STANDALONE:-false}"
 log=$TAJO_LOG_DIR/tajo-$TAJO_IDENT_STRING-$command-$HOSTNAME.out
 pid=$TAJO_PID_DIR/tajo-$TAJO_IDENT_STRING-$command.pid
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/016507dd/tajo-dist/src/main/conf/tajo-env.sh
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/conf/tajo-env.sh b/tajo-dist/src/main/conf/tajo-env.sh
index bd14af6..92d00bd 100755
--- a/tajo-dist/src/main/conf/tajo-env.sh
+++ b/tajo-dist/src/main/conf/tajo-env.sh
@@ -77,4 +77,4 @@ export TAJO_WORKER_STANDBY_MODE=true
 # export HIVE_JDBC_DRIVER_DIR=
 
 # Tajo PullServer mode. the default mode is standalone mode
-export TAJO_PULLSERVER_STANDALONE=true
\ No newline at end of file
+# export TAJO_PULLSERVER_STANDALONE=false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/016507dd/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
index 7d7065e..d030eed 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
@@ -60,7 +60,7 @@ public class TajoPullServer extends CompositeService {
   public static void main(String[] args) throws Exception {
     StringUtils.startupShutdownMessage(PullServer.class, args, LOG);
 
-    if (!TajoPullServerService.isStandaloneMode()) {
+    if (!TajoPullServerService.isStandalone()) {
       LOG.fatal("TAJO_PULLSERVER_STANDALONE env variable is not 'true'");
       return;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/016507dd/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 150ac85..f7bc489 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -20,6 +20,7 @@ package org.apache.tajo.pullserver;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -29,6 +30,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.mapred.FadvisedChunkedFile;
 import org.apache.hadoop.mapred.FadvisedFileRegion;
@@ -122,6 +124,15 @@ public class TajoPullServerService extends AbstractService {
 
   public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
 
+  private static boolean STANDALONE = false;
+
+  static {
+    String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
+    if (!StringUtils.isEmpty(standalone)) {
+      STANDALONE = standalone.equalsIgnoreCase("true");
+    }
+  }
+
   @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
   static class ShuffleMetrics implements ChannelFutureListener {
     @Metric({"OutputBytes","PullServer output in bytes"})
@@ -245,48 +256,41 @@ public class TajoPullServerService extends AbstractService {
     sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
                                     DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
 
-    if (isStandaloneMode()) {
+    if (STANDALONE) {
       File pullServerPortFile = getPullServerPortFile();
       if (pullServerPortFile.exists()) {
         pullServerPortFile.delete();
       }
       pullServerPortFile.getParentFile().mkdirs();
       LOG.info("Write PullServerPort to " + pullServerPortFile);
+      FileOutputStream out = null;
       try {
-        FileOutputStream out = new FileOutputStream(pullServerPortFile);
+        out = new FileOutputStream(pullServerPortFile);
         out.write(("" + port).getBytes());
-        out.close();
       } catch (Exception e) {
         LOG.fatal("PullServer exists cause can't write PullServer port to " + pullServerPortFile +
             ", " + e.getMessage(), e);
         System.exit(-1);
+      } finally {
+        IOUtils.closeStream(out);
       }
     }
     LOG.info("TajoPullServerService started: port=" + port);
   }
 
+  public static boolean isStandalone() {
+    return STANDALONE;
+  }
+
   private static File getPullServerPortFile() {
     String pullServerPortInfoFile = System.getenv("TAJO_PID_DIR");
-    if (pullServerPortInfoFile == null || pullServerPortInfoFile.isEmpty()) {
+    if (StringUtils.isEmpty(pullServerPortInfoFile)) {
       pullServerPortInfoFile = "/tmp";
     }
-
     return new File(pullServerPortInfoFile + "/pullserver.port");
   }
 
-  public static boolean isStandaloneMode() {
-    String mode = System.getenv("TAJO_PULLSERVER_STANDALONE");
-    if (mode == null || mode.trim().isEmpty()) {
-      mode = System.getProperty("TAJO_PULLSERVER_STANDALONE");
-    }
-
-    if (mode == null || mode.trim().isEmpty()) {
-      return true;
-    } else {
-      return mode.equalsIgnoreCase("true");
-    }
-  }
-
+  // TODO change to get port from master or tajoConf
   public static int readPullServerPort() {
     FileInputStream in = null;
     try {
@@ -299,16 +303,11 @@ public class TajoPullServerService extends AbstractService {
       byte[] buf = new byte[1024];
       int readBytes = in.read(buf);
       return Integer.parseInt(new String(buf, 0, readBytes));
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
+    } catch (IOException e) {
+      LOG.fatal(e.getMessage(), e);
       return -1;
     } finally {
-      if (in != null) {
-        try {
-          in.close();
-        } catch (IOException e) {
-        }
-      }
+      IOUtils.closeStream(in);
     }
   }
 

Reply via email to