http://git-wip-us.apache.org/repos/asf/tajo/blob/6c01a2c3/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 13a2021..66216ee 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -49,641 +49,9 @@ public interface Task { ExecutionBlockContext getExecutionBlockContext(); -<<<<<<< HEAD -//<<<<<<< HEAD -// -// public TaskAttemptId getTaskId() { -// return taskId; -// } -// -// public TaskAttemptId getId() { -// return context.getTaskId(); -// } -// -// public TaskAttemptState getStatus() { -// return context.getState(); -// } -// -// public String toString() { -// return "queryId: " + this.getId() + " status: " + this.getStatus(); -// } -// -// public void setState(TaskAttemptState status) { -// context.setState(status); -// } -// -// public TaskAttemptContext getContext() { -// return context; -// } -// -// public boolean hasFetchPhase() { -// return fetcherRunners.size() > 0; -// } -// -// public List<Fetcher> getFetchers() { -// return new ArrayList<Fetcher>(fetcherRunners); -// } -// -// public void fetch() { -// ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher(); -// for (Fetcher f : fetcherRunners) { -// executorService.submit(new FetchRunner(context, f)); -// } -// } -// -// public void kill() { -// stopScriptExecutors(); -// context.setState(TaskAttemptState.TA_KILLED); -// context.stop(); -// } -// -// public void abort() { -// stopScriptExecutors(); -// context.stop(); -// } -// -// public void cleanUp() { -// // remove itself from worker -// if (context.getState() == TaskAttemptState.TA_SUCCEEDED) { -// synchronized (executionBlockContext.getTasks()) { -// executionBlockContext.getTasks().remove(this.getId()); -// } -// } else { -// LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState()); -// } -// } -// -// public TaskStatusProto getReport() { -// TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); -// builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort()); -// builder.setId(context.getTaskId().getProto()) -// .setProgress(context.getProgress()) -// .setState(context.getState()); -// -// builder.setInputStats(reloadInputStats()); -// -// if (context.getResultStats() != null) { -// builder.setResultStats(context.getResultStats().getProto()); -// } -// return builder.build(); -// } -// -// public boolean isRunning(){ -// return context.getState() == TaskAttemptState.TA_RUNNING; -// } -// -// public boolean isProgressChanged() { -// return context.isProgressChanged(); -// } -// -// public void updateProgress() { -// if(context != null && context.isStopped()){ -// return; -// } -// -// if (executor != null && context.getProgress() < 1.0f) { -// context.setExecutorProgress(executor.getProgress()); -// } -// } -// -// private CatalogProtos.TableStatsProto reloadInputStats() { -// synchronized(inputStats) { -// if (this.executor == null) { -// return inputStats.getProto(); -// } -// -// TableStats executorInputStats = this.executor.getInputStats(); -// -// if (executorInputStats != null) { -// inputStats.setValues(executorInputStats); -// } -// return inputStats.getProto(); -// } -// } -// -// private TaskCompletionReport getTaskCompletionReport() { -// TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder(); -// builder.setId(context.getTaskId().getProto()); -// -// builder.setInputStats(reloadInputStats()); -// -// if (context.hasResultStats()) { -// builder.setResultStats(context.getResultStats().getProto()); -// } else { -// builder.setResultStats(new TableStats().getProto()); -// } -// -// Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs(); -// if (it.hasNext()) { -// do { -// Entry<Integer, String> entry = it.next(); -// ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); -// part.setPartId(entry.getKey()); -// -// // Set output volume -// if (context.getPartitionOutputVolume() != null) { -// for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) { -// if (entry.getKey().equals(e.getKey())) { -// part.setVolume(e.getValue().longValue()); -// break; -// } -// } -// } -// -// builder.addShuffleFileOutputs(part.build()); -// } while (it.hasNext()); -// } -// -// return builder.build(); -// } -// -// private void waitForFetch() throws InterruptedException, IOException { -// context.getFetchLatch().await(); -// LOG.info(context.getTaskId() + " All fetches are done!"); -// Collection<String> inputs = Lists.newArrayList(context.getInputTables()); -// -// // Get all broadcasted tables -// Set<String> broadcastTableNames = new HashSet<String>(); -// List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST); -// if (broadcasts != null) { -// for (EnforceProperty eachBroadcast : broadcasts) { -// broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName()); -// } -// } -// -// // localize the fetched data and skip the broadcast table -// for (String inputTable: inputs) { -// if (broadcastTableNames.contains(inputTable)) { -// continue; -// } -// File tableDir = new File(context.getFetchIn(), inputTable); -// FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta()); -// context.updateAssignedFragments(inputTable, frags); -// } -// } -// -// public void run() throws Exception { -// startTime = System.currentTimeMillis(); -// Throwable error = null; -// try { -// if(!context.isStopped()) { -// context.setState(TaskAttemptState.TA_RUNNING); -// if (context.hasFetchPhase()) { -// // If the fetch is still in progress, the query unit must wait for -// // complete. -// waitForFetch(); -// context.setFetcherProgress(FETCHER_PROGRESS); -// context.setProgressChanged(true); -// updateProgress(); -// } -// -// this.executor = executionBlockContext.getTQueryEngine(). -// createPlan(context, plan); -// this.executor.init(); -// -// while(!context.isStopped() && executor.next() != null) { -// } -// } -// } catch (Throwable e) { -// error = e ; -// LOG.error(e.getMessage(), e); -// stopScriptExecutors(); -// context.stop(); -// } finally { -// if (executor != null) { -// try { -// executor.close(); -// reloadInputStats(); -// } catch (IOException e) { -// LOG.error(e, e); -// } -// this.executor = null; -// } -// -// executionBlockContext.completedTasksNum.incrementAndGet(); -// context.getHashShuffleAppenderManager().finalizeTask(taskId); -// -// QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub(); -// if (context.isStopped()) { -// context.setExecutorProgress(0.0f); -// -// if (context.getState() == TaskAttemptState.TA_KILLED) { -// queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); -// executionBlockContext.killedTasksNum.incrementAndGet(); -// } else { -// context.setState(TaskAttemptState.TA_FAILED); -// TaskFatalErrorReport.Builder errorBuilder = -// TaskFatalErrorReport.newBuilder() -// .setId(getId().getProto()); -// if (error != null) { -// if (error.getMessage() == null) { -// errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); -// } else { -// errorBuilder.setErrorMessage(error.getMessage()); -// } -// errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); -// } -// -// queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); -// executionBlockContext.failedTasksNum.incrementAndGet(); -// } -// } else { -// // if successful -// context.setProgress(1.0f); -// context.setState(TaskAttemptState.TA_SUCCEEDED); -// executionBlockContext.succeededTasksNum.incrementAndGet(); -// -// TaskCompletionReport report = getTaskCompletionReport(); -// queryMasterStub.done(null, report, NullCallback.get()); -// } -// finishTime = System.currentTimeMillis(); -// LOG.info(context.getTaskId() + " completed. " + -// "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + -// ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() -// + ", killed: " + executionBlockContext.killedTasksNum.intValue() -// + ", failed: " + executionBlockContext.failedTasksNum.intValue()); -// cleanupTask(); -// } -// } -// -// public void cleanupTask() { -// TaskHistory taskHistory = createTaskHistory(); -// executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory); -// executionBlockContext.getTasks().remove(getId()); -// -// fetcherRunners.clear(); -// fetcherRunners = null; -// try { -// if(executor != null) { -// executor.close(); -// executor = null; -// } -// } catch (IOException e) { -// LOG.fatal(e.getMessage(), e); -// } -// -// executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory); -// stopScriptExecutors(); -// } -// -// public TaskHistory createTaskHistory() { -// TaskHistory taskHistory = null; -// try { -// taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(), -// startTime, finishTime, reloadInputStats()); -// -// if (context.getOutputPath() != null) { -// taskHistory.setOutputPath(context.getOutputPath().toString()); -// } -// -// if (context.getWorkDir() != null) { -// taskHistory.setWorkingPath(context.getWorkDir().toString()); -// } -// -// if (context.getResultStats() != null) { -// taskHistory.setOutputStats(context.getResultStats().getProto()); -// } -// -// if (hasFetchPhase()) { -// taskHistory.setTotalFetchCount(fetcherRunners.size()); -// int i = 0; -// FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder(); -// for (Fetcher fetcher : fetcherRunners) { -// // TODO store the fetcher histories -// if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { -// builder.setStartTime(fetcher.getStartTime()); -// builder.setFinishTime(fetcher.getFinishTime()); -// builder.setFileLength(fetcher.getFileLen()); -// builder.setMessageReceivedCount(fetcher.getMessageReceiveCount()); -// builder.setState(fetcher.getState()); -// -// taskHistory.addFetcherHistory(builder.build()); -// } -// if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++; -// } -// taskHistory.setFinishedFetchCount(i); -// } -// } catch (Exception e) { -// LOG.warn(e.getMessage(), e); -// } -// -// return taskHistory; -// } -// -// public int hashCode() { -// return context.hashCode(); -// } -// -// public boolean equals(Object obj) { -// if (obj instanceof Task) { -// Task other = (Task) obj; -// return this.context.equals(other.context); -// } -// return false; -// } -// -// private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta) -// throws IOException { -// Configuration c = new Configuration(systemConf); -// c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); -// FileSystem fs = FileSystem.get(c); -// Path tablePath = new Path(file.getAbsolutePath()); -// -// List<FileFragment> listTablets = new ArrayList<FileFragment>(); -// FileFragment tablet; -// -// FileStatus[] fileLists = fs.listStatus(tablePath); -// for (FileStatus f : fileLists) { -// if (f.getLen() == 0) { -// continue; -// } -// tablet = new FileFragment(name, f.getPath(), 0l, f.getLen()); -// listTablets.add(tablet); -// } -// -// // Special treatment for locally pseudo fetched chunks -// synchronized (localChunks) { -// for (FileChunk chunk : localChunks) { -// if (name.equals(chunk.getEbId())) { -// tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); -// listTablets.add(tablet); -// LOG.info("One local chunk is added to listTablets"); -// } -// } -// } -// -// FileFragment[] tablets = new FileFragment[listTablets.size()]; -// listTablets.toArray(tablets); -// -// return tablets; -// } -// -// private class FetchRunner implements Runnable { -// private final TaskAttemptContext ctx; -// private final Fetcher fetcher; -// private int maxRetryNum; -// -// public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) { -// this.ctx = ctx; -// this.fetcher = fetcher; -// this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM); -// } -// -// @Override -// public void run() { -// int retryNum = 0; -// int retryWaitTime = 1000; //sec -// -// try { // for releasing fetch latch -// while(!context.isStopped() && retryNum < maxRetryNum) { -// if (retryNum > 0) { -// try { -// Thread.sleep(retryWaitTime); -// retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds -// } catch (InterruptedException e) { -// LOG.error(e); -// } -// LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); -// } -// try { -// FileChunk fetched = fetcher.get(); -// if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null -// && fetched.getFile() != null) { -// if (fetched.fromRemote() == false) { -// localChunks.add(fetched); -// LOG.info("Add a new FileChunk to local chunk list"); -// } -// break; -// } -// } catch (Throwable e) { -// LOG.error("Fetch failed: " + fetcher.getURI(), e); -// } -// retryNum++; -// } -// } finally { -// if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){ -// fetcherFinished(ctx); -// } else { -// if (retryNum == maxRetryNum) { -// LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); -// } -// stopScriptExecutors(); -// context.stop(); // retry task -// ctx.getFetchLatch().countDown(); -// } -// } -// } -// } -// -// @VisibleForTesting -// public static float adjustFetchProcess(int totalFetcher, int remainFetcher) { -// if (totalFetcher > 0) { -// return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS; -// } else { -// return 0.0f; -// } -// } -// -// private synchronized void fetcherFinished(TaskAttemptContext ctx) { -// int fetcherSize = fetcherRunners.size(); -// if(fetcherSize == 0) { -// return; -// } -// -// ctx.getFetchLatch().countDown(); -// -// int remainFetcher = (int) ctx.getFetchLatch().getCount(); -// if (remainFetcher == 0) { -// context.setFetcherProgress(FETCHER_PROGRESS); -// } else { -// context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher)); -// context.setProgressChanged(true); -// } -// } -// -// private List<Fetcher> getFetchRunners(TaskAttemptContext ctx, -// List<FetchImpl> fetches) throws IOException { -// -// if (fetches.size() > 0) { -// Path inputDir = executionBlockContext.getLocalDirAllocator(). -// getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); -// -// int i = 0; -// File storeDir; -// File defaultStoreFile; -// FileChunk storeChunk = null; -// List<Fetcher> runnerList = Lists.newArrayList(); -// -// for (FetchImpl f : fetches) { -// storeDir = new File(inputDir.toString(), f.getName()); -// if (!storeDir.exists()) { -// storeDir.mkdirs(); -// } -// -// for (URI uri : f.getURIs()) { -// defaultStoreFile = new File(storeDir, "in_" + i); -// InetAddress address = InetAddress.getByName(uri.getHost()); -// -// WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo(); -// if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) { -// boolean hasError = false; -// try { -// LOG.info("Try to get local file chunk at local host"); -// storeChunk = getLocalStoredFileChunk(uri, systemConf); -// } catch (Throwable t) { -// hasError = true; -// } -// -// // When a range request is out of range, storeChunk will be NULL. This case is normal state. -// // So, we should skip and don't need to create storeChunk. -// if (storeChunk == null && !hasError) { -// continue; -// } -// -// if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1 -// && hasError == false) { -// storeChunk.setFromRemote(false); -// } else { -// storeChunk = new FileChunk(defaultStoreFile, 0, -1); -// storeChunk.setFromRemote(true); -// } -// } else { -// storeChunk = new FileChunk(defaultStoreFile, 0, -1); -// storeChunk.setFromRemote(true); -// } -// -// // If we decide that intermediate data should be really fetched from a remote host, storeChunk -// // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it -// storeChunk.setEbId(f.getName()); -// Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); -// LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); -// runnerList.add(fetcher); -// i++; -// } -// } -// ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); -// return runnerList; -// } else { -// return Lists.newArrayList(); -// } -// } -// -// private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { -// // Parse the URI -// LOG.info("getLocalStoredFileChunk starts"); -// final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters(); -// final List<String> types = params.get("type"); -// final List<String> qids = params.get("qid"); -// final List<String> taskIdList = params.get("ta"); -// final List<String> stageIds = params.get("sid"); -// final List<String> partIds = params.get("p"); -// final List<String> offsetList = params.get("offset"); -// final List<String> lengthList = params.get("length"); -// -// if (types == null || stageIds == null || qids == null || partIds == null) { -// LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); -// return null; -// } -// -// if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { -// LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); -// return null; -// } -// -// String queryId = qids.get(0); -// String shuffleType = types.get(0); -// String sid = stageIds.get(0); -// String partId = partIds.get(0); -// -// if (shuffleType.equals("r") && taskIdList == null) { -// LOG.error("Invalid URI - For range shuffle, taskId is required"); -// return null; -// } -// List<String> taskIds = splitMaps(taskIdList); -// -// FileChunk chunk = null; -// long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; -// long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; -// -// LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId -// + ", taskIds=" + taskIdList); -// -// // The working directory of Tajo worker for each query, including stage -// String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; -// -// // If the stage requires a range shuffle -// if (shuffleType.equals("r")) { -// String ta = taskIds.get(0); -// if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { -// LOG.warn("Range shuffle - file not exist"); -// return null; -// } -// Path path = executionBlockContext.getLocalFS().makeQualified( -// executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf)); -// String startKey = params.get("start").get(0); -// String endKey = params.get("end").get(0); -// boolean last = params.get("final") != null; -// -// try { -// chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last); -// } catch (Throwable t) { -// LOG.error("getFileChunks() throws exception"); -// return null; -// } -// -// // If the stage requires a hash shuffle or a scattered hash shuffle -// } else if (shuffleType.equals("h") || shuffleType.equals("s")) { -// int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); -// String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; -// if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) { -// LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); -// return null; -// } -// Path path = executionBlockContext.getLocalFS().makeQualified( -// executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf)); -// File file = new File(path.toUri()); -// long startPos = (offset >= 0 && length >= 0) ? offset : 0; -// long readLen = (offset >= 0 && length >= 0) ? length : file.length(); -// -// if (startPos >= file.length()) { -// LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); -// return null; -// } -// chunk = new FileChunk(file, startPos, readLen); -// -// } else { -// LOG.error("Unknown shuffle type"); -// return null; -// } -// -// return chunk; -// } -// -// private List<String> splitMaps(List<String> mapq) { -// if (null == mapq) { -// return null; -// } -// final List<String> ret = new ArrayList<String>(); -// for (String s : mapq) { -// Collections.addAll(ret, s.split(",")); -// } -// return ret; -// } -// -// public static Path getTaskAttemptDir(TaskAttemptId quid) { -// Path workDir = -// StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), -// String.valueOf(quid.getTaskId().getId()), -// String.valueOf(quid.getId())); -// return workDir; -// } -//======= - TajoWorkerProtocol.TaskStatusProto getReport(); -======= TaskStatusProto getReport(); TaskHistory createTaskHistory(); List<Fetcher> getFetchers(); ->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 }
http://git-wip-us.apache.org/repos/asf/tajo/blob/6c01a2c3/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java index dc3ae57..eb67167 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java @@ -18,13 +18,11 @@ package org.apache.tajo.ws.rs.resources; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; import org.apache.tajo.exception.ReturnStateUtil; -import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; import org.apache.tajo.master.QueryInProgress; import org.apache.tajo.master.QueryInfo; @@ -263,18 +261,10 @@ public class QueryResource { return ResourcesUtil.createBadRequestResponse(LOG, "Provided session id (" + sessionId + ") is invalid."); } -<<<<<<< HEAD SubmitQueryResponse response = - masterContext.getGlobalEngine().executeQuery(session, request.getQuery(), false); - if (response.getResult().hasResultCode() && - ClientProtos.ResultCode.ERROR.equals(response.getResult().getResultCode())) { - return ResourcesUtil.createExceptionResponse(LOG, response.getResult().getErrorMessage()); -======= - SubmitQueryResponse response = masterContext.getGlobalEngine().executeQuery(session, request.getQuery(), false); if (ReturnStateUtil.isError(response.getState())) { return ResourcesUtil.createExceptionResponse(LOG, response.getState().getMessage()); ->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 } else { JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); @@ -291,11 +281,7 @@ public class QueryResource { queryResponse.setUri(queryURI); } -<<<<<<< HEAD - queryResponse.setResultCode(response.getResult().getResultCode()); -======= queryResponse.setResultCode(response.getState().getReturnCode()); ->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 queryResponse.setQuery(request.getQuery()); return Response.status(Status.OK).entity(queryResponse).build(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/6c01a2c3/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index baa8510..fa6b716 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -19,7 +19,10 @@ package org.apache.tajo.engine.query; import com.google.common.collect.Maps; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.DeflateCodec; @@ -31,12 +34,12 @@ import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.TajoMemoryResultSet; @@ -57,7 +60,6 @@ import java.util.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE; import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) public class TestTablePartitions extends QueryTestCaseBase { @@ -865,13 +867,8 @@ public class TestTablePartitions extends QueryTestCaseBase { ClientProtos.SubmitQueryResponse response = client.executeQuery("insert overwrite into " + tableName + " select l_orderkey, l_partkey from lineitem"); -<<<<<<< HEAD - assertTrue(response.getResult().hasErrorMessage()); - assertEquals(response.getResult().getErrorMessage(), "INSERT has smaller expressions than target columns\n"); -======= assertTrue(ReturnStateUtil.isError(response.getState())); assertEquals(response.getState().getMessage(), "INSERT has smaller expressions than target columns"); ->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 res = executeFile("case14.sql"); assertResultSet(res, "case14.result"); @@ -896,13 +893,8 @@ public class TestTablePartitions extends QueryTestCaseBase { response = client.executeQuery("insert overwrite into " + tableName + " select l_returnflag , l_orderkey, l_partkey from lineitem"); -<<<<<<< HEAD - assertTrue(response.getResult().hasErrorMessage()); - assertEquals(response.getResult().getErrorMessage(), "INSERT has smaller expressions than target columns\n"); -======= assertTrue(ReturnStateUtil.isError(response.getState())); assertEquals(response.getState().getMessage(), "INSERT has smaller expressions than target columns"); ->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 res = executeFile("case15.sql"); assertResultSet(res, "case15.result"); http://git-wip-us.apache.org/repos/asf/tajo/blob/6c01a2c3/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java index c7ce211..644dfc7 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java @@ -20,9 +20,9 @@ package org.apache.tajo.jdbc; import com.google.common.collect.Lists; import org.apache.tajo.QueryId; import org.apache.tajo.SessionVars; -import org.apache.tajo.exception.SQLExceptionUtil; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientUtil; +import org.apache.tajo.exception.SQLExceptionUtil; import org.apache.tajo.ipc.ClientProtos; import java.sql.*; @@ -158,19 +158,7 @@ public class TajoStatement implements Statement { } ClientProtos.SubmitQueryResponse response = tajoClient.executeQuery(sql); -<<<<<<< HEAD - if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) { - if (response.getResult().hasErrorMessage()) { - throw new ServiceException(response.getResult().getErrorMessage()); - } - if (response.getResult().hasErrorTrace()) { - throw new ServiceException(response.getResult().getErrorTrace()); - } - throw new ServiceException("Failed to submit query by unknown reason"); - } -======= SQLExceptionUtil.throwIfError(response.getState()); ->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 QueryId queryId = new QueryId(response.getQueryId()); if (response.getIsForwarded() && !queryId.isNull()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/6c01a2c3/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java index 61cadca..77a12e8 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java @@ -34,16 +34,9 @@ import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.expr.EvalTreeUtil; import org.apache.tajo.plan.joinorder.*; import org.apache.tajo.plan.logical.*; -<<<<<<< HEAD -import org.apache.tajo.plan.rewrite.rules.AccessPathRewriter; -import org.apache.tajo.plan.rewrite.rules.FilterPushDownRule; -import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; -import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule; -import org.apache.tajo.plan.rewrite.*; -======= import org.apache.tajo.plan.rewrite.BaseLogicalPlanRewriteEngine; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleProvider; ->>>>>>> c50a5dadff90fa90709abbce59856e834baa4867 import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.util.ReflectionUtil;
