http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index fd52488..0d1924b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -129,6 +129,9 @@ public class QueryMasterTask extends CompositeService { @Override public void init(Configuration conf) { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("conf should be a TajoConf type."); + } systemConf = (TajoConf)conf; try { @@ -377,8 +380,7 @@ public class QueryMasterTask extends CompositeService { } private void initStagingDir() throws IOException { - Path stagingDir = null; - FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf); + Path stagingDir; try { @@ -388,14 +390,7 @@ public class QueryMasterTask extends CompositeService { LOG.info("The staging dir '" + stagingDir + "' is created."); queryContext.setStagingDir(stagingDir); } catch (IOException ioe) { - if (stagingDir != null && defaultFS.exists(stagingDir)) { - try { - defaultFS.delete(stagingDir, true); - LOG.info("The staging directory '" + stagingDir + "' is deleted"); - } catch (Exception e) { - LOG.warn(e.getMessage()); - } - } + LOG.warn("Creating staging dir has been failed.", ioe); throw ioe; }
http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index 7c2d9f4..8e9e343 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -82,7 +82,6 @@ public class Repartitioner { public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, Stage stage) throws IOException { - MasterPlan masterPlan = stage.getMasterPlan(); ExecutionBlock execBlock = stage.getBlock(); QueryMasterTask.QueryMasterTaskContext masterContext = stage.getContext(); @@ -231,12 +230,13 @@ public class Repartitioner { String namePrefix = ""; long maxStats = Long.MIN_VALUE; int maxStatsScanIdx = -1; + StringBuilder nonLeafScanNamesBuilder = new StringBuilder(); for (int i = 0; i < scans.length; i++) { if (scans[i].getTableDesc().getMeta().getStoreType() == StoreType.RAW) { // Intermediate data scan hasNonLeafNode = true; largeScanIndexList.add(i); - nonLeafScanNames += namePrefix + scans[i].getCanonicalName(); + nonLeafScanNamesBuilder.append(namePrefix).append(scans[i].getCanonicalName()); namePrefix = ","; } if (execBlock.isBroadcastTable(scans[i].getCanonicalName())) { @@ -249,18 +249,19 @@ public class Repartitioner { } } } + nonLeafScanNames = nonLeafScanNamesBuilder.toString(); if (maxStatsScanIdx == -1) { maxStatsScanIdx = 0; } if (!hasNonLeafNode) { if (largeScanIndexList.size() > 1) { - String largeTableNames = ""; + StringBuilder largeTableNamesBuilder = new StringBuilder(); for (Integer eachId : largeScanIndexList) { - largeTableNames += scans[eachId].getTableName() + ","; + largeTableNamesBuilder.append(scans[eachId].getTableName()).append(','); } throw new IOException("Broadcast join with leaf node should have only one large table, " + - "but " + largeScanIndexList.size() + ", tables=" + largeTableNames); + "but " + largeScanIndexList.size() + ", tables=" + largeTableNamesBuilder.toString()); } int baseScanIdx = largeScanIndexList.isEmpty() ? maxStatsScanIdx : largeScanIndexList.get(0); LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, base_table=%s, base_volume=%d", @@ -494,6 +495,9 @@ public class Repartitioner { public static List<Fragment> getFragmentsFromPartitionedTable(FileStorageManager sm, ScanNode scan, TableDesc table) throws IOException { + if (!(scan instanceof PartitionedTableScanNode)) { + throw new IllegalArgumentException("scan should be a PartitionedTableScanNode type."); + } List<Fragment> fragments = Lists.newArrayList(); PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan; fragments.addAll(sm.getSplits( @@ -696,9 +700,14 @@ public class Repartitioner { LOG.info(stage.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + " sub ranges (total units: " + determinedTaskNum + ")"); ranges = partitioner.partition(determinedTaskNum); - if (ranges == null || ranges.length == 0) { + if (ranges == null) { + throw new NullPointerException("ranges is null on " + stage.getId() + " stage."); + } + + if (ranges.length == 0) { LOG.warn(stage.getId() + " no range infos."); } + TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges); if (LOG.isDebugEnabled()) { if (ranges != null) { @@ -985,7 +994,7 @@ public class Repartitioner { String tableName) { long splitVolume = StorageUnit.MB * stage.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE); - long pageSize = StorageUnit.MB * + long pageSize = ((long)StorageUnit.MB) * stage.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes if (pageSize >= splitVolume) { throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be great than " + http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index efadaa7..4e1f716 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -865,7 +865,6 @@ public class Stage implements EventHandler<StageEvent> { * @return */ public static int calculateShuffleOutputNum(Stage stage, DataChannel channel) { - TajoConf conf = stage.context.getConf(); MasterPlan masterPlan = stage.getMasterPlan(); ExecutionBlock parent = masterPlan.getParent(stage.getBlock()); @@ -1156,6 +1155,9 @@ public class Stage implements EventHandler<StageEvent> { @Override public void transition(Stage stage, StageEvent event) { + if (!(event instanceof StageContainerAllocationEvent)) { + throw new IllegalArgumentException("event should be a StageContainerAllocationEvent type."); + } try { StageContainerAllocationEvent allocationEvent = (StageContainerAllocationEvent) event; @@ -1191,6 +1193,9 @@ public class Stage implements EventHandler<StageEvent> { private static class AllocatedContainersCancelTransition implements SingleArcTransition<Stage, StageEvent> { @Override public void transition(Stage stage, StageEvent event) { + if (!(event instanceof StageContainerAllocationEvent)) { + throw new IllegalArgumentException("event should be a StageContainerAllocationEvent type."); + } try { StageContainerAllocationEvent allocationEvent = (StageContainerAllocationEvent) event; @@ -1213,6 +1218,9 @@ public class Stage implements EventHandler<StageEvent> { @Override public void transition(Stage stage, StageEvent event) { + if (!(event instanceof StageTaskEvent)) { + throw new IllegalArgumentException("event should be a StageTaskEvent type."); + } StageTaskEvent taskEvent = (StageTaskEvent) event; Task task = stage.getTask(taskEvent.getTaskId()); @@ -1418,6 +1426,9 @@ public class Stage implements EventHandler<StageEvent> { private static class DiagnosticsUpdateTransition implements SingleArcTransition<Stage, StageEvent> { @Override public void transition(Stage stage, StageEvent event) { + if (!(event instanceof StageDiagnosticsUpdateEvent)) { + throw new IllegalArgumentException("event should be a StageDiagnosticsUpdateEvent type."); + } stage.addDiagnostic(((StageDiagnosticsUpdateEvent) event).getDiagnosticUpdate()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java index ad01b62..92f4b20 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java @@ -601,6 +601,9 @@ public class Task implements EventHandler<TaskEvent> { @Override public void transition(Task task, TaskEvent event) { + if (!(event instanceof TaskTAttemptEvent)) { + throw new IllegalArgumentException("event should be a TaskTAttemptEvent type."); + } TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); @@ -618,6 +621,9 @@ public class Task implements EventHandler<TaskEvent> { @Override public void transition(Task task, TaskEvent event) { + if (!(event instanceof TaskTAttemptEvent)) { + throw new IllegalArgumentException("event should be a TaskTAttemptEvent type."); + } TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); task.launchTime = System.currentTimeMillis(); @@ -629,6 +635,9 @@ public class Task implements EventHandler<TaskEvent> { private static class AttemptFailedTransition implements SingleArcTransition<Task, TaskEvent> { @Override public void transition(Task task, TaskEvent event) { + if (!(event instanceof TaskTAttemptEvent)) { + throw new IllegalArgumentException("event should be a TaskTAttemptEvent type."); + } TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; LOG.info("============================================================="); LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<"); @@ -646,6 +655,9 @@ public class Task implements EventHandler<TaskEvent> { @Override public TaskState transition(Task task, TaskEvent taskEvent) { + if (!(taskEvent instanceof TaskTAttemptEvent)) { + throw new IllegalArgumentException("taskEvent should be a TaskTAttemptEvent type."); + } TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent; task.failedAttempts++; task.finishedAttempts++; http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java index 86c49b4..c1b9273 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java @@ -307,6 +307,9 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { @Override public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) { + if (!(event instanceof TaskAttemptAssignedEvent)) { + throw new IllegalArgumentException("event should be a TaskAttemptAssignedEvent type."); + } TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event; taskAttempt.containerId = castEvent.getContainerId(); taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo(); @@ -333,6 +336,9 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { @Override public TaskAttemptState transition(TaskAttempt taskAttempt, TaskAttemptEvent event) { + if (!(event instanceof TaskAttemptStatusUpdateEvent)) { + throw new IllegalArgumentException("event should be a TaskAttemptStatusUpdateEvent type."); + } TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event; taskAttempt.progress = updateEvent.getStatus().getProgress(); @@ -371,6 +377,9 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { @Override public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) { + if (!(event instanceof TaskCompletionEvent)) { + throw new IllegalArgumentException("event should be a TaskCompletionEvent type."); + } TaskCompletionReport report = ((TaskCompletionEvent)event).getReport(); try { @@ -395,6 +404,9 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { private static class FailedTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{ @Override public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) { + if (!(event instanceof TaskFatalErrorEvent)) { + throw new IllegalArgumentException("event should be a TaskFatalErrorEvent type."); + } TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event; taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED)); taskAttempt.addDiagnosticInfo(errorEvent.errorMessage()); http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java index 3147bb6..23d245b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java @@ -143,6 +143,9 @@ public class IndexUtil { && binaryEval.getRightExpr().getType() == EvalType.CONST) { nodeList.add(binaryEval); } + break; + default: + break; } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java index 13f4dcc..875d12b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java @@ -97,6 +97,9 @@ public class JSPUtil { } public static String getTajoMasterHttpAddr(Configuration config) { + if (!(config instanceof TajoConf)) { + throw new IllegalArgumentException("config should be a TajoConf type."); + } try { TajoConf conf = (TajoConf) config; String [] masterAddr = conf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS).split(":"); http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java index 868dfcd..6ba74d5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryCleaner.java @@ -127,6 +127,8 @@ public class HistoryCleaner extends Thread { } } } + } catch (RuntimeException e) { + LOG.error(e.getMessage(), e); } catch (Exception e) { LOG.error(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java index 3fea3ef..b06c7e8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ -84,6 +84,9 @@ public class HistoryWriter extends AbstractService { @Override public void serviceInit(Configuration conf) throws Exception { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("conf should be a TajoConf type."); + } tajoConf = (TajoConf)conf; historyParentPath = tajoConf.getQueryHistoryDir(tajoConf); taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf); http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java index 9e895b8..386fb79 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java @@ -21,9 +21,7 @@ package org.apache.tajo.util.metrics.reporter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; +import java.io.*; public class MetricsFileScheduledReporter extends MetricsStreamScheduledReporter { private static final Log LOG = LogFactory.getLog(MetricsFileScheduledReporter.class); @@ -40,6 +38,7 @@ public class MetricsFileScheduledReporter extends MetricsStreamScheduledReporter LOG.warn("No " + metricsPropertyKey + "filename property in tajo-metrics.properties"); return; } + try { File file = new File(fileName); File parentFile = file.getParentFile(); http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java index 4fbefd7..7f33792 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java @@ -36,7 +36,6 @@ public abstract class MetricsStreamScheduledReporter extends TajoMetricsSchedule protected Locale locale; protected Clock clock; protected TimeZone timeZone; - protected MetricFilter filter; protected DateFormat dateFormat; private final byte[] NEW_LINE = "\n".getBytes(); http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java index a32a913..a7e0559 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java @@ -48,10 +48,12 @@ public abstract class TajoMetricsReporter { groupName = keyTokens[0] + "." + keyTokens[1]; itemName = ""; String prefix = ""; + StringBuilder itemNameBuilder = new StringBuilder(); for (int i = 2; i < keyTokens.length; i++) { - itemName += prefix + keyTokens[i]; + itemNameBuilder.append(prefix).append(keyTokens[i]); prefix = "."; } + itemName = itemNameBuilder.toString(); } else { groupName = ""; itemName = key; http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java index f11d520..7e0ec4a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java @@ -142,11 +142,7 @@ public abstract class TajoMetricsScheduledReporter extends TajoMetricsReporter i try { report(); } catch (Exception e) { - if(LOG.isDebugEnabled()) { - LOG.warn("Metric report error:" + e.getMessage(), e); - } else { - LOG.warn("Metric report error:" + e.getMessage(), e); - } + LOG.warn("Metric report error:" + e.getMessage(), e); } } }, period, period, unit); http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java index d8bb8f1..e9b6230 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java +++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java @@ -28,6 +28,7 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; +import java.io.NotSerializableException; import java.io.OutputStream; import java.sql.ResultSet; import java.sql.ResultSetMetaData; @@ -62,17 +63,26 @@ import java.util.concurrent.atomic.AtomicInteger; public class QueryExecutorServlet extends HttpServlet { private static final Log LOG = LogFactory.getLog(QueryExecutorServlet.class); + private static final long serialVersionUID = -1517586415463171579L; - ObjectMapper om = new ObjectMapper(); + transient ObjectMapper om = new ObjectMapper(); //queryRunnerId -> QueryRunner //TODO We must handle the session. - private final Map<String, QueryRunner> queryRunners = new HashMap<String, QueryRunner>(); + private transient final Map<String, QueryRunner> queryRunners = new HashMap<String, QueryRunner>(); - private TajoConf tajoConf; - private TajoClient tajoClient; + private transient TajoConf tajoConf; + private transient TajoClient tajoClient; - private ExecutorService queryRunnerExecutor = Executors.newFixedThreadPool(5); + private transient ExecutorService queryRunnerExecutor = Executors.newFixedThreadPool(5); + + private void writeObject(java.io.ObjectOutputStream stream) throws java.io.IOException { + throw new NotSerializableException( getClass().getName() ); + } + + private void readObject(java.io.ObjectInputStream stream) throws java.io.IOException, ClassNotFoundException { + throw new NotSerializableException( getClass().getName() ); + } @Override public void init(ServletConfig config) throws ServletException { @@ -135,10 +145,11 @@ public class QueryExecutorServlet extends HttpServlet { if(!queryRunners.containsKey(queryRunnerId)) { break; } - try { - Thread.sleep(100); - } catch (InterruptedException e) { - } + } + + try { + Thread.sleep(100); + } catch (InterruptedException e) { } } String database = request.getParameter("database"); http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java b/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java index 09426e0..6008aae 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java +++ b/tajo-core/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java @@ -31,6 +31,8 @@ import java.net.Inet4Address; public class StaticHttpServer extends HttpServer { private static StaticHttpServer instance = null; + private static final Object lockObjectForStaticHttpServer = new Object(); + private StaticHttpServer(Object containerObject , String name, String bindAddress, int port, boolean findPort, Connector connector, Configuration conf, String[] pathSpecs) throws IOException { @@ -52,13 +54,17 @@ public class StaticHttpServer extends HttpServer { addr = Inet4Address.getLocalHost().getHostName(); } } - - instance = new StaticHttpServer(containerObject, name, addr, port, - findPort, connector, conf, pathSpecs); - instance.setAttribute("tajo.info.server.object", containerObject); - instance.setAttribute("tajo.info.server.addr", addr); - instance.setAttribute("tajo.info.server.conf", conf); - instance.setAttribute("tajo.info.server.starttime", System.currentTimeMillis()); + + synchronized (lockObjectForStaticHttpServer) { + if (instance == null) { + instance = new StaticHttpServer(containerObject, name, addr, port, + findPort, connector, conf, pathSpecs); + instance.setAttribute("tajo.info.server.object", containerObject); + instance.setAttribute("tajo.info.server.addr", addr); + instance.setAttribute("tajo.info.server.conf", conf); + instance.setAttribute("tajo.info.server.starttime", System.currentTimeMillis()); + } + } } return instance; } http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java index 4b5a203..67114a3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class ExecutionBlockSharedResource { private static Log LOG = LogFactory.getLog(ExecutionBlockSharedResource.class); private AtomicBoolean initializing = new AtomicBoolean(false); - private volatile Boolean resourceInitSuccess = new Boolean(false); + private volatile Boolean resourceInitSuccess = Boolean.valueOf(false); // Query private QueryContext context; http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index c6a06f0..827c860 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -106,6 +106,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { @Override public void init(Configuration conf) { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("conf should be a TajoConf type."); + } tajoConf = (TajoConf)conf; queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher()); @@ -147,6 +150,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { @Override public void handle(TaskRunnerGroupEvent event) { if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_LAUNCH) { + if (!(event instanceof LaunchTaskRunnersEvent)) { + throw new IllegalArgumentException("event should be a LaunchTaskRunnersEvent type."); + } LaunchTaskRunnersEvent launchEvent = (LaunchTaskRunnersEvent) event; launchTaskRunners(launchEvent); } else if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) { http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 7f73916..7e2a233 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -186,6 +186,9 @@ public class TajoWorker extends CompositeService { @Override public void serviceInit(Configuration conf) throws Exception { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("conf should be a TajoConf type."); + } Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook())); this.systemConf = (TajoConf)conf; http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 23efffa..cf50767 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -113,6 +113,9 @@ public class TaskRunner extends AbstractService { @Override public void init(Configuration conf) { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("conf should be a TajoConf Type."); + } this.systemConf = (TajoConf)conf; try { @@ -143,15 +146,17 @@ public class TaskRunner extends AbstractService { this.finishTime = System.currentTimeMillis(); this.history.setFinishTime(finishTime); // If this flag become true, taskLauncher will be terminated. - this.stopped = true; - - fetchLauncher.shutdown(); - fetchLauncher = null; LOG.info("Stop TaskRunner: " + getId()); synchronized (this) { + this.stopped = true; + + fetchLauncher.shutdown(); + fetchLauncher = null; + notifyAll(); } + super.stop(); this.history.setState(getServiceState()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index 57ae566..570bd38 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -230,7 +230,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< break; } try { - long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000; + long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000l; cleanExpiredFinishedQueryMasterTask(expireTime); } catch (Exception e) { LOG.error(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/tajo/blob/64e47a40/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index 870e9a0..462f95d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -111,7 +111,6 @@ public class WorkerHeartbeatService extends AbstractService { int workerCpuCoreNum; boolean dedicatedResource = systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED); - int workerCpuCoreSlots = Runtime.getRuntime().availableProcessors(); try { diskDeviceInfos = DiskUtil.getDiskDeviceInfos();
