http://git-wip-us.apache.org/repos/asf/tajo/blob/6c01a2c3/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 13a2021..66216ee 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -49,641 +49,9 @@ public interface Task {
 
   ExecutionBlockContext getExecutionBlockContext();
 
-<<<<<<< HEAD
-//<<<<<<< HEAD
-//
-//  public TaskAttemptId getTaskId() {
-//    return taskId;
-//  }
-//
-//  public TaskAttemptId getId() {
-//    return context.getTaskId();
-//  }
-//
-//  public TaskAttemptState getStatus() {
-//    return context.getState();
-//  }
-//
-//  public String toString() {
-//    return "queryId: " + this.getId() + " status: " + this.getStatus();
-//  }
-//
-//  public void setState(TaskAttemptState status) {
-//    context.setState(status);
-//  }
-//
-//  public TaskAttemptContext getContext() {
-//    return context;
-//  }
-//
-//  public boolean hasFetchPhase() {
-//    return fetcherRunners.size() > 0;
-//  }
-//
-//  public List<Fetcher> getFetchers() {
-//    return new ArrayList<Fetcher>(fetcherRunners);
-//  }
-//
-//  public void fetch() {
-//    ExecutorService executorService = 
executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher();
-//    for (Fetcher f : fetcherRunners) {
-//      executorService.submit(new FetchRunner(context, f));
-//    }
-//  }
-//
-//  public void kill() {
-//    stopScriptExecutors();
-//    context.setState(TaskAttemptState.TA_KILLED);
-//    context.stop();
-//  }
-//
-//  public void abort() {
-//    stopScriptExecutors();
-//    context.stop();
-//  }
-//
-//  public void cleanUp() {
-//    // remove itself from worker
-//    if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
-//      synchronized (executionBlockContext.getTasks()) {
-//        executionBlockContext.getTasks().remove(this.getId());
-//      }
-//    } else {
-//      LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + 
context.getState());
-//    }
-//  }
-//
-//  public TaskStatusProto getReport() {
-//    TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
-//    
builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
-//    builder.setId(context.getTaskId().getProto())
-//        .setProgress(context.getProgress())
-//        .setState(context.getState());
-//
-//    builder.setInputStats(reloadInputStats());
-//
-//    if (context.getResultStats() != null) {
-//      builder.setResultStats(context.getResultStats().getProto());
-//    }
-//    return builder.build();
-//  }
-//
-//  public boolean isRunning(){
-//    return context.getState() == TaskAttemptState.TA_RUNNING;
-//  }
-//
-//  public boolean isProgressChanged() {
-//    return context.isProgressChanged();
-//  }
-//
-//  public void updateProgress() {
-//    if(context != null && context.isStopped()){
-//      return;
-//    }
-//
-//    if (executor != null && context.getProgress() < 1.0f) {
-//      context.setExecutorProgress(executor.getProgress());
-//    }
-//  }
-//
-//  private CatalogProtos.TableStatsProto reloadInputStats() {
-//    synchronized(inputStats) {
-//      if (this.executor == null) {
-//        return inputStats.getProto();
-//      }
-//
-//      TableStats executorInputStats = this.executor.getInputStats();
-//
-//      if (executorInputStats != null) {
-//        inputStats.setValues(executorInputStats);
-//      }
-//      return inputStats.getProto();
-//    }
-//  }
-//
-//  private TaskCompletionReport getTaskCompletionReport() {
-//    TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
-//    builder.setId(context.getTaskId().getProto());
-//
-//    builder.setInputStats(reloadInputStats());
-//
-//    if (context.hasResultStats()) {
-//      builder.setResultStats(context.getResultStats().getProto());
-//    } else {
-//      builder.setResultStats(new TableStats().getProto());
-//    }
-//
-//    Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
-//    if (it.hasNext()) {
-//      do {
-//        Entry<Integer, String> entry = it.next();
-//        ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
-//        part.setPartId(entry.getKey());
-//
-//        // Set output volume
-//        if (context.getPartitionOutputVolume() != null) {
-//          for (Entry<Integer, Long> e : 
context.getPartitionOutputVolume().entrySet()) {
-//            if (entry.getKey().equals(e.getKey())) {
-//              part.setVolume(e.getValue().longValue());
-//              break;
-//            }
-//          }
-//        }
-//
-//        builder.addShuffleFileOutputs(part.build());
-//      } while (it.hasNext());
-//    }
-//
-//    return builder.build();
-//  }
-//
-//  private void waitForFetch() throws InterruptedException, IOException {
-//    context.getFetchLatch().await();
-//    LOG.info(context.getTaskId() + " All fetches are done!");
-//    Collection<String> inputs = Lists.newArrayList(context.getInputTables());
-//
-//    // Get all broadcasted tables
-//    Set<String> broadcastTableNames = new HashSet<String>();
-//    List<EnforceProperty> broadcasts = 
context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
-//    if (broadcasts != null) {
-//      for (EnforceProperty eachBroadcast : broadcasts) {
-//        broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName());
-//      }
-//    }
-//
-//    // localize the fetched data and skip the broadcast table
-//    for (String inputTable: inputs) {
-//      if (broadcastTableNames.contains(inputTable)) {
-//        continue;
-//      }
-//      File tableDir = new File(context.getFetchIn(), inputTable);
-//      FileFragment[] frags = localizeFetchedData(tableDir, inputTable, 
descs.get(inputTable).getMeta());
-//      context.updateAssignedFragments(inputTable, frags);
-//    }
-//  }
-//
-//  public void run() throws Exception {
-//    startTime = System.currentTimeMillis();
-//    Throwable error = null;
-//    try {
-//      if(!context.isStopped()) {
-//        context.setState(TaskAttemptState.TA_RUNNING);
-//        if (context.hasFetchPhase()) {
-//          // If the fetch is still in progress, the query unit must wait for
-//          // complete.
-//          waitForFetch();
-//          context.setFetcherProgress(FETCHER_PROGRESS);
-//          context.setProgressChanged(true);
-//          updateProgress();
-//        }
-//
-//        this.executor = executionBlockContext.getTQueryEngine().
-//            createPlan(context, plan);
-//        this.executor.init();
-//
-//        while(!context.isStopped() && executor.next() != null) {
-//        }
-//      }
-//    } catch (Throwable e) {
-//      error = e ;
-//      LOG.error(e.getMessage(), e);
-//      stopScriptExecutors();
-//      context.stop();
-//    } finally {
-//      if (executor != null) {
-//        try {
-//          executor.close();
-//          reloadInputStats();
-//        } catch (IOException e) {
-//          LOG.error(e, e);
-//        }
-//        this.executor = null;
-//      }
-//
-//      executionBlockContext.completedTasksNum.incrementAndGet();
-//      context.getHashShuffleAppenderManager().finalizeTask(taskId);
-//
-//      QueryMasterProtocol.QueryMasterProtocolService.Interface 
queryMasterStub = executionBlockContext.getStub();
-//      if (context.isStopped()) {
-//        context.setExecutorProgress(0.0f);
-//
-//        if (context.getState() == TaskAttemptState.TA_KILLED) {
-//          queryMasterStub.statusUpdate(null, getReport(), 
NullCallback.get());
-//          executionBlockContext.killedTasksNum.incrementAndGet();
-//        } else {
-//          context.setState(TaskAttemptState.TA_FAILED);
-//          TaskFatalErrorReport.Builder errorBuilder =
-//              TaskFatalErrorReport.newBuilder()
-//                  .setId(getId().getProto());
-//          if (error != null) {
-//            if (error.getMessage() == null) {
-//              
errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
-//            } else {
-//              errorBuilder.setErrorMessage(error.getMessage());
-//            }
-//            errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
-//          }
-//
-//          queryMasterStub.fatalError(null, errorBuilder.build(), 
NullCallback.get());
-//          executionBlockContext.failedTasksNum.incrementAndGet();
-//        }
-//      } else {
-//        // if successful
-//        context.setProgress(1.0f);
-//        context.setState(TaskAttemptState.TA_SUCCEEDED);
-//        executionBlockContext.succeededTasksNum.incrementAndGet();
-//
-//        TaskCompletionReport report = getTaskCompletionReport();
-//        queryMasterStub.done(null, report, NullCallback.get());
-//      }
-//      finishTime = System.currentTimeMillis();
-//      LOG.info(context.getTaskId() + " completed. " +
-//          "Worker's task counter - total:" + 
executionBlockContext.completedTasksNum.intValue() +
-//          ", succeeded: " + 
executionBlockContext.succeededTasksNum.intValue()
-//          + ", killed: " + executionBlockContext.killedTasksNum.intValue()
-//          + ", failed: " + executionBlockContext.failedTasksNum.intValue());
-//      cleanupTask();
-//    }
-//  }
-//
-//  public void cleanupTask() {
-//    TaskHistory taskHistory = createTaskHistory();
-//    executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory);
-//    executionBlockContext.getTasks().remove(getId());
-//
-//    fetcherRunners.clear();
-//    fetcherRunners = null;
-//    try {
-//      if(executor != null) {
-//        executor.close();
-//        executor = null;
-//      }
-//    } catch (IOException e) {
-//      LOG.fatal(e.getMessage(), e);
-//    }
-//
-//    
executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
-//    stopScriptExecutors();
-//  }
-//
-//  public TaskHistory createTaskHistory() {
-//    TaskHistory taskHistory = null;
-//    try {
-//      taskHistory = new TaskHistory(getTaskId(), getStatus(), 
context.getProgress(),
-//          startTime, finishTime, reloadInputStats());
-//
-//      if (context.getOutputPath() != null) {
-//        taskHistory.setOutputPath(context.getOutputPath().toString());
-//      }
-//
-//      if (context.getWorkDir() != null) {
-//        taskHistory.setWorkingPath(context.getWorkDir().toString());
-//      }
-//
-//      if (context.getResultStats() != null) {
-//        taskHistory.setOutputStats(context.getResultStats().getProto());
-//      }
-//
-//      if (hasFetchPhase()) {
-//        taskHistory.setTotalFetchCount(fetcherRunners.size());
-//        int i = 0;
-//        FetcherHistoryProto.Builder builder = 
FetcherHistoryProto.newBuilder();
-//        for (Fetcher fetcher : fetcherRunners) {
-//          // TODO store the fetcher histories
-//          if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
-//            builder.setStartTime(fetcher.getStartTime());
-//            builder.setFinishTime(fetcher.getFinishTime());
-//            builder.setFileLength(fetcher.getFileLen());
-//            
builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
-//            builder.setState(fetcher.getState());
-//
-//            taskHistory.addFetcherHistory(builder.build());
-//          }
-//          if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) 
i++;
-//        }
-//        taskHistory.setFinishedFetchCount(i);
-//      }
-//    } catch (Exception e) {
-//      LOG.warn(e.getMessage(), e);
-//    }
-//
-//    return taskHistory;
-//  }
-//
-//  public int hashCode() {
-//    return context.hashCode();
-//  }
-//
-//  public boolean equals(Object obj) {
-//    if (obj instanceof Task) {
-//      Task other = (Task) obj;
-//      return this.context.equals(other.context);
-//    }
-//    return false;
-//  }
-//
-//  private FileFragment[] localizeFetchedData(File file, String name, 
TableMeta meta)
-//      throws IOException {
-//    Configuration c = new Configuration(systemConf);
-//    c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
-//    FileSystem fs = FileSystem.get(c);
-//    Path tablePath = new Path(file.getAbsolutePath());
-//
-//    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-//    FileFragment tablet;
-//
-//    FileStatus[] fileLists = fs.listStatus(tablePath);
-//    for (FileStatus f : fileLists) {
-//      if (f.getLen() == 0) {
-//        continue;
-//      }
-//      tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
-//      listTablets.add(tablet);
-//    }
-//
-//    // Special treatment for locally pseudo fetched chunks
-//    synchronized (localChunks) {
-//      for (FileChunk chunk : localChunks) {
-//        if (name.equals(chunk.getEbId())) {
-//          tablet = new FileFragment(name, new 
Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
-//          listTablets.add(tablet);
-//          LOG.info("One local chunk is added to listTablets");
-//        }
-//      }
-//    }
-//
-//    FileFragment[] tablets = new FileFragment[listTablets.size()];
-//    listTablets.toArray(tablets);
-//
-//    return tablets;
-//  }
-//
-//  private class FetchRunner implements Runnable {
-//    private final TaskAttemptContext ctx;
-//    private final Fetcher fetcher;
-//    private int maxRetryNum;
-//
-//    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
-//      this.ctx = ctx;
-//      this.fetcher = fetcher;
-//      this.maxRetryNum = 
systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
-//    }
-//
-//    @Override
-//    public void run() {
-//      int retryNum = 0;
-//      int retryWaitTime = 1000; //sec
-//
-//      try { // for releasing fetch latch
-//        while(!context.isStopped() && retryNum < maxRetryNum) {
-//          if (retryNum > 0) {
-//            try {
-//              Thread.sleep(retryWaitTime);
-//              retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2);  // 
max 10 seconds
-//            } catch (InterruptedException e) {
-//              LOG.error(e);
-//            }
-//            LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + 
retryNum + ")");
-//          }
-//          try {
-//            FileChunk fetched = fetcher.get();
-//            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED 
&& fetched != null
-//          && fetched.getFile() != null) {
-//              if (fetched.fromRemote() == false) {
-//          localChunks.add(fetched);
-//          LOG.info("Add a new FileChunk to local chunk list");
-//              }
-//              break;
-//            }
-//          } catch (Throwable e) {
-//            LOG.error("Fetch failed: " + fetcher.getURI(), e);
-//          }
-//          retryNum++;
-//        }
-//      } finally {
-//        if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
-//          fetcherFinished(ctx);
-//        } else {
-//          if (retryNum == maxRetryNum) {
-//            LOG.error("ERROR: the maximum retry (" + retryNum + ") on the 
fetch exceeded (" + fetcher.getURI() + ")");
-//          }
-//          stopScriptExecutors();
-//          context.stop(); // retry task
-//          ctx.getFetchLatch().countDown();
-//        }
-//      }
-//    }
-//  }
-//
-//  @VisibleForTesting
-//  public static float adjustFetchProcess(int totalFetcher, int 
remainFetcher) {
-//    if (totalFetcher > 0) {
-//      return ((totalFetcher - remainFetcher) / (float) totalFetcher) * 
FETCHER_PROGRESS;
-//    } else {
-//      return 0.0f;
-//    }
-//  }
-//
-//  private synchronized void fetcherFinished(TaskAttemptContext ctx) {
-//    int fetcherSize = fetcherRunners.size();
-//    if(fetcherSize == 0) {
-//      return;
-//    }
-//
-//    ctx.getFetchLatch().countDown();
-//
-//    int remainFetcher = (int) ctx.getFetchLatch().getCount();
-//    if (remainFetcher == 0) {
-//      context.setFetcherProgress(FETCHER_PROGRESS);
-//    } else {
-//      context.setFetcherProgress(adjustFetchProcess(fetcherSize, 
remainFetcher));
-//      context.setProgressChanged(true);
-//    }
-//  }
-//
-//  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
-//                                        List<FetchImpl> fetches) throws 
IOException {
-//
-//    if (fetches.size() > 0) {
-//      Path inputDir = executionBlockContext.getLocalDirAllocator().
-//          getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), 
systemConf);
-//
-//      int i = 0;
-//      File storeDir;
-//      File defaultStoreFile;
-//      FileChunk storeChunk = null;
-//      List<Fetcher> runnerList = Lists.newArrayList();
-//
-//      for (FetchImpl f : fetches) {
-//        storeDir = new File(inputDir.toString(), f.getName());
-//        if (!storeDir.exists()) {
-//          storeDir.mkdirs();
-//        }
-//
-//        for (URI uri : f.getURIs()) {
-//          defaultStoreFile = new File(storeDir, "in_" + i);
-//          InetAddress address = InetAddress.getByName(uri.getHost());
-//
-//          WorkerConnectionInfo conn = 
executionBlockContext.getWorkerContext().getConnectionInfo();
-//          if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() 
== uri.getPort()) {
-//            boolean hasError = false;
-//            try {
-//              LOG.info("Try to get local file chunk at local host");
-//              storeChunk = getLocalStoredFileChunk(uri, systemConf);
-//            } catch (Throwable t) {
-//              hasError = true;
-//            }
-//
-//            // When a range request is out of range, storeChunk will be 
NULL. This case is normal state.
-//            // So, we should skip and don't need to create storeChunk.
-//            if (storeChunk == null && !hasError) {
-//              continue;
-//            }
-//
-//            if (storeChunk != null && storeChunk.getFile() != null && 
storeChunk.startOffset() > -1
-//                && hasError == false) {
-//              storeChunk.setFromRemote(false);
-//            } else {
-//              storeChunk = new FileChunk(defaultStoreFile, 0, -1);
-//              storeChunk.setFromRemote(true);
-//            }
-//          } else {
-//            storeChunk = new FileChunk(defaultStoreFile, 0, -1);
-//            storeChunk.setFromRemote(true);
-//          }
-//
-//          // If we decide that intermediate data should be really fetched 
from a remote host, storeChunk
-//          // represents a complete file. Otherwise, storeChunk may represent 
a complete file or only a part of it
-//          storeChunk.setEbId(f.getName());
-//          Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
-//          LOG.info("Create a new Fetcher with storeChunk:" + 
storeChunk.toString());
-//          runnerList.add(fetcher);
-//          i++;
-//        }
-//      }
-//      ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
-//      return runnerList;
-//    } else {
-//      return Lists.newArrayList();
-//    }
-//  }
-//
-//  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) 
throws IOException {
-//    // Parse the URI
-//    LOG.info("getLocalStoredFileChunk starts");
-//    final Map<String, List<String>> params = new 
QueryStringDecoder(fetchURI.toString()).parameters();
-//    final List<String> types = params.get("type");
-//    final List<String> qids = params.get("qid");
-//    final List<String> taskIdList = params.get("ta");
-//    final List<String> stageIds = params.get("sid");
-//    final List<String> partIds = params.get("p");
-//    final List<String> offsetList = params.get("offset");
-//    final List<String> lengthList = params.get("length");
-//
-//    if (types == null || stageIds == null || qids == null || partIds == 
null) {
-//      LOG.error("Invalid URI - Required queryId, type, stage Id, and part 
id");
-//      return null;
-//    }
-//
-//    if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
-//      LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and 
part id");
-//      return null;
-//    }
-//
-//    String queryId = qids.get(0);
-//    String shuffleType = types.get(0);
-//    String sid = stageIds.get(0);
-//    String partId = partIds.get(0);
-//
-//    if (shuffleType.equals("r") && taskIdList == null) {
-//      LOG.error("Invalid URI - For range shuffle, taskId is required");
-//      return null;
-//    }
-//    List<String> taskIds = splitMaps(taskIdList);
-//
-//    FileChunk chunk = null;
-//    long offset = (offsetList != null && !offsetList.isEmpty()) ? 
Long.parseLong(offsetList.get(0)) : -1L;
-//    long length = (lengthList != null && !lengthList.isEmpty()) ? 
Long.parseLong(lengthList.get(0)) : -1L;
-//
-//    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", 
sid=" + sid + ", partId=" + partId
-//     + ", taskIds=" + taskIdList);
-//
-//    // The working directory of Tajo worker for each query, including stage
-//    String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
-//
-//    // If the stage requires a range shuffle
-//    if (shuffleType.equals("r")) {
-//      String ta = taskIds.get(0);
-//      if 
(!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + 
"/output/", conf)) {
-//        LOG.warn("Range shuffle - file not exist");
-//        return null;
-//      }
-//      Path path = executionBlockContext.getLocalFS().makeQualified(
-//           
executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + 
ta + "/output/", conf));
-//      String startKey = params.get("start").get(0);
-//      String endKey = params.get("end").get(0);
-//      boolean last = params.get("final") != null;
-//
-//      try {
-//        chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, 
last);
-//            } catch (Throwable t) {
-//        LOG.error("getFileChunks() throws exception");
-//        return null;
-//      }
-//
-//      // If the stage requires a hash shuffle or a scattered hash shuffle
-//    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
-//      int partParentId = 
HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) 
conf);
-//      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" 
+ partId;
-//      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, 
conf)) {
-//        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " 
+ partPath);
-//        return null;
-//      }
-//      Path path = executionBlockContext.getLocalFS().makeQualified(
-//        
executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, 
conf));
-//      File file = new File(path.toUri());
-//      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
-//      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
-//
-//      if (startPos >= file.length()) {
-//        LOG.error("Start pos[" + startPos + "] great than file length [" + 
file.length() + "]");
-//        return null;
-//      }
-//      chunk = new FileChunk(file, startPos, readLen);
-//
-//    } else {
-//      LOG.error("Unknown shuffle type");
-//      return null;
-//    }
-//
-//    return chunk;
-//  }
-//
-//  private List<String> splitMaps(List<String> mapq) {
-//    if (null == mapq) {
-//      return null;
-//    }
-//    final List<String> ret = new ArrayList<String>();
-//    for (String s : mapq) {
-//      Collections.addAll(ret, s.split(","));
-//    }
-//    return ret;
-//  }
-//
-//  public static Path getTaskAttemptDir(TaskAttemptId quid) {
-//    Path workDir =
-//        
StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
-//            String.valueOf(quid.getTaskId().getId()),
-//            String.valueOf(quid.getId()));
-//    return workDir;
-//  }
-//=======
-  TajoWorkerProtocol.TaskStatusProto getReport();
-=======
   TaskStatusProto getReport();
 
   TaskHistory createTaskHistory();
 
   List<Fetcher> getFetchers();
->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6c01a2c3/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
index dc3ae57..eb67167 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
@@ -18,13 +18,11 @@
 
 package org.apache.tajo.ws.rs.resources;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.exception.ReturnStateUtil;
-import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
 import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.master.QueryInfo;
@@ -263,18 +261,10 @@ public class QueryResource {
         return ResourcesUtil.createBadRequestResponse(LOG, "Provided session 
id (" + sessionId + ") is invalid.");
       }
       
-<<<<<<< HEAD
       SubmitQueryResponse response =
-          masterContext.getGlobalEngine().executeQuery(session, 
request.getQuery(), false);
-      if (response.getResult().hasResultCode() &&
-          
ClientProtos.ResultCode.ERROR.equals(response.getResult().getResultCode())) {
-        return ResourcesUtil.createExceptionResponse(LOG, 
response.getResult().getErrorMessage());
-=======
-      SubmitQueryResponse response = 
         masterContext.getGlobalEngine().executeQuery(session, 
request.getQuery(), false);
       if (ReturnStateUtil.isError(response.getState())) {
         return ResourcesUtil.createExceptionResponse(LOG, 
response.getState().getMessage());
->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
       } else {
         JerseyResourceDelegateContextKey<UriInfo> uriInfoKey =
           
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, 
UriInfo.class);
@@ -291,11 +281,7 @@ public class QueryResource {
           queryResponse.setUri(queryURI);
         }
 
-<<<<<<< HEAD
-        queryResponse.setResultCode(response.getResult().getResultCode());
-=======
         queryResponse.setResultCode(response.getState().getReturnCode());
->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
         queryResponse.setQuery(request.getQuery());
         return Response.status(Status.OK).entity(queryResponse).build();
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6c01a2c3/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index baa8510..fa6b716 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -19,7 +19,10 @@
 package org.apache.tajo.engine.query;
 
 import com.google.common.collect.Maps;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.DeflateCodec;
@@ -31,12 +34,12 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.jdbc.FetchResultSet;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
@@ -57,7 +60,6 @@ import java.util.*;
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static 
org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE;
 import static org.junit.Assert.*;
-import static org.junit.Assert.assertEquals;
 
 @RunWith(Parameterized.class)
 public class TestTablePartitions extends QueryTestCaseBase {
@@ -865,13 +867,8 @@ public class TestTablePartitions extends QueryTestCaseBase 
{
     ClientProtos.SubmitQueryResponse response = client.executeQuery("insert 
overwrite into " + tableName
         + " select l_orderkey, l_partkey from lineitem");
 
-<<<<<<< HEAD
-    assertTrue(response.getResult().hasErrorMessage());
-    assertEquals(response.getResult().getErrorMessage(), "INSERT has smaller 
expressions than target columns\n");
-=======
     assertTrue(ReturnStateUtil.isError(response.getState()));
     assertEquals(response.getState().getMessage(), "INSERT has smaller 
expressions than target columns");
->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
 
     res = executeFile("case14.sql");
     assertResultSet(res, "case14.result");
@@ -896,13 +893,8 @@ public class TestTablePartitions extends QueryTestCaseBase 
{
       response = client.executeQuery("insert overwrite into " + tableName
         + " select l_returnflag , l_orderkey, l_partkey from lineitem");
 
-<<<<<<< HEAD
-      assertTrue(response.getResult().hasErrorMessage());
-      assertEquals(response.getResult().getErrorMessage(), "INSERT has smaller 
expressions than target columns\n");
-=======
       assertTrue(ReturnStateUtil.isError(response.getState()));
       assertEquals(response.getState().getMessage(), "INSERT has smaller 
expressions than target columns");
->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
 
       res = executeFile("case15.sql");
       assertResultSet(res, "case15.result");

http://git-wip-us.apache.org/repos/asf/tajo/blob/6c01a2c3/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java 
b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
index c7ce211..644dfc7 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
@@ -20,9 +20,9 @@ package org.apache.tajo.jdbc;
 import com.google.common.collect.Lists;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.SessionVars;
-import org.apache.tajo.exception.SQLExceptionUtil;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientUtil;
+import org.apache.tajo.exception.SQLExceptionUtil;
 import org.apache.tajo.ipc.ClientProtos;
 
 import java.sql.*;
@@ -158,19 +158,7 @@ public class TajoStatement implements Statement {
     }
 
     ClientProtos.SubmitQueryResponse response = tajoClient.executeQuery(sql);
-<<<<<<< HEAD
-    if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) 
{
-      if (response.getResult().hasErrorMessage()) {
-        throw new ServiceException(response.getResult().getErrorMessage());
-      }
-      if (response.getResult().hasErrorTrace()) {
-        throw new ServiceException(response.getResult().getErrorTrace());
-      }
-      throw new ServiceException("Failed to submit query by unknown reason");
-    }
-=======
     SQLExceptionUtil.throwIfError(response.getState());
->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
 
     QueryId queryId = new QueryId(response.getQueryId());
     if (response.getIsForwarded() && !queryId.isNull()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/6c01a2c3/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java
index 61cadca..77a12e8 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java
@@ -34,16 +34,9 @@ import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.expr.EvalTreeUtil;
 import org.apache.tajo.plan.joinorder.*;
 import org.apache.tajo.plan.logical.*;
-<<<<<<< HEAD
-import org.apache.tajo.plan.rewrite.rules.AccessPathRewriter;
-import org.apache.tajo.plan.rewrite.rules.FilterPushDownRule;
-import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
-import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule;
-import org.apache.tajo.plan.rewrite.*;
-=======
 import org.apache.tajo.plan.rewrite.BaseLogicalPlanRewriteEngine;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleProvider;
->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
 import org.apache.tajo.util.ReflectionUtil;

Reply via email to