http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index d2afb83,66216ee..13a2021
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@@ -48,633 -49,9 +49,641 @@@ public interface Task 
  
    ExecutionBlockContext getExecutionBlockContext();
  
++<<<<<<< HEAD
 +//<<<<<<< HEAD
 +//
 +//  public TaskAttemptId getTaskId() {
 +//    return taskId;
 +//  }
 +//
 +//  public TaskAttemptId getId() {
 +//    return context.getTaskId();
 +//  }
 +//
 +//  public TaskAttemptState getStatus() {
 +//    return context.getState();
 +//  }
 +//
 +//  public String toString() {
 +//    return "queryId: " + this.getId() + " status: " + this.getStatus();
 +//  }
 +//
 +//  public void setState(TaskAttemptState status) {
 +//    context.setState(status);
 +//  }
 +//
 +//  public TaskAttemptContext getContext() {
 +//    return context;
 +//  }
 +//
 +//  public boolean hasFetchPhase() {
 +//    return fetcherRunners.size() > 0;
 +//  }
 +//
 +//  public List<Fetcher> getFetchers() {
 +//    return new ArrayList<Fetcher>(fetcherRunners);
 +//  }
 +//
 +//  public void fetch() {
 +//    ExecutorService executorService = 
executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher();
 +//    for (Fetcher f : fetcherRunners) {
 +//      executorService.submit(new FetchRunner(context, f));
 +//    }
 +//  }
 +//
 +//  public void kill() {
 +//    stopScriptExecutors();
 +//    context.setState(TaskAttemptState.TA_KILLED);
 +//    context.stop();
 +//  }
 +//
 +//  public void abort() {
 +//    stopScriptExecutors();
 +//    context.stop();
 +//  }
 +//
 +//  public void cleanUp() {
 +//    // remove itself from worker
 +//    if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
 +//      synchronized (executionBlockContext.getTasks()) {
 +//        executionBlockContext.getTasks().remove(this.getId());
 +//      }
 +//    } else {
 +//      LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + 
context.getState());
 +//    }
 +//  }
 +//
 +//  public TaskStatusProto getReport() {
 +//    TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
 +//    
builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
 +//    builder.setId(context.getTaskId().getProto())
 +//        .setProgress(context.getProgress())
 +//        .setState(context.getState());
 +//
 +//    builder.setInputStats(reloadInputStats());
 +//
 +//    if (context.getResultStats() != null) {
 +//      builder.setResultStats(context.getResultStats().getProto());
 +//    }
 +//    return builder.build();
 +//  }
 +//
 +//  public boolean isRunning(){
 +//    return context.getState() == TaskAttemptState.TA_RUNNING;
 +//  }
 +//
 +//  public boolean isProgressChanged() {
 +//    return context.isProgressChanged();
 +//  }
 +//
 +//  public void updateProgress() {
 +//    if(context != null && context.isStopped()){
 +//      return;
 +//    }
 +//
 +//    if (executor != null && context.getProgress() < 1.0f) {
 +//      context.setExecutorProgress(executor.getProgress());
 +//    }
 +//  }
 +//
 +//  private CatalogProtos.TableStatsProto reloadInputStats() {
 +//    synchronized(inputStats) {
 +//      if (this.executor == null) {
 +//        return inputStats.getProto();
 +//      }
 +//
 +//      TableStats executorInputStats = this.executor.getInputStats();
 +//
 +//      if (executorInputStats != null) {
 +//        inputStats.setValues(executorInputStats);
 +//      }
 +//      return inputStats.getProto();
 +//    }
 +//  }
 +//
 +//  private TaskCompletionReport getTaskCompletionReport() {
 +//    TaskCompletionReport.Builder builder = 
TaskCompletionReport.newBuilder();
 +//    builder.setId(context.getTaskId().getProto());
 +//
 +//    builder.setInputStats(reloadInputStats());
 +//
 +//    if (context.hasResultStats()) {
 +//      builder.setResultStats(context.getResultStats().getProto());
 +//    } else {
 +//      builder.setResultStats(new TableStats().getProto());
 +//    }
 +//
 +//    Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
 +//    if (it.hasNext()) {
 +//      do {
 +//        Entry<Integer, String> entry = it.next();
 +//        ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
 +//        part.setPartId(entry.getKey());
 +//
 +//        // Set output volume
 +//        if (context.getPartitionOutputVolume() != null) {
 +//          for (Entry<Integer, Long> e : 
context.getPartitionOutputVolume().entrySet()) {
 +//            if (entry.getKey().equals(e.getKey())) {
 +//              part.setVolume(e.getValue().longValue());
 +//              break;
 +//            }
 +//          }
 +//        }
 +//
 +//        builder.addShuffleFileOutputs(part.build());
 +//      } while (it.hasNext());
 +//    }
 +//
 +//    return builder.build();
 +//  }
 +//
 +//  private void waitForFetch() throws InterruptedException, IOException {
 +//    context.getFetchLatch().await();
 +//    LOG.info(context.getTaskId() + " All fetches are done!");
 +//    Collection<String> inputs = 
Lists.newArrayList(context.getInputTables());
 +//
 +//    // Get all broadcasted tables
 +//    Set<String> broadcastTableNames = new HashSet<String>();
 +//    List<EnforceProperty> broadcasts = 
context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
 +//    if (broadcasts != null) {
 +//      for (EnforceProperty eachBroadcast : broadcasts) {
 +//        
broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName());
 +//      }
 +//    }
 +//
 +//    // localize the fetched data and skip the broadcast table
 +//    for (String inputTable: inputs) {
 +//      if (broadcastTableNames.contains(inputTable)) {
 +//        continue;
 +//      }
 +//      File tableDir = new File(context.getFetchIn(), inputTable);
 +//      FileFragment[] frags = localizeFetchedData(tableDir, inputTable, 
descs.get(inputTable).getMeta());
 +//      context.updateAssignedFragments(inputTable, frags);
 +//    }
 +//  }
 +//
 +//  public void run() throws Exception {
 +//    startTime = System.currentTimeMillis();
 +//    Throwable error = null;
 +//    try {
 +//      if(!context.isStopped()) {
 +//        context.setState(TaskAttemptState.TA_RUNNING);
 +//        if (context.hasFetchPhase()) {
 +//          // If the fetch is still in progress, the query unit must wait for
 +//          // complete.
 +//          waitForFetch();
 +//          context.setFetcherProgress(FETCHER_PROGRESS);
 +//          context.setProgressChanged(true);
 +//          updateProgress();
 +//        }
 +//
 +//        this.executor = executionBlockContext.getTQueryEngine().
 +//            createPlan(context, plan);
 +//        this.executor.init();
 +//
 +//        while(!context.isStopped() && executor.next() != null) {
 +//        }
 +//      }
 +//    } catch (Throwable e) {
 +//      error = e ;
 +//      LOG.error(e.getMessage(), e);
 +//      stopScriptExecutors();
 +//      context.stop();
 +//    } finally {
 +//      if (executor != null) {
 +//        try {
 +//          executor.close();
 +//          reloadInputStats();
 +//        } catch (IOException e) {
 +//          LOG.error(e, e);
 +//        }
 +//        this.executor = null;
 +//      }
 +//
 +//      executionBlockContext.completedTasksNum.incrementAndGet();
 +//      context.getHashShuffleAppenderManager().finalizeTask(taskId);
 +//
 +//      QueryMasterProtocol.QueryMasterProtocolService.Interface 
queryMasterStub = executionBlockContext.getStub();
 +//      if (context.isStopped()) {
 +//        context.setExecutorProgress(0.0f);
 +//
 +//        if (context.getState() == TaskAttemptState.TA_KILLED) {
 +//          queryMasterStub.statusUpdate(null, getReport(), 
NullCallback.get());
 +//          executionBlockContext.killedTasksNum.incrementAndGet();
 +//        } else {
 +//          context.setState(TaskAttemptState.TA_FAILED);
 +//          TaskFatalErrorReport.Builder errorBuilder =
 +//              TaskFatalErrorReport.newBuilder()
 +//                  .setId(getId().getProto());
 +//          if (error != null) {
 +//            if (error.getMessage() == null) {
 +//              
errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
 +//            } else {
 +//              errorBuilder.setErrorMessage(error.getMessage());
 +//            }
 +//            errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
 +//          }
 +//
 +//          queryMasterStub.fatalError(null, errorBuilder.build(), 
NullCallback.get());
 +//          executionBlockContext.failedTasksNum.incrementAndGet();
 +//        }
 +//      } else {
 +//        // if successful
 +//        context.setProgress(1.0f);
 +//        context.setState(TaskAttemptState.TA_SUCCEEDED);
 +//        executionBlockContext.succeededTasksNum.incrementAndGet();
 +//
 +//        TaskCompletionReport report = getTaskCompletionReport();
 +//        queryMasterStub.done(null, report, NullCallback.get());
 +//      }
 +//      finishTime = System.currentTimeMillis();
 +//      LOG.info(context.getTaskId() + " completed. " +
 +//          "Worker's task counter - total:" + 
executionBlockContext.completedTasksNum.intValue() +
 +//          ", succeeded: " + 
executionBlockContext.succeededTasksNum.intValue()
 +//          + ", killed: " + executionBlockContext.killedTasksNum.intValue()
 +//          + ", failed: " + executionBlockContext.failedTasksNum.intValue());
 +//      cleanupTask();
 +//    }
 +//  }
 +//
 +//  public void cleanupTask() {
 +//    TaskHistory taskHistory = createTaskHistory();
 +//    executionBlockContext.addTaskHistory(taskRunnerId, getId(), 
taskHistory);
 +//    executionBlockContext.getTasks().remove(getId());
 +//
 +//    fetcherRunners.clear();
 +//    fetcherRunners = null;
 +//    try {
 +//      if(executor != null) {
 +//        executor.close();
 +//        executor = null;
 +//      }
 +//    } catch (IOException e) {
 +//      LOG.fatal(e.getMessage(), e);
 +//    }
 +//
 +//    
executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
 +//    stopScriptExecutors();
 +//  }
 +//
 +//  public TaskHistory createTaskHistory() {
 +//    TaskHistory taskHistory = null;
 +//    try {
 +//      taskHistory = new TaskHistory(getTaskId(), getStatus(), 
context.getProgress(),
 +//          startTime, finishTime, reloadInputStats());
 +//
 +//      if (context.getOutputPath() != null) {
 +//        taskHistory.setOutputPath(context.getOutputPath().toString());
 +//      }
 +//
 +//      if (context.getWorkDir() != null) {
 +//        taskHistory.setWorkingPath(context.getWorkDir().toString());
 +//      }
 +//
 +//      if (context.getResultStats() != null) {
 +//        taskHistory.setOutputStats(context.getResultStats().getProto());
 +//      }
 +//
 +//      if (hasFetchPhase()) {
 +//        taskHistory.setTotalFetchCount(fetcherRunners.size());
 +//        int i = 0;
 +//        FetcherHistoryProto.Builder builder = 
FetcherHistoryProto.newBuilder();
 +//        for (Fetcher fetcher : fetcherRunners) {
 +//          // TODO store the fetcher histories
 +//          if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
 +//            builder.setStartTime(fetcher.getStartTime());
 +//            builder.setFinishTime(fetcher.getFinishTime());
 +//            builder.setFileLength(fetcher.getFileLen());
 +//            
builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
 +//            builder.setState(fetcher.getState());
 +//
 +//            taskHistory.addFetcherHistory(builder.build());
 +//          }
 +//          if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) 
i++;
 +//        }
 +//        taskHistory.setFinishedFetchCount(i);
 +//      }
 +//    } catch (Exception e) {
 +//      LOG.warn(e.getMessage(), e);
 +//    }
 +//
 +//    return taskHistory;
 +//  }
 +//
 +//  public int hashCode() {
 +//    return context.hashCode();
 +//  }
 +//
 +//  public boolean equals(Object obj) {
 +//    if (obj instanceof Task) {
 +//      Task other = (Task) obj;
 +//      return this.context.equals(other.context);
 +//    }
 +//    return false;
 +//  }
 +//
 +//  private FileFragment[] localizeFetchedData(File file, String name, 
TableMeta meta)
 +//      throws IOException {
 +//    Configuration c = new Configuration(systemConf);
 +//    c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
 +//    FileSystem fs = FileSystem.get(c);
 +//    Path tablePath = new Path(file.getAbsolutePath());
 +//
 +//    List<FileFragment> listTablets = new ArrayList<FileFragment>();
 +//    FileFragment tablet;
 +//
 +//    FileStatus[] fileLists = fs.listStatus(tablePath);
 +//    for (FileStatus f : fileLists) {
 +//      if (f.getLen() == 0) {
 +//        continue;
 +//      }
 +//      tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
 +//      listTablets.add(tablet);
 +//    }
 +//
 +//    // Special treatment for locally pseudo fetched chunks
 +//    synchronized (localChunks) {
 +//      for (FileChunk chunk : localChunks) {
 +//        if (name.equals(chunk.getEbId())) {
 +//          tablet = new FileFragment(name, new 
Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
 +//          listTablets.add(tablet);
 +//          LOG.info("One local chunk is added to listTablets");
 +//        }
 +//      }
 +//    }
 +//
 +//    FileFragment[] tablets = new FileFragment[listTablets.size()];
 +//    listTablets.toArray(tablets);
 +//
 +//    return tablets;
 +//  }
 +//
 +//  private class FetchRunner implements Runnable {
 +//    private final TaskAttemptContext ctx;
 +//    private final Fetcher fetcher;
 +//    private int maxRetryNum;
 +//
 +//    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
 +//      this.ctx = ctx;
 +//      this.fetcher = fetcher;
 +//      this.maxRetryNum = 
systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
 +//    }
 +//
 +//    @Override
 +//    public void run() {
 +//      int retryNum = 0;
 +//      int retryWaitTime = 1000; //sec
 +//
 +//      try { // for releasing fetch latch
 +//        while(!context.isStopped() && retryNum < maxRetryNum) {
 +//          if (retryNum > 0) {
 +//            try {
 +//              Thread.sleep(retryWaitTime);
 +//              retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2);  // 
max 10 seconds
 +//            } catch (InterruptedException e) {
 +//              LOG.error(e);
 +//            }
 +//            LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + 
retryNum + ")");
 +//          }
 +//          try {
 +//            FileChunk fetched = fetcher.get();
 +//            if (fetcher.getState() == 
TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
 +//          && fetched.getFile() != null) {
 +//              if (fetched.fromRemote() == false) {
 +//          localChunks.add(fetched);
 +//          LOG.info("Add a new FileChunk to local chunk list");
 +//              }
 +//              break;
 +//            }
 +//          } catch (Throwable e) {
 +//            LOG.error("Fetch failed: " + fetcher.getURI(), e);
 +//          }
 +//          retryNum++;
 +//        }
 +//      } finally {
 +//        if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
 +//          fetcherFinished(ctx);
 +//        } else {
 +//          if (retryNum == maxRetryNum) {
 +//            LOG.error("ERROR: the maximum retry (" + retryNum + ") on the 
fetch exceeded (" + fetcher.getURI() + ")");
 +//          }
 +//          stopScriptExecutors();
 +//          context.stop(); // retry task
 +//          ctx.getFetchLatch().countDown();
 +//        }
 +//      }
 +//    }
 +//  }
 +//
 +//  @VisibleForTesting
 +//  public static float adjustFetchProcess(int totalFetcher, int 
remainFetcher) {
 +//    if (totalFetcher > 0) {
 +//      return ((totalFetcher - remainFetcher) / (float) totalFetcher) * 
FETCHER_PROGRESS;
 +//    } else {
 +//      return 0.0f;
 +//    }
 +//  }
 +//
 +//  private synchronized void fetcherFinished(TaskAttemptContext ctx) {
 +//    int fetcherSize = fetcherRunners.size();
 +//    if(fetcherSize == 0) {
 +//      return;
 +//    }
 +//
 +//    ctx.getFetchLatch().countDown();
 +//
 +//    int remainFetcher = (int) ctx.getFetchLatch().getCount();
 +//    if (remainFetcher == 0) {
 +//      context.setFetcherProgress(FETCHER_PROGRESS);
 +//    } else {
 +//      context.setFetcherProgress(adjustFetchProcess(fetcherSize, 
remainFetcher));
 +//      context.setProgressChanged(true);
 +//    }
 +//  }
 +//
 +//  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
 +//                                        List<FetchImpl> fetches) throws 
IOException {
 +//
 +//    if (fetches.size() > 0) {
 +//      Path inputDir = executionBlockContext.getLocalDirAllocator().
 +//          getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), 
systemConf);
 +//
 +//      int i = 0;
 +//      File storeDir;
 +//      File defaultStoreFile;
 +//      FileChunk storeChunk = null;
 +//      List<Fetcher> runnerList = Lists.newArrayList();
 +//
 +//      for (FetchImpl f : fetches) {
 +//        storeDir = new File(inputDir.toString(), f.getName());
 +//        if (!storeDir.exists()) {
 +//          storeDir.mkdirs();
 +//        }
 +//
 +//        for (URI uri : f.getURIs()) {
 +//          defaultStoreFile = new File(storeDir, "in_" + i);
 +//          InetAddress address = InetAddress.getByName(uri.getHost());
 +//
 +//          WorkerConnectionInfo conn = 
executionBlockContext.getWorkerContext().getConnectionInfo();
 +//          if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() 
== uri.getPort()) {
 +//            boolean hasError = false;
 +//            try {
 +//              LOG.info("Try to get local file chunk at local host");
 +//              storeChunk = getLocalStoredFileChunk(uri, systemConf);
 +//            } catch (Throwable t) {
 +//              hasError = true;
 +//            }
 +//
 +//            // When a range request is out of range, storeChunk will be 
NULL. This case is normal state.
 +//            // So, we should skip and don't need to create storeChunk.
 +//            if (storeChunk == null && !hasError) {
 +//              continue;
 +//            }
 +//
 +//            if (storeChunk != null && storeChunk.getFile() != null && 
storeChunk.startOffset() > -1
 +//                && hasError == false) {
 +//              storeChunk.setFromRemote(false);
 +//            } else {
 +//              storeChunk = new FileChunk(defaultStoreFile, 0, -1);
 +//              storeChunk.setFromRemote(true);
 +//            }
 +//          } else {
 +//            storeChunk = new FileChunk(defaultStoreFile, 0, -1);
 +//            storeChunk.setFromRemote(true);
 +//          }
 +//
 +//          // If we decide that intermediate data should be really fetched 
from a remote host, storeChunk
 +//          // represents a complete file. Otherwise, storeChunk may 
represent a complete file or only a part of it
 +//          storeChunk.setEbId(f.getName());
 +//          Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
 +//          LOG.info("Create a new Fetcher with storeChunk:" + 
storeChunk.toString());
 +//          runnerList.add(fetcher);
 +//          i++;
 +//        }
 +//      }
 +//      ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
 +//      return runnerList;
 +//    } else {
 +//      return Lists.newArrayList();
 +//    }
 +//  }
 +//
 +//  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) 
throws IOException {
 +//    // Parse the URI
 +//    LOG.info("getLocalStoredFileChunk starts");
 +//    final Map<String, List<String>> params = new 
QueryStringDecoder(fetchURI.toString()).parameters();
 +//    final List<String> types = params.get("type");
 +//    final List<String> qids = params.get("qid");
 +//    final List<String> taskIdList = params.get("ta");
 +//    final List<String> stageIds = params.get("sid");
 +//    final List<String> partIds = params.get("p");
 +//    final List<String> offsetList = params.get("offset");
 +//    final List<String> lengthList = params.get("length");
 +//
 +//    if (types == null || stageIds == null || qids == null || partIds == 
null) {
 +//      LOG.error("Invalid URI - Required queryId, type, stage Id, and part 
id");
 +//      return null;
 +//    }
 +//
 +//    if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
 +//      LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and 
part id");
 +//      return null;
 +//    }
 +//
 +//    String queryId = qids.get(0);
 +//    String shuffleType = types.get(0);
 +//    String sid = stageIds.get(0);
 +//    String partId = partIds.get(0);
 +//
 +//    if (shuffleType.equals("r") && taskIdList == null) {
 +//      LOG.error("Invalid URI - For range shuffle, taskId is required");
 +//      return null;
 +//    }
 +//    List<String> taskIds = splitMaps(taskIdList);
 +//
 +//    FileChunk chunk = null;
 +//    long offset = (offsetList != null && !offsetList.isEmpty()) ? 
Long.parseLong(offsetList.get(0)) : -1L;
 +//    long length = (lengthList != null && !lengthList.isEmpty()) ? 
Long.parseLong(lengthList.get(0)) : -1L;
 +//
 +//    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", 
sid=" + sid + ", partId=" + partId
 +//    + ", taskIds=" + taskIdList);
 +//
 +//    // The working directory of Tajo worker for each query, including stage
 +//    String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
 +//
 +//    // If the stage requires a range shuffle
 +//    if (shuffleType.equals("r")) {
 +//      String ta = taskIds.get(0);
 +//      if 
(!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + 
"/output/", conf)) {
 +//        LOG.warn("Range shuffle - file not exist");
 +//        return null;
 +//      }
 +//      Path path = executionBlockContext.getLocalFS().makeQualified(
 +//          
executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + 
ta + "/output/", conf));
 +//      String startKey = params.get("start").get(0);
 +//      String endKey = params.get("end").get(0);
 +//      boolean last = params.get("final") != null;
 +//
 +//      try {
 +//        chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, 
last);
 +//            } catch (Throwable t) {
 +//        LOG.error("getFileChunks() throws exception");
 +//        return null;
 +//      }
 +//
 +//      // If the stage requires a hash shuffle or a scattered hash shuffle
 +//    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
 +//      int partParentId = 
HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) 
conf);
 +//      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" 
+ partId;
 +//      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, 
conf)) {
 +//        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: 
" + partPath);
 +//        return null;
 +//      }
 +//      Path path = executionBlockContext.getLocalFS().makeQualified(
 +//        
executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, 
conf));
 +//      File file = new File(path.toUri());
 +//      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
 +//      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
 +//
 +//      if (startPos >= file.length()) {
 +//        LOG.error("Start pos[" + startPos + "] great than file length [" + 
file.length() + "]");
 +//        return null;
 +//      }
 +//      chunk = new FileChunk(file, startPos, readLen);
 +//
 +//    } else {
 +//      LOG.error("Unknown shuffle type");
 +//      return null;
 +//    }
 +//
 +//    return chunk;
 +//  }
 +//
 +//  private List<String> splitMaps(List<String> mapq) {
 +//    if (null == mapq) {
 +//      return null;
 +//    }
 +//    final List<String> ret = new ArrayList<String>();
 +//    for (String s : mapq) {
 +//      Collections.addAll(ret, s.split(","));
 +//    }
 +//    return ret;
 +//  }
 +//
 +//  public static Path getTaskAttemptDir(TaskAttemptId quid) {
 +//    Path workDir =
 +//        
StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
 +//            String.valueOf(quid.getTaskId().getId()),
 +//            String.valueOf(quid.getId()));
 +//    return workDir;
 +//  }
 +//=======
 +  TajoWorkerProtocol.TaskStatusProto getReport();
++=======
+   TaskStatusProto getReport();
+ 
+   TaskHistory createTaskHistory();
+ 
+   List<Fetcher> getFetchers();
++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
  }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
index 2063c61,f91288d..dc3ae57
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
@@@ -262,11 -263,10 +263,18 @@@ public class QueryResource 
          return ResourcesUtil.createBadRequestResponse(LOG, "Provided session 
id (" + sessionId + ") is invalid.");
        }
        
++<<<<<<< HEAD
 +      SubmitQueryResponse response =
 +          masterContext.getGlobalEngine().executeQuery(session, 
request.getQuery(), false);
 +      if (response.getResult().hasResultCode() &&
 +          
ClientProtos.ResultCode.ERROR.equals(response.getResult().getResultCode())) {
 +        return ResourcesUtil.createExceptionResponse(LOG, 
response.getResult().getErrorMessage());
++=======
+       SubmitQueryResponse response = 
+         masterContext.getGlobalEngine().executeQuery(session, 
request.getQuery(), false);
+       if (ReturnStateUtil.isError(response.getState())) {
+         return ResourcesUtil.createExceptionResponse(LOG, 
response.getState().getMessage());
++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
        } else {
          JerseyResourceDelegateContextKey<UriInfo> uriInfoKey =
            
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, 
UriInfo.class);
@@@ -283,7 -283,7 +291,11 @@@
            queryResponse.setUri(queryURI);
          }
  
++<<<<<<< HEAD
 +        queryResponse.setResultCode(response.getResult().getResultCode());
++=======
+         queryResponse.setResultCode(response.getState().getReturnCode());
++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
          queryResponse.setQuery(request.getQuery());
          return Response.status(Status.OK).entity(queryResponse).build();
        }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index cf8a598,77bfca6..baa8510
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@@ -864,8 -865,8 +865,13 @@@ public class TestTablePartitions extend
      ClientProtos.SubmitQueryResponse response = client.executeQuery("insert 
overwrite into " + tableName
          + " select l_orderkey, l_partkey from lineitem");
  
++<<<<<<< HEAD
 +    assertTrue(response.getResult().hasErrorMessage());
 +    assertEquals(response.getResult().getErrorMessage(), "INSERT has smaller 
expressions than target columns\n");
++=======
+     assertTrue(ReturnStateUtil.isError(response.getState()));
+     assertEquals(response.getState().getMessage(), "INSERT has smaller 
expressions than target columns");
++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
  
      res = executeFile("case14.sql");
      assertResultSet(res, "case14.result");
@@@ -890,8 -891,8 +896,13 @@@
        response = client.executeQuery("insert overwrite into " + tableName
          + " select l_returnflag , l_orderkey, l_partkey from lineitem");
  
++<<<<<<< HEAD
 +      assertTrue(response.getResult().hasErrorMessage());
 +      assertEquals(response.getResult().getErrorMessage(), "INSERT has 
smaller expressions than target columns\n");
++=======
+       assertTrue(ReturnStateUtil.isError(response.getState()));
+       assertEquals(response.getState().getMessage(), "INSERT has smaller 
expressions than target columns");
++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
  
        res = executeFile("case15.sql");
        assertResultSet(res, "case15.result");

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
----------------------------------------------------------------------
diff --cc tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
index e1242a9,60f7ca3..c7ce211
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
@@@ -163,15 -158,7 +158,19 @@@ public class TajoStatement implements S
      }
  
      ClientProtos.SubmitQueryResponse response = tajoClient.executeQuery(sql);
++<<<<<<< HEAD
 +    if (response.getResult().getResultCode() == 
ClientProtos.ResultCode.ERROR) {
 +      if (response.getResult().hasErrorMessage()) {
 +        throw new ServiceException(response.getResult().getErrorMessage());
 +      }
 +      if (response.getResult().hasErrorTrace()) {
 +        throw new ServiceException(response.getResult().getErrorTrace());
 +      }
 +      throw new ServiceException("Failed to submit query by unknown reason");
 +    }
++=======
+     SQLExceptionUtil.throwIfError(response.getState());
++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
  
      QueryId queryId = new QueryId(response.getQueryId());
      if (response.getIsForwarded() && !queryId.isNull()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java
----------------------------------------------------------------------
diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java
index ed16f82,2760722..61cadca
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java
@@@ -28,29 -26,22 +26,31 @@@ import org.apache.tajo.ConfigKey
  import org.apache.tajo.OverridableConf;
  import org.apache.tajo.SessionVars;
  import org.apache.tajo.algebra.JoinType;
 +import org.apache.tajo.catalog.CatalogService;
  import org.apache.tajo.conf.TajoConf;
  import org.apache.tajo.conf.TajoConf.ConfVars;
- import org.apache.tajo.util.ReflectionUtil;
- import org.apache.tajo.util.graph.DirectedGraphCursor;
  import org.apache.tajo.plan.expr.AlgebraicUtil;
  import org.apache.tajo.plan.expr.EvalNode;
- import org.apache.tajo.plan.joinorder.FoundJoinOrder;
- import org.apache.tajo.plan.joinorder.GreedyHeuristicJoinOrderAlgorithm;
- import org.apache.tajo.plan.joinorder.JoinGraph;
- import org.apache.tajo.plan.joinorder.JoinOrderAlgorithm;
+ import org.apache.tajo.plan.expr.EvalTreeUtil;
+ import org.apache.tajo.plan.joinorder.*;
  import org.apache.tajo.plan.logical.*;
++<<<<<<< HEAD
 +import org.apache.tajo.plan.rewrite.rules.AccessPathRewriter;
 +import org.apache.tajo.plan.rewrite.rules.FilterPushDownRule;
 +import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
 +import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule;
 +import org.apache.tajo.plan.rewrite.*;
++=======
+ import org.apache.tajo.plan.rewrite.BaseLogicalPlanRewriteEngine;
+ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleProvider;
++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
  import org.apache.tajo.plan.util.PlannerUtil;
  import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+ import org.apache.tajo.util.ReflectionUtil;
+ import org.apache.tajo.util.TUtil;
+ import org.apache.tajo.util.graph.DirectedGraphCursor;
  
- import java.util.LinkedHashSet;
- import java.util.Set;
- import java.util.Stack;
+ import java.util.*;
  
  import static org.apache.tajo.plan.LogicalPlan.BlockEdge;
  import static org.apache.tajo.plan.joinorder.GreedyHeuristicJoinOrderAlgorithm.getCost;
@@@ -105,9 -87,9 +99,9 @@@ public class LogicalOptimizer 
          optimizeJoinOrder(plan, blockCursor.nextBlock());
        }
      } else {
-       LOG.info("Skip Join Optimized.");
+       LOG.info("Skip join order optimization");
      }
 -    rulesAfterToJoinOpt.rewrite(context, plan);
 +    rulesAfterToJoinOpt.rewrite(new LogicalPlanRewriteRuleContext(context, plan, catalog));
      return plan.getRootBlock().getRoot();
    }
  

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index 5a7ea2b,ef8663d..d10950a
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@@ -35,12 -35,12 +35,14 @@@ import org.apache.tajo.SessionVars
  import org.apache.tajo.algebra.*;
  import org.apache.tajo.algebra.WindowSpec;
  import org.apache.tajo.catalog.*;
+ import org.apache.tajo.catalog.exception.UndefinedColumnException;
  import org.apache.tajo.catalog.partition.PartitionMethodDesc;
  import org.apache.tajo.catalog.proto.CatalogProtos;
 +import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
  import org.apache.tajo.common.TajoDataTypes;
 +import org.apache.tajo.conf.TajoConf;
  import org.apache.tajo.datum.NullDatum;
+ import org.apache.tajo.exception.ExceptionUtil;
  import org.apache.tajo.plan.LogicalPlan.QueryBlock;
  import org.apache.tajo.plan.algebra.BaseAlgebraVisitor;
  import org.apache.tajo.plan.expr.*;

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------

Reply via email to