http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --cc tajo-core/src/main/java/org/apache/tajo/worker/Task.java index d2afb83,66216ee..13a2021 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@@ -48,633 -49,9 +49,641 @@@ public interface Task ExecutionBlockContext getExecutionBlockContext(); ++<<<<<<< HEAD +//<<<<<<< HEAD +// +// public TaskAttemptId getTaskId() { +// return taskId; +// } +// +// public TaskAttemptId getId() { +// return context.getTaskId(); +// } +// +// public TaskAttemptState getStatus() { +// return context.getState(); +// } +// +// public String toString() { +// return "queryId: " + this.getId() + " status: " + this.getStatus(); +// } +// +// public void setState(TaskAttemptState status) { +// context.setState(status); +// } +// +// public TaskAttemptContext getContext() { +// return context; +// } +// +// public boolean hasFetchPhase() { +// return fetcherRunners.size() > 0; +// } +// +// public List<Fetcher> getFetchers() { +// return new ArrayList<Fetcher>(fetcherRunners); +// } +// +// public void fetch() { +// ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher(); +// for (Fetcher f : fetcherRunners) { +// executorService.submit(new FetchRunner(context, f)); +// } +// } +// +// public void kill() { +// stopScriptExecutors(); +// context.setState(TaskAttemptState.TA_KILLED); +// context.stop(); +// } +// +// public void abort() { +// stopScriptExecutors(); +// context.stop(); +// } +// +// public void cleanUp() { +// // remove itself from worker +// if (context.getState() == TaskAttemptState.TA_SUCCEEDED) { +// synchronized (executionBlockContext.getTasks()) { +// executionBlockContext.getTasks().remove(this.getId()); +// } +// } else { +// LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState()); +// } +// } +// +// public TaskStatusProto getReport() { +// TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); +// builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort()); +// builder.setId(context.getTaskId().getProto()) +// .setProgress(context.getProgress()) +// .setState(context.getState()); +// +// builder.setInputStats(reloadInputStats()); +// +// if (context.getResultStats() != null) { +// builder.setResultStats(context.getResultStats().getProto()); +// } +// return builder.build(); +// } +// +// public boolean isRunning(){ +// return context.getState() == TaskAttemptState.TA_RUNNING; +// } +// +// public boolean isProgressChanged() { +// return context.isProgressChanged(); +// } +// +// public void updateProgress() { +// if(context != null && context.isStopped()){ +// return; +// } +// +// if (executor != null && context.getProgress() < 1.0f) { +// context.setExecutorProgress(executor.getProgress()); +// } +// } +// +// private CatalogProtos.TableStatsProto reloadInputStats() { +// synchronized(inputStats) { +// if (this.executor == null) { +// return inputStats.getProto(); +// } +// +// TableStats executorInputStats = this.executor.getInputStats(); +// +// if (executorInputStats != null) { +// inputStats.setValues(executorInputStats); +// } +// return inputStats.getProto(); +// } +// } +// +// private TaskCompletionReport getTaskCompletionReport() { +// TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder(); +// builder.setId(context.getTaskId().getProto()); +// +// builder.setInputStats(reloadInputStats()); +// +// if (context.hasResultStats()) { +// builder.setResultStats(context.getResultStats().getProto()); +// } else { +// builder.setResultStats(new TableStats().getProto()); +// } +// +// Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs(); +// if (it.hasNext()) { +// do { +// Entry<Integer, String> entry = it.next(); +// ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); +// part.setPartId(entry.getKey()); +// +// // Set output volume +// if (context.getPartitionOutputVolume() != null) { +// for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) { +// if (entry.getKey().equals(e.getKey())) { +// part.setVolume(e.getValue().longValue()); +// break; +// } +// } +// } +// +// builder.addShuffleFileOutputs(part.build()); +// } while (it.hasNext()); +// } +// +// return builder.build(); +// } +// +// private void waitForFetch() throws InterruptedException, IOException { +// context.getFetchLatch().await(); +// LOG.info(context.getTaskId() + " All fetches are done!"); +// Collection<String> inputs = Lists.newArrayList(context.getInputTables()); +// +// // Get all broadcasted tables +// Set<String> broadcastTableNames = new HashSet<String>(); +// List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST); +// if (broadcasts != null) { +// for (EnforceProperty eachBroadcast : broadcasts) { +// broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName()); +// } +// } +// +// // localize the fetched data and skip the broadcast table +// for (String inputTable: inputs) { +// if (broadcastTableNames.contains(inputTable)) { +// continue; +// } +// File tableDir = new File(context.getFetchIn(), inputTable); +// FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta()); +// context.updateAssignedFragments(inputTable, frags); +// } +// } +// +// public void run() throws Exception { +// startTime = System.currentTimeMillis(); +// Throwable error = null; +// try { +// if(!context.isStopped()) { +// context.setState(TaskAttemptState.TA_RUNNING); +// if (context.hasFetchPhase()) { +// // If the fetch is still in progress, the query unit must wait for +// // complete. +// waitForFetch(); +// context.setFetcherProgress(FETCHER_PROGRESS); +// context.setProgressChanged(true); +// updateProgress(); +// } +// +// this.executor = executionBlockContext.getTQueryEngine(). +// createPlan(context, plan); +// this.executor.init(); +// +// while(!context.isStopped() && executor.next() != null) { +// } +// } +// } catch (Throwable e) { +// error = e ; +// LOG.error(e.getMessage(), e); +// stopScriptExecutors(); +// context.stop(); +// } finally { +// if (executor != null) { +// try { +// executor.close(); +// reloadInputStats(); +// } catch (IOException e) { +// LOG.error(e, e); +// } +// this.executor = null; +// } +// +// executionBlockContext.completedTasksNum.incrementAndGet(); +// context.getHashShuffleAppenderManager().finalizeTask(taskId); +// +// QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub(); +// if (context.isStopped()) { +// context.setExecutorProgress(0.0f); +// +// if (context.getState() == TaskAttemptState.TA_KILLED) { +// queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); +// executionBlockContext.killedTasksNum.incrementAndGet(); +// } else { +// context.setState(TaskAttemptState.TA_FAILED); +// TaskFatalErrorReport.Builder errorBuilder = +// TaskFatalErrorReport.newBuilder() +// .setId(getId().getProto()); +// if (error != null) { +// if (error.getMessage() == null) { +// errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); +// } else { +// errorBuilder.setErrorMessage(error.getMessage()); +// } +// errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); +// } +// +// queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); +// executionBlockContext.failedTasksNum.incrementAndGet(); +// } +// } else { +// // if successful +// context.setProgress(1.0f); +// context.setState(TaskAttemptState.TA_SUCCEEDED); +// executionBlockContext.succeededTasksNum.incrementAndGet(); +// +// TaskCompletionReport report = getTaskCompletionReport(); +// queryMasterStub.done(null, report, NullCallback.get()); +// } +// finishTime = System.currentTimeMillis(); +// LOG.info(context.getTaskId() + " completed. " + +// "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + +// ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() +// + ", killed: " + executionBlockContext.killedTasksNum.intValue() +// + ", failed: " + executionBlockContext.failedTasksNum.intValue()); +// cleanupTask(); +// } +// } +// +// public void cleanupTask() { +// TaskHistory taskHistory = createTaskHistory(); +// executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory); +// executionBlockContext.getTasks().remove(getId()); +// +// fetcherRunners.clear(); +// fetcherRunners = null; +// try { +// if(executor != null) { +// executor.close(); +// executor = null; +// } +// } catch (IOException e) { +// LOG.fatal(e.getMessage(), e); +// } +// +// executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory); +// stopScriptExecutors(); +// } +// +// public TaskHistory createTaskHistory() { +// TaskHistory taskHistory = null; +// try { +// taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(), +// startTime, finishTime, reloadInputStats()); +// +// if (context.getOutputPath() != null) { +// taskHistory.setOutputPath(context.getOutputPath().toString()); +// } +// +// if (context.getWorkDir() != null) { +// taskHistory.setWorkingPath(context.getWorkDir().toString()); +// } +// +// if (context.getResultStats() != null) { +// taskHistory.setOutputStats(context.getResultStats().getProto()); +// } +// +// if (hasFetchPhase()) { +// taskHistory.setTotalFetchCount(fetcherRunners.size()); +// int i = 0; +// FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder(); +// for (Fetcher fetcher : fetcherRunners) { +// // TODO store the fetcher histories +// if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { +// builder.setStartTime(fetcher.getStartTime()); +// builder.setFinishTime(fetcher.getFinishTime()); +// builder.setFileLength(fetcher.getFileLen()); +// builder.setMessageReceivedCount(fetcher.getMessageReceiveCount()); +// builder.setState(fetcher.getState()); +// +// taskHistory.addFetcherHistory(builder.build()); +// } +// if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++; +// } +// taskHistory.setFinishedFetchCount(i); +// } +// } catch (Exception e) { +// LOG.warn(e.getMessage(), e); +// } +// +// return taskHistory; +// } +// +// public int hashCode() { +// return context.hashCode(); +// } +// +// public boolean equals(Object obj) { +// if (obj instanceof Task) { +// Task other = (Task) obj; +// return this.context.equals(other.context); +// } +// return false; +// } +// +// private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta) +// throws IOException { +// Configuration c = new Configuration(systemConf); +// c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); +// FileSystem fs = FileSystem.get(c); +// Path tablePath = new Path(file.getAbsolutePath()); +// +// List<FileFragment> listTablets = new ArrayList<FileFragment>(); +// FileFragment tablet; +// +// FileStatus[] fileLists = fs.listStatus(tablePath); +// for (FileStatus f : fileLists) { +// if (f.getLen() == 0) { +// continue; +// } +// tablet = new FileFragment(name, f.getPath(), 0l, f.getLen()); +// listTablets.add(tablet); +// } +// +// // Special treatment for locally pseudo fetched chunks +// synchronized (localChunks) { +// for (FileChunk chunk : localChunks) { +// if (name.equals(chunk.getEbId())) { +// tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); +// listTablets.add(tablet); +// LOG.info("One local chunk is added to listTablets"); +// } +// } +// } +// +// FileFragment[] tablets = new FileFragment[listTablets.size()]; +// listTablets.toArray(tablets); +// +// return tablets; +// } +// +// private class FetchRunner implements Runnable { +// private final TaskAttemptContext ctx; +// private final Fetcher fetcher; +// private int maxRetryNum; +// +// public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) { +// this.ctx = ctx; +// this.fetcher = fetcher; +// this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM); +// } +// +// @Override +// public void run() { +// int retryNum = 0; +// int retryWaitTime = 1000; //sec +// +// try { // for releasing fetch latch +// while(!context.isStopped() && retryNum < maxRetryNum) { +// if (retryNum > 0) { +// try { +// Thread.sleep(retryWaitTime); +// retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds +// } catch (InterruptedException e) { +// LOG.error(e); +// } +// LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); +// } +// try { +// FileChunk fetched = fetcher.get(); +// if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null +// && fetched.getFile() != null) { +// if (fetched.fromRemote() == false) { +// localChunks.add(fetched); +// LOG.info("Add a new FileChunk to local chunk list"); +// } +// break; +// } +// } catch (Throwable e) { +// LOG.error("Fetch failed: " + fetcher.getURI(), e); +// } +// retryNum++; +// } +// } finally { +// if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){ +// fetcherFinished(ctx); +// } else { +// if (retryNum == maxRetryNum) { +// LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); +// } +// stopScriptExecutors(); +// context.stop(); // retry task +// ctx.getFetchLatch().countDown(); +// } +// } +// } +// } +// +// @VisibleForTesting +// public static float adjustFetchProcess(int totalFetcher, int remainFetcher) { +// if (totalFetcher > 0) { +// return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS; +// } else { +// return 0.0f; +// } +// } +// +// private synchronized void fetcherFinished(TaskAttemptContext ctx) { +// int fetcherSize = fetcherRunners.size(); +// if(fetcherSize == 0) { +// return; +// } +// +// ctx.getFetchLatch().countDown(); +// +// int remainFetcher = (int) ctx.getFetchLatch().getCount(); +// if (remainFetcher == 0) { +// context.setFetcherProgress(FETCHER_PROGRESS); +// } else { +// context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher)); +// context.setProgressChanged(true); +// } +// } +// +// private List<Fetcher> getFetchRunners(TaskAttemptContext ctx, +// List<FetchImpl> fetches) throws IOException { +// +// if (fetches.size() > 0) { +// Path inputDir = executionBlockContext.getLocalDirAllocator(). +// getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); +// +// int i = 0; +// File storeDir; +// File defaultStoreFile; +// FileChunk storeChunk = null; +// List<Fetcher> runnerList = Lists.newArrayList(); +// +// for (FetchImpl f : fetches) { +// storeDir = new File(inputDir.toString(), f.getName()); +// if (!storeDir.exists()) { +// storeDir.mkdirs(); +// } +// +// for (URI uri : f.getURIs()) { +// defaultStoreFile = new File(storeDir, "in_" + i); +// InetAddress address = InetAddress.getByName(uri.getHost()); +// +// WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo(); +// if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) { +// boolean hasError = false; +// try { +// LOG.info("Try to get local file chunk at local host"); +// storeChunk = getLocalStoredFileChunk(uri, systemConf); +// } catch (Throwable t) { +// hasError = true; +// } +// +// // When a range request is out of range, storeChunk will be NULL. This case is normal state. +// // So, we should skip and don't need to create storeChunk. +// if (storeChunk == null && !hasError) { +// continue; +// } +// +// if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1 +// && hasError == false) { +// storeChunk.setFromRemote(false); +// } else { +// storeChunk = new FileChunk(defaultStoreFile, 0, -1); +// storeChunk.setFromRemote(true); +// } +// } else { +// storeChunk = new FileChunk(defaultStoreFile, 0, -1); +// storeChunk.setFromRemote(true); +// } +// +// // If we decide that intermediate data should be really fetched from a remote host, storeChunk +// // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it +// storeChunk.setEbId(f.getName()); +// Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); +// LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); +// runnerList.add(fetcher); +// i++; +// } +// } +// ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); +// return runnerList; +// } else { +// return Lists.newArrayList(); +// } +// } +// +// private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { +// // Parse the URI +// LOG.info("getLocalStoredFileChunk starts"); +// final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters(); +// final List<String> types = params.get("type"); +// final List<String> qids = params.get("qid"); +// final List<String> taskIdList = params.get("ta"); +// final List<String> stageIds = params.get("sid"); +// final List<String> partIds = params.get("p"); +// final List<String> offsetList = params.get("offset"); +// final List<String> lengthList = params.get("length"); +// +// if (types == null || stageIds == null || qids == null || partIds == null) { +// LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); +// return null; +// } +// +// if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { +// LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); +// return null; +// } +// +// String queryId = qids.get(0); +// String shuffleType = types.get(0); +// String sid = stageIds.get(0); +// String partId = partIds.get(0); +// +// if (shuffleType.equals("r") && taskIdList == null) { +// LOG.error("Invalid URI - For range shuffle, taskId is required"); +// return null; +// } +// List<String> taskIds = splitMaps(taskIdList); +// +// FileChunk chunk = null; +// long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; +// long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; +// +// LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId +// + ", taskIds=" + taskIdList); +// +// // The working directory of Tajo worker for each query, including stage +// String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; +// +// // If the stage requires a range shuffle +// if (shuffleType.equals("r")) { +// String ta = taskIds.get(0); +// if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { +// LOG.warn("Range shuffle - file not exist"); +// return null; +// } +// Path path = executionBlockContext.getLocalFS().makeQualified( +// executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf)); +// String startKey = params.get("start").get(0); +// String endKey = params.get("end").get(0); +// boolean last = params.get("final") != null; +// +// try { +// chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last); +// } catch (Throwable t) { +// LOG.error("getFileChunks() throws exception"); +// return null; +// } +// +// // If the stage requires a hash shuffle or a scattered hash shuffle +// } else if (shuffleType.equals("h") || shuffleType.equals("s")) { +// int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); +// String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; +// if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) { +// LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); +// return null; +// } +// Path path = executionBlockContext.getLocalFS().makeQualified( +// executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf)); +// File file = new File(path.toUri()); +// long startPos = (offset >= 0 && length >= 0) ? offset : 0; +// long readLen = (offset >= 0 && length >= 0) ? length : file.length(); +// +// if (startPos >= file.length()) { +// LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); +// return null; +// } +// chunk = new FileChunk(file, startPos, readLen); +// +// } else { +// LOG.error("Unknown shuffle type"); +// return null; +// } +// +// return chunk; +// } +// +// private List<String> splitMaps(List<String> mapq) { +// if (null == mapq) { +// return null; +// } +// final List<String> ret = new ArrayList<String>(); +// for (String s : mapq) { +// Collections.addAll(ret, s.split(",")); +// } +// return ret; +// } +// +// public static Path getTaskAttemptDir(TaskAttemptId quid) { +// Path workDir = +// StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), +// String.valueOf(quid.getTaskId().getId()), +// String.valueOf(quid.getId())); +// return workDir; +// } +//======= + TajoWorkerProtocol.TaskStatusProto getReport(); ++======= + TaskStatusProto getReport(); + + TaskHistory createTaskHistory(); + + List<Fetcher> getFetchers(); ++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 }
http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java ---------------------------------------------------------------------- diff --cc tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java index 2063c61,f91288d..dc3ae57 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java @@@ -262,11 -263,10 +263,18 @@@ public class QueryResource return ResourcesUtil.createBadRequestResponse(LOG, "Provided session id (" + sessionId + ") is invalid."); } ++<<<<<<< HEAD + SubmitQueryResponse response = + masterContext.getGlobalEngine().executeQuery(session, request.getQuery(), false); + if (response.getResult().hasResultCode() && + ClientProtos.ResultCode.ERROR.equals(response.getResult().getResultCode())) { + return ResourcesUtil.createExceptionResponse(LOG, response.getResult().getErrorMessage()); ++======= + SubmitQueryResponse response = + masterContext.getGlobalEngine().executeQuery(session, request.getQuery(), false); + if (ReturnStateUtil.isError(response.getState())) { + return ResourcesUtil.createExceptionResponse(LOG, response.getState().getMessage()); ++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 } else { JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); @@@ -283,7 -283,7 +291,11 @@@ queryResponse.setUri(queryURI); } ++<<<<<<< HEAD + queryResponse.setResultCode(response.getResult().getResultCode()); ++======= + queryResponse.setResultCode(response.getState().getReturnCode()); ++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 queryResponse.setQuery(request.getQuery()); return Response.status(Status.OK).entity(queryResponse).build(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --cc tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index cf8a598,77bfca6..baa8510 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@@ -864,8 -865,8 +865,13 @@@ public class TestTablePartitions extend ClientProtos.SubmitQueryResponse response = client.executeQuery("insert overwrite into " + tableName + " select l_orderkey, l_partkey from lineitem"); ++<<<<<<< HEAD + assertTrue(response.getResult().hasErrorMessage()); + assertEquals(response.getResult().getErrorMessage(), "INSERT has smaller expressions than target columns\n"); ++======= + assertTrue(ReturnStateUtil.isError(response.getState())); + assertEquals(response.getState().getMessage(), "INSERT has smaller expressions than target columns"); ++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 res = executeFile("case14.sql"); assertResultSet(res, "case14.result"); @@@ -890,8 -891,8 +896,13 @@@ response = client.executeQuery("insert overwrite into " + tableName + " select l_returnflag , l_orderkey, l_partkey from lineitem"); ++<<<<<<< HEAD + assertTrue(response.getResult().hasErrorMessage()); + assertEquals(response.getResult().getErrorMessage(), "INSERT has smaller expressions than target columns\n"); ++======= + assertTrue(ReturnStateUtil.isError(response.getState())); + assertEquals(response.getState().getMessage(), "INSERT has smaller expressions than target columns"); ++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 res = executeFile("case15.sql"); assertResultSet(res, "case15.result"); http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java ---------------------------------------------------------------------- diff --cc tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java index e1242a9,60f7ca3..c7ce211 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java @@@ -163,15 -158,7 +158,19 @@@ public class TajoStatement implements S } ClientProtos.SubmitQueryResponse response = tajoClient.executeQuery(sql); ++<<<<<<< HEAD + if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) { + if (response.getResult().hasErrorMessage()) { + throw new ServiceException(response.getResult().getErrorMessage()); + } + if (response.getResult().hasErrorTrace()) { + throw new ServiceException(response.getResult().getErrorTrace()); + } + throw new ServiceException("Failed to submit query by unknown reason"); + } ++======= + SQLExceptionUtil.throwIfError(response.getState()); ++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 QueryId queryId = new QueryId(response.getQueryId()); if (response.getIsForwarded() && !queryId.isNull()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java ---------------------------------------------------------------------- diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java index ed16f82,2760722..61cadca --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java @@@ -28,29 -26,22 +26,31 @@@ import org.apache.tajo.ConfigKey import org.apache.tajo.OverridableConf; import org.apache.tajo.SessionVars; import org.apache.tajo.algebra.JoinType; +import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; - import org.apache.tajo.util.ReflectionUtil; - import org.apache.tajo.util.graph.DirectedGraphCursor; import org.apache.tajo.plan.expr.AlgebraicUtil; import org.apache.tajo.plan.expr.EvalNode; - import org.apache.tajo.plan.joinorder.FoundJoinOrder; - import org.apache.tajo.plan.joinorder.GreedyHeuristicJoinOrderAlgorithm; - import org.apache.tajo.plan.joinorder.JoinGraph; - import org.apache.tajo.plan.joinorder.JoinOrderAlgorithm; + import org.apache.tajo.plan.expr.EvalTreeUtil; + import org.apache.tajo.plan.joinorder.*; import org.apache.tajo.plan.logical.*; ++<<<<<<< HEAD +import org.apache.tajo.plan.rewrite.rules.AccessPathRewriter; +import org.apache.tajo.plan.rewrite.rules.FilterPushDownRule; +import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; +import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule; +import org.apache.tajo.plan.rewrite.*; ++======= + import org.apache.tajo.plan.rewrite.BaseLogicalPlanRewriteEngine; + import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleProvider; ++>>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; + import org.apache.tajo.util.ReflectionUtil; + import org.apache.tajo.util.TUtil; + import org.apache.tajo.util.graph.DirectedGraphCursor; - import java.util.LinkedHashSet; - import java.util.Set; - import java.util.Stack; + import java.util.*; import static org.apache.tajo.plan.LogicalPlan.BlockEdge; import static org.apache.tajo.plan.joinorder.GreedyHeuristicJoinOrderAlgorithm.getCost; @@@ -105,9 -87,9 +99,9 @@@ public class LogicalOptimizer optimizeJoinOrder(plan, blockCursor.nextBlock()); } } else { - LOG.info("Skip Join Optimized."); + LOG.info("Skip join order optimization"); } - rulesAfterToJoinOpt.rewrite(context, plan); + rulesAfterToJoinOpt.rewrite(new LogicalPlanRewriteRuleContext(context, plan, catalog)); return plan.getRootBlock().getRoot(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java ---------------------------------------------------------------------- diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index 5a7ea2b,ef8663d..d10950a --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@@ -35,12 -35,12 +35,14 @@@ import org.apache.tajo.SessionVars import org.apache.tajo.algebra.*; import org.apache.tajo.algebra.WindowSpec; import org.apache.tajo.catalog.*; + import org.apache.tajo.catalog.exception.UndefinedColumnException; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod; import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.NullDatum; + import org.apache.tajo.exception.ExceptionUtil; import org.apache.tajo.plan.LogicalPlan.QueryBlock; import org.apache.tajo.plan.algebra.BaseAlgebraVisitor; import org.apache.tajo.plan.expr.*; http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0028d7aa/tajo-plan/src/main/proto/Plan.proto ----------------------------------------------------------------------
