http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java new file mode 100644 index 0000000..5a4e69f --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -0,0 +1,808 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver; + +import com.google.common.collect.Lists; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterInt; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.security.ssl.SSLFactory; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.pullserver.retriever.FileChunk; +import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.storage.HashShuffleAppenderManager; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.TupleComparator; +import org.apache.tajo.storage.index.bst.BSTIndex; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.*; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.handler.codec.http.*; +import org.jboss.netty.handler.ssl.SslHandler; +import org.jboss.netty.handler.stream.ChunkedWriteHandler; +import org.jboss.netty.util.CharsetUtil; + +import java.io.*; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; +import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; +import static org.jboss.netty.handler.codec.http.HttpMethod.GET; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +public class TajoPullServerService extends AbstractService { + + private static final Log LOG = LogFactory.getLog(TajoPullServerService.class); + + public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache"; + public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true; + + public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes"; + public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; + + private int port; + private ChannelFactory selector; + private final ChannelGroup accepted = new DefaultChannelGroup(); + private HttpPipelineFactory pipelineFact; + private int sslFileBufferSize; + + private ApplicationId appId; + private FileSystem localFS; + + /** + * Should the shuffle use posix_fadvise calls to manage the OS cache during + * sendfile + */ + private boolean manageOsCache; + private int readaheadLength; + private ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); + + + public static final String PULLSERVER_SERVICEID = "tajo.pullserver"; + + private static final Map<String,String> userRsrc = + new ConcurrentHashMap<String,String>(); + private String userName; + + public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = + "tajo.pullserver.ssl.file.buffer.size"; + + public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024; + + private static boolean STANDALONE = false; + + static { + String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE"); + if (!StringUtils.isEmpty(standalone)) { + STANDALONE = standalone.equalsIgnoreCase("true"); + } + } + + @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo") + static class ShuffleMetrics implements ChannelFutureListener { + @Metric({"OutputBytes","PullServer output in bytes"}) + MutableCounterLong shuffleOutputBytes; + @Metric({"Failed","# of failed shuffle outputs"}) + MutableCounterInt shuffleOutputsFailed; + @Metric({"Succeeded","# of succeeded shuffle outputs"}) + MutableCounterInt shuffleOutputsOK; + @Metric({"Connections","# of current shuffle connections"}) + MutableGaugeInt shuffleConnections; + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + shuffleOutputsOK.incr(); + } else { + shuffleOutputsFailed.incr(); + } + shuffleConnections.decr(); + } + } + + final ShuffleMetrics metrics; + + TajoPullServerService(MetricsSystem ms) { + super("httpshuffle"); + metrics = ms.register(new ShuffleMetrics()); + } + + @SuppressWarnings("UnusedDeclaration") + public TajoPullServerService() { + this(DefaultMetricsSystem.instance()); + } + + /** + * Serialize the shuffle port into a ByteBuffer for use later on. + * @param port the port to be sent to the ApplciationMaster + * @return the serialized form of the port. + */ + public static ByteBuffer serializeMetaData(int port) throws IOException { + //TODO these bytes should be versioned + DataOutputBuffer port_dob = new DataOutputBuffer(); + port_dob.writeInt(port); + return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength()); + } + + /** + * A helper function to deserialize the metadata returned by PullServerAuxService. + * @param meta the metadata returned by the PullServerAuxService + * @return the port the PullServer Handler is listening on to serve shuffle data. + */ + public static int deserializeMetaData(ByteBuffer meta) throws IOException { + //TODO this should be returning a class not just an int + DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(meta); + return in.readInt(); + } + + public void initApp(String user, ApplicationId appId, ByteBuffer secret) { + // TODO these bytes should be versioned + // TODO: Once SHuffle is out of NM, this can use MR APIs + this.appId = appId; + this.userName = user; + userRsrc.put(appId.toString(), user); + } + + public void stopApp(ApplicationId appId) { + userRsrc.remove(appId.toString()); + } + + @Override + public void init(Configuration conf) { + try { + manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE, + DEFAULT_SHUFFLE_MANAGE_OS_CACHE); + + readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, + DEFAULT_SHUFFLE_READAHEAD_BYTES); + + int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num", + Runtime.getRuntime().availableProcessors() * 2); + + selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum); + + localFS = new LocalFileSystem(); + + conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname + , TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal); + super.init(conf); + LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength); + } catch (Throwable t) { + LOG.error(t); + } + } + + // TODO change AbstractService to throw InterruptedException + @Override + public synchronized void serviceInit(Configuration conf) throws Exception { + ServerBootstrap bootstrap = new ServerBootstrap(selector); + + try { + pipelineFact = new HttpPipelineFactory(conf); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + bootstrap.setPipelineFactory(pipelineFact); + + port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, + ConfVars.PULLSERVER_PORT.defaultIntVal); + Channel ch = bootstrap.bind(new InetSocketAddress(port)); + + accepted.add(ch); + port = ((InetSocketAddress)ch.getLocalAddress()).getPort(); + conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port)); + pipelineFact.PullServer.setPort(port); + LOG.info(getName() + " listening on port " + port); + + sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, + DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE); + + + if (STANDALONE) { + File pullServerPortFile = getPullServerPortFile(); + if (pullServerPortFile.exists()) { + pullServerPortFile.delete(); + } + pullServerPortFile.getParentFile().mkdirs(); + LOG.info("Write PullServerPort to " + pullServerPortFile); + FileOutputStream out = null; + try { + out = new FileOutputStream(pullServerPortFile); + out.write(("" + port).getBytes()); + } catch (Exception e) { + LOG.fatal("PullServer exists cause can't write PullServer port to " + pullServerPortFile + + ", " + e.getMessage(), e); + System.exit(-1); + } finally { + IOUtils.closeStream(out); + } + } + super.serviceInit(conf); + LOG.info("TajoPullServerService started: port=" + port); + } + + public static boolean isStandalone() { + return STANDALONE; + } + + private static File getPullServerPortFile() { + String pullServerPortInfoFile = System.getenv("TAJO_PID_DIR"); + if (StringUtils.isEmpty(pullServerPortInfoFile)) { + pullServerPortInfoFile = "/tmp"; + } + return new File(pullServerPortInfoFile + "/pullserver.port"); + } + + // TODO change to get port from master or tajoConf + public static int readPullServerPort() { + FileInputStream in = null; + try { + File pullServerPortFile = getPullServerPortFile(); + + if (!pullServerPortFile.exists() || pullServerPortFile.isDirectory()) { + return -1; + } + in = new FileInputStream(pullServerPortFile); + byte[] buf = new byte[1024]; + int readBytes = in.read(buf); + return Integer.parseInt(new String(buf, 0, readBytes)); + } catch (IOException e) { + LOG.fatal(e.getMessage(), e); + return -1; + } finally { + IOUtils.closeStream(in); + } + } + + public int getPort() { + return port; + } + + @Override + public synchronized void stop() { + try { + accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS); + ServerBootstrap bootstrap = new ServerBootstrap(selector); + bootstrap.releaseExternalResources(); + pipelineFact.destroy(); + + localFS.close(); + } catch (Throwable t) { + LOG.error(t); + } finally { + super.stop(); + } + } + + public synchronized ByteBuffer getMeta() { + try { + return serializeMetaData(port); + } catch (IOException e) { + LOG.error("Error during getMeta", e); + // TODO add API to AuxiliaryServices to report failures + return null; + } + } + + class HttpPipelineFactory implements ChannelPipelineFactory { + + final PullServer PullServer; + private SSLFactory sslFactory; + + public HttpPipelineFactory(Configuration conf) throws Exception { + PullServer = new PullServer(conf); + if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname, + ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) { + sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf); + sslFactory.init(); + } + } + + public void destroy() { + if (sslFactory != null) { + sslFactory.destroy(); + } + } + + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + if (sslFactory != null) { + pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine())); + } + + int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname, + ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal); + pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize)); + pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16)); + pipeline.addLast("chunking", new ChunkedWriteHandler()); + pipeline.addLast("shuffle", PullServer); + return pipeline; + // TODO factor security manager into pipeline + // TODO factor out encode/decode to permit binary shuffle + // TODO factor out decode of index to permit alt. models + } + } + + + Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<String, ProcessingStatus>(); + + public void completeFileChunk(FileRegion filePart, + String requestUri, + long startTime) { + ProcessingStatus status = processingStatusMap.get(requestUri); + if (status != null) { + status.decrementRemainFiles(filePart, startTime); + } + } + + class ProcessingStatus { + String requestUri; + int numFiles; + AtomicInteger remainFiles; + long startTime; + long makeFileListTime; + long minTime = Long.MAX_VALUE; + long maxTime; + int numSlowFile; + + public ProcessingStatus(String requestUri) { + this.requestUri = requestUri; + this.startTime = System.currentTimeMillis(); + } + + public void setNumFiles(int numFiles) { + this.numFiles = numFiles; + this.remainFiles = new AtomicInteger(numFiles); + } + public void decrementRemainFiles(FileRegion filePart, long fileStartTime) { + synchronized(remainFiles) { + long fileSendTime = System.currentTimeMillis() - fileStartTime; + if (fileSendTime > 20 * 1000) { + LOG.info("PullServer send too long time: filePos=" + filePart.getPosition() + ", fileLen=" + filePart.getCount()); + numSlowFile++; + } + if (fileSendTime > maxTime) { + maxTime = fileSendTime; + } + if (fileSendTime < minTime) { + minTime = fileSendTime; + } + int remain = remainFiles.decrementAndGet(); + if (remain <= 0) { + processingStatusMap.remove(requestUri); + LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " + + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " + + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile); + } + } + } + } + + class PullServer extends SimpleChannelUpstreamHandler { + + private final Configuration conf; +// private final IndexCache indexCache; + private final LocalDirAllocator lDirAlloc = + new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); + private int port; + + public PullServer(Configuration conf) throws IOException { + this.conf = conf; +// indexCache = new IndexCache(new JobConf(conf)); + this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, + ConfVars.PULLSERVER_PORT.defaultIntVal); + + // init local temporal dir + lDirAlloc.getAllLocalPathsToRead(".", conf); + } + + public void setPort(int port) { + this.port = port; + } + + private List<String> splitMaps(List<String> mapq) { + if (null == mapq) { + return null; + } + final List<String> ret = new ArrayList<String>(); + for (String s : mapq) { + Collections.addAll(ret, s.split(",")); + } + return ret; + } + + @Override + public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) + throws Exception { + + accepted.add(evt.getChannel()); + LOG.info(String.format("Current number of shuffle connections (%d)", accepted.size())); + super.channelOpen(ctx, evt); + + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + + HttpRequest request = (HttpRequest) e.getMessage(); + if (request.getMethod() != GET) { + sendError(ctx, METHOD_NOT_ALLOWED); + return; + } + + ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString()); + processingStatusMap.put(request.getUri().toString(), processingStatus); + // Parsing the URL into key-values + final Map<String, List<String>> params = + new QueryStringDecoder(request.getUri()).getParameters(); + final List<String> types = params.get("type"); + final List<String> qids = params.get("qid"); + final List<String> taskIdList = params.get("ta"); + final List<String> subQueryIds = params.get("sid"); + final List<String> partIds = params.get("p"); + final List<String> offsetList = params.get("offset"); + final List<String> lengthList = params.get("length"); + + if (types == null || subQueryIds == null || qids == null || partIds == null) { + sendError(ctx, "Required queryId, type, subquery Id, and part id", + BAD_REQUEST); + return; + } + + if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) { + sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id", + BAD_REQUEST); + return; + } + + String partId = partIds.get(0); + String queryId = qids.get(0); + String shuffleType = types.get(0); + String sid = subQueryIds.get(0); + + long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; + long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; + + if (!shuffleType.equals("h") && !shuffleType.equals("s") && taskIdList == null) { + sendError(ctx, "Required taskIds", BAD_REQUEST); + } + + List<String> taskIds = splitMaps(taskIdList); + + String queryBaseDir = queryId.toString() + "/output"; + + if (LOG.isDebugEnabled()) { + LOG.debug("PullServer request param: shuffleType=" + shuffleType + + ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList); + + // the working dir of tajo worker for each query + LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir); + } + + final List<FileChunk> chunks = Lists.newArrayList(); + + // if a subquery requires a range shuffle + if (shuffleType.equals("r")) { + String ta = taskIds.get(0); + if(!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)){ + LOG.warn(e); + sendError(ctx, NO_CONTENT); + return; + } + Path path = localFS.makeQualified( + lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)); + String startKey = params.get("start").get(0); + String endKey = params.get("end").get(0); + boolean last = params.get("final") != null; + + FileChunk chunk; + try { + chunk = getFileCunks(path, startKey, endKey, last); + } catch (Throwable t) { + LOG.error("ERROR Request: " + request.getUri(), t); + sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST); + return; + } + if (chunk != null) { + chunks.add(chunk); + } + + // if a subquery requires a hash shuffle or a scattered hash shuffle + } else if (shuffleType.equals("h") || shuffleType.equals("s")) { + int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); + String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId; + if (!lDirAlloc.ifExists(partPath, conf)) { + LOG.warn("Partition shuffle file not exists: " + partPath); + sendError(ctx, NO_CONTENT); + return; + } + + Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath, conf)); + + File file = new File(path.toUri()); + long startPos = (offset >= 0 && length >= 0) ? offset : 0; + long readLen = (offset >= 0 && length >= 0) ? length : file.length(); + + if (startPos >= file.length()) { + String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]"; + LOG.error(errorMessage); + sendError(ctx, errorMessage, BAD_REQUEST); + return; + } + LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length()); + FileChunk chunk = new FileChunk(file, startPos, readLen); + chunks.add(chunk); + } else { + LOG.error("Unknown shuffle type: " + shuffleType); + sendError(ctx, "Unknown shuffle type:" + shuffleType, BAD_REQUEST); + return; + } + + processingStatus.setNumFiles(chunks.size()); + processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime; + // Write the content. + Channel ch = e.getChannel(); + if (chunks.size() == 0) { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT); + ch.write(response); + if (!isKeepAlive(request)) { + ch.close(); + } + } else { + FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]); + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + long totalSize = 0; + for (FileChunk chunk : file) { + totalSize += chunk.length(); + } + setContentLength(response, totalSize); + + // Write the initial line and the header. + ch.write(response); + + ChannelFuture writeFuture = null; + + for (FileChunk chunk : file) { + writeFuture = sendFile(ctx, ch, chunk, request.getUri().toString()); + if (writeFuture == null) { + sendError(ctx, NOT_FOUND); + return; + } + } + + // Decide whether to close the connection or not. + if (!isKeepAlive(request)) { + // Close the connection when the whole content is written out. + writeFuture.addListener(ChannelFutureListener.CLOSE); + } + } + } + + private ChannelFuture sendFile(ChannelHandlerContext ctx, + Channel ch, + FileChunk file, + String requestUri) throws IOException { + long startTime = System.currentTimeMillis(); + RandomAccessFile spill = null; + ChannelFuture writeFuture; + try { + spill = new RandomAccessFile(file.getFile(), "r"); + if (ch.getPipeline().get(SslHandler.class) == null) { + final FadvisedFileRegion filePart = new FadvisedFileRegion(spill, + file.startOffset(), file.length(), manageOsCache, readaheadLength, + readaheadPool, file.getFile().getAbsolutePath()); + writeFuture = ch.write(filePart); + writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this)); + } else { + // HTTPS cannot be done with zero copy. + final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, + file.startOffset(), file.length(), sslFileBufferSize, + manageOsCache, readaheadLength, readaheadPool, + file.getFile().getAbsolutePath()); + writeFuture = ch.write(chunk); + } + } catch (FileNotFoundException e) { + LOG.info(file.getFile() + " not found"); + return null; + } catch (Throwable e) { + if (spill != null) { + //should close a opening file + spill.close(); + } + return null; + } + metrics.shuffleConnections.incr(); + metrics.shuffleOutputBytes.incr(file.length()); // optimistic + return writeFuture; + } + + private void sendError(ChannelHandlerContext ctx, + HttpResponseStatus status) { + sendError(ctx, "", status); + } + + private void sendError(ChannelHandlerContext ctx, String message, + HttpResponseStatus status) { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); + response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.setContent( + ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8)); + + // Close the connection as soon as the error message is sent. + ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + throws Exception { + LOG.error(e.getCause().getMessage(), e.getCause()); + //if channel.close() is not called, never closed files in this request + if (ctx.getChannel().isConnected()){ + ctx.getChannel().close(); + } + } + } + + public static FileChunk getFileCunks(Path outDir, + String startKey, + String endKey, + boolean last) throws IOException { + BSTIndex index = new BSTIndex(new TajoConf()); + BSTIndex.BSTIndexReader idxReader = + index.getIndexReader(new Path(outDir, "index")); + idxReader.open(); + Schema keySchema = idxReader.getKeySchema(); + TupleComparator comparator = idxReader.getComparator(); + + LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + + idxReader.getLastKey()); + + File data = new File(URI.create(outDir.toUri() + "/output")); + byte [] startBytes = Base64.decodeBase64(startKey); + byte [] endBytes = Base64.decodeBase64(endKey); + + RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema); + Tuple start; + Tuple end; + try { + start = decoder.toTuple(startBytes); + } catch (Throwable t) { + throw new IllegalArgumentException("StartKey: " + startKey + + ", decoded byte size: " + startBytes.length, t); + } + + try { + end = decoder.toTuple(endBytes); + } catch (Throwable t) { + throw new IllegalArgumentException("EndKey: " + endKey + + ", decoded byte size: " + endBytes.length, t); + } + + LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end + + (last ? ", last=true" : "") + ")"); + + if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero + LOG.info("There is no contents"); + return null; + } + + if (comparator.compare(end, idxReader.getFirstKey()) < 0 || + comparator.compare(idxReader.getLastKey(), start) < 0) { + LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + + "], but request start:" + start + ", end: " + end); + return null; + } + + long startOffset; + long endOffset; + try { + startOffset = idxReader.find(start); + } catch (IOException ioe) { + LOG.error("State Dump (the requested range: " + + "[" + start + ", " + end +")" + ", idx min: " + + idxReader.getFirstKey() + ", idx max: " + + idxReader.getLastKey()); + throw ioe; + } + try { + endOffset = idxReader.find(end); + if (endOffset == -1) { + endOffset = idxReader.find(end, true); + } + } catch (IOException ioe) { + LOG.error("State Dump (the requested range: " + + "[" + start + ", " + end +")" + ", idx min: " + + idxReader.getFirstKey() + ", idx max: " + + idxReader.getLastKey()); + throw ioe; + } + + // if startOffset == -1 then case 2-1 or case 3 + if (startOffset == -1) { // this is a hack + // if case 2-1 or case 3 + try { + startOffset = idxReader.find(start, true); + } catch (IOException ioe) { + LOG.error("State Dump (the requested range: " + + "[" + start + ", " + end +")" + ", idx min: " + + idxReader.getFirstKey() + ", idx max: " + + idxReader.getLastKey()); + throw ioe; + } + } + + if (startOffset == -1) { + throw new IllegalStateException("startOffset " + startOffset + " is negative \n" + + "State Dump (the requested range: " + + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: " + + idxReader.getLastKey()); + } + + // if greater than indexed values + if (last || (endOffset == -1 + && comparator.compare(idxReader.getLastKey(), end) < 0)) { + endOffset = data.length(); + } + + idxReader.close(); + + FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset); + LOG.info("Retrieve File Chunk: " + chunk); + return chunk; + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java new file mode 100644 index 0000000..67e7423 --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver.retriever; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.QueryUnitId; +import org.apache.tajo.pullserver.FileAccessForbiddenException; +import org.apache.tajo.util.TajoIdUtils; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.jboss.netty.handler.codec.http.QueryStringDecoder; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class AdvancedDataRetriever implements DataRetriever { + private final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class); + private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap(); + + public AdvancedDataRetriever() { + } + + public void register(String taskAttemptId, RetrieverHandler handler) { + synchronized (handlerMap) { + if (!handlerMap.containsKey(taskAttemptId)) { + handlerMap.put(taskAttemptId, handler); + } + } + } + + public void unregister(String taskAttemptId) { + synchronized (handlerMap) { + if (handlerMap.containsKey(taskAttemptId)) { + handlerMap.remove(taskAttemptId); + } + } + } + + @Override + public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request) + throws IOException { + + final Map<String, List<String>> params = + new QueryStringDecoder(request.getUri()).getParameters(); + + if (!params.containsKey("qid")) { + throw new FileNotFoundException("No such qid: " + params.containsKey("qid")); + } + + if (params.containsKey("sid")) { + List<FileChunk> chunks = Lists.newArrayList(); + List<String> queryUnidIds = splitMaps(params.get("qid")); + for (String eachQueryUnitId : queryUnidIds) { + String[] queryUnitIdSeqTokens = eachQueryUnitId.split("_"); + ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0)); + QueryUnitId quid = new QueryUnitId(ebId, Integer.parseInt(queryUnitIdSeqTokens[0])); + + QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid, Integer.parseInt(queryUnitIdSeqTokens[1])); + + RetrieverHandler handler = handlerMap.get(attemptId.toString()); + FileChunk chunk = handler.get(params); + chunks.add(chunk); + } + return chunks.toArray(new FileChunk[chunks.size()]); + } else { + RetrieverHandler handler = handlerMap.get(params.get("qid").get(0)); + FileChunk chunk = handler.get(params); + if (chunk == null) { + if (params.containsKey("qid")) { // if there is no content corresponding to the query + return null; + } else { // if there is no + throw new FileNotFoundException("No such a file corresponding to " + params.get("qid")); + } + } + + File file = chunk.getFile(); + if (file.isHidden() || !file.exists()) { + throw new FileNotFoundException("No such file: " + file.getAbsolutePath()); + } + if (!file.isFile()) { + throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not file"); + } + + return new FileChunk[] {chunk}; + } + } + + private List<String> splitMaps(List<String> qids) { + if (null == qids) { + LOG.error("QueryUnitId is EMPTY"); + return null; + } + + final List<String> ret = new ArrayList<String>(); + for (String qid : qids) { + Collections.addAll(ret, qid.split(",")); + } + return ret; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java new file mode 100644 index 0000000..8f55f7b --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver.retriever; + +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.http.HttpRequest; + +import java.io.IOException; + +public interface DataRetriever { + FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request) + throws IOException; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java new file mode 100644 index 0000000..dc63929 --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver.retriever; + +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.apache.tajo.pullserver.FileAccessForbiddenException; +import org.apache.tajo.pullserver.HttpDataServerHandler; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; + +public class DirectoryRetriever implements DataRetriever { + public String baseDir; + + public DirectoryRetriever(String baseDir) { + this.baseDir = baseDir; + } + + @Override + public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request) + throws IOException { + final String path = HttpDataServerHandler.sanitizeUri(request.getUri()); + if (path == null) { + throw new IllegalArgumentException("Wrong path: " +path); + } + + File file = new File(baseDir, path); + if (file.isHidden() || !file.exists()) { + throw new FileNotFoundException("No such file: " + baseDir + "/" + path); + } + if (!file.isFile()) { + throw new FileAccessForbiddenException("No such file: " + + baseDir + "/" + path); + } + + return new FileChunk[] {new FileChunk(file, 0, file.length())}; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java new file mode 100644 index 0000000..67cff21 --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver.retriever; + +import java.io.File; +import java.io.FileNotFoundException; + +public class FileChunk { + private final File file; + private final long startOffset; + private long length; + + /** + * TRUE if this.file is created by getting data from a remote host (e.g., by HttpRequest). FALSE otherwise. + */ + private boolean fromRemote; + + /** + * ExecutionBlockId + */ + private String ebId; + + public FileChunk(File file, long startOffset, long length) throws FileNotFoundException { + this.file = file; + this.startOffset = startOffset; + this.length = length; + } + + public File getFile() { + return this.file; + } + + public long startOffset() { + return this.startOffset; + } + + public long length() { + return this.length; + } + + public void setLength(long newLength) { + this.length = newLength; + } + + public boolean fromRemote() { + return this.fromRemote; + } + + public void setFromRemote(boolean newVal) { + this.fromRemote = newVal; + } + + public String getEbId() { + return this.ebId; + } + + public void setEbId(String newVal) { + this.ebId = newVal; + } + + public String toString() { + return " (start=" + startOffset() + ", length=" + length + ", fromRemote=" + fromRemote + ", ebId=" + ebId + ") " + + file.getAbsolutePath(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java new file mode 100644 index 0000000..5567c0d --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver.retriever; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public interface RetrieverHandler { + /** + * + * @param kvs url-decoded key/value pairs + * @return a desired part of a file + * @throws java.io.IOException + */ + public FileChunk get(Map<String, List<String>> kvs) throws IOException; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/pom.xml b/tajo-yarn-pullserver/pom.xml deleted file mode 100644 index a7644a1..0000000 --- a/tajo-yarn-pullserver/pom.xml +++ /dev/null @@ -1,146 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>tajo-project</artifactId> - <groupId>org.apache.tajo</groupId> - <version>0.9.1-SNAPSHOT</version> - <relativePath>../tajo-project</relativePath> - </parent> - <modelVersion>4.0.0</modelVersion> - <name>Tajo Core PullServer</name> - <artifactId>tajo-yarn-pullserver</artifactId> - - <build> - <plugins> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <executions> - <execution> - <phase>verify</phase> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> - - <dependencies> - <dependency> - <groupId>org.apache.tajo</groupId> - <artifactId>tajo-rpc</artifactId> - </dependency> - <dependency> - <groupId>org.apache.tajo</groupId> - <artifactId>tajo-catalog-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.tajo</groupId> - <artifactId>tajo-storage</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-nodemanager</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-shuffle</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>commons-el</groupId> - <artifactId>commons-el</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1-jetty</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - - <profiles> - <profile> - <id>docs</id> - <activation> - <activeByDefault>false</activeByDefault> - </activation> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <execution> - <!-- build javadoc jars per jar for publishing to maven --> - <id>module-javadocs</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <destDir>${project.build.directory}</destDir> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - </profile> - </profiles> - - <reporting> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-project-info-reports-plugin</artifactId> - <version>2.4</version> - <configuration> - <dependencyLocationsEnabled>false</dependencyLocationsEnabled> - </configuration> - </plugin> - </plugins> - </reporting> - -</project> http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java deleted file mode 100644 index b0b8d18..0000000 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.ReadaheadPool; -import org.apache.hadoop.io.nativeio.NativeIO; -import org.jboss.netty.handler.stream.ChunkedFile; - -import java.io.FileDescriptor; -import java.io.IOException; -import java.io.RandomAccessFile; - -public class FadvisedChunkedFile extends ChunkedFile { - - private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class); - - private final boolean manageOsCache; - private final int readaheadLength; - private final ReadaheadPool readaheadPool; - private final FileDescriptor fd; - private final String identifier; - - private ReadaheadPool.ReadaheadRequest readaheadRequest; - - public FadvisedChunkedFile(RandomAccessFile file, long position, long count, - int chunkSize, boolean manageOsCache, int readaheadLength, - ReadaheadPool readaheadPool, String identifier) throws IOException { - super(file, position, count, chunkSize); - this.manageOsCache = manageOsCache; - this.readaheadLength = readaheadLength; - this.readaheadPool = readaheadPool; - this.fd = file.getFD(); - this.identifier = identifier; - } - - @Override - public Object nextChunk() throws Exception { - if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) { - readaheadRequest = readaheadPool - .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength, - getEndOffset(), readaheadRequest); - } - return super.nextChunk(); - } - - @Override - public void close() throws Exception { - if (readaheadRequest != null) { - readaheadRequest.cancel(); - } - if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) { - try { - PullServerUtil.posixFadviseIfPossible(identifier, - fd, - getStartOffset(), getEndOffset() - getStartOffset(), - NativeIO.POSIX.POSIX_FADV_DONTNEED); - } catch (Throwable t) { - LOG.warn("Failed to manage OS cache for " + identifier, t); - } - } - super.close(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java deleted file mode 100644 index 18cf4b6..0000000 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.ReadaheadPool; -import org.apache.hadoop.io.nativeio.NativeIO; -import org.jboss.netty.channel.DefaultFileRegion; - -import java.io.FileDescriptor; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.channels.WritableByteChannel; - -public class FadvisedFileRegion extends DefaultFileRegion { - - private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class); - - private final boolean manageOsCache; - private final int readaheadLength; - private final ReadaheadPool readaheadPool; - private final FileDescriptor fd; - private final String identifier; - private final long count; - private final long position; - private final int shuffleBufferSize; - private final boolean shuffleTransferToAllowed; - private final FileChannel fileChannel; - - private ReadaheadPool.ReadaheadRequest readaheadRequest; - public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024; - - public FadvisedFileRegion(RandomAccessFile file, long position, long count, - boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, - String identifier) throws IOException { - this(file, position, count, manageOsCache, readaheadLength, readaheadPool, - identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true); - } - - public FadvisedFileRegion(RandomAccessFile file, long position, long count, - boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, - String identifier, int shuffleBufferSize, - boolean shuffleTransferToAllowed) throws IOException { - super(file.getChannel(), position, count); - this.manageOsCache = manageOsCache; - this.readaheadLength = readaheadLength; - this.readaheadPool = readaheadPool; - this.fd = file.getFD(); - this.identifier = identifier; - this.fileChannel = file.getChannel(); - this.count = count; - this.position = position; - this.shuffleBufferSize = shuffleBufferSize; - this.shuffleTransferToAllowed = shuffleTransferToAllowed; - } - - @Override - public long transferTo(WritableByteChannel target, long position) - throws IOException { - if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) { - readaheadRequest = readaheadPool.readaheadStream(identifier, fd, - getPosition() + position, readaheadLength, - getPosition() + getCount(), readaheadRequest); - } - - if(this.shuffleTransferToAllowed) { - return super.transferTo(target, position); - } else { - return customShuffleTransfer(target, position); - } - } - - /** - * This method transfers data using local buffer. It transfers data from - * a disk to a local buffer in memory, and then it transfers data from the - * buffer to the target. This is used only if transferTo is disallowed in - * the configuration file. super.TransferTo does not perform well on Windows - * due to a small IO request generated. customShuffleTransfer can control - * the size of the IO requests by changing the size of the intermediate - * buffer. - */ - @VisibleForTesting - long customShuffleTransfer(WritableByteChannel target, long position) - throws IOException { - long actualCount = this.count - position; - if (actualCount < 0 || position < 0) { - throw new IllegalArgumentException( - "position out of range: " + position + - " (expected: 0 - " + (this.count - 1) + ')'); - } - if (actualCount == 0) { - return 0L; - } - - long trans = actualCount; - int readSize; - ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize); - - while(trans > 0L && - (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) { - //adjust counters and buffer limit - if(readSize < trans) { - trans -= readSize; - position += readSize; - byteBuffer.flip(); - } else { - //We can read more than we need if the actualCount is not multiple - //of the byteBuffer size and file is big enough. In that case we cannot - //use flip method but we need to set buffer limit manually to trans. - byteBuffer.limit((int)trans); - byteBuffer.position(0); - position += trans; - trans = 0; - } - - //write data to the target - while(byteBuffer.hasRemaining()) { - target.write(byteBuffer); - } - - byteBuffer.clear(); - } - - return actualCount - trans; - } - - - @Override - public void releaseExternalResources() { - if (readaheadRequest != null) { - readaheadRequest.cancel(); - } - super.releaseExternalResources(); - } - - /** - * Call when the transfer completes successfully so we can advise the OS that - * we don't need the region to be cached anymore. - */ - public void transferSuccessful() { - if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0) { - try { - PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(), - NativeIO.POSIX.POSIX_FADV_DONTNEED); - } catch (Throwable t) { - LOG.warn("Failed to manage OS cache for " + identifier, t); - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java deleted file mode 100644 index c703f6f..0000000 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver; - -import java.io.IOException; - -public class FileAccessForbiddenException extends IOException { - private static final long serialVersionUID = -3383272565826389213L; - - public FileAccessForbiddenException() { - } - - public FileAccessForbiddenException(String message) { - super(message); - } - - public FileAccessForbiddenException(Throwable cause) { - super(cause); - } - - public FileAccessForbiddenException(String message, Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java deleted file mode 100644 index 236db89..0000000 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver; - -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; - -public class FileCloseListener implements ChannelFutureListener { - - private FadvisedFileRegion filePart; - private String requestUri; - private TajoPullServerService pullServerService; - private long startTime; - - public FileCloseListener(FadvisedFileRegion filePart, - String requestUri, - long startTime, - TajoPullServerService pullServerService) { - this.filePart = filePart; - this.requestUri = requestUri; - this.pullServerService = pullServerService; - this.startTime = startTime; - } - - // TODO error handling; distinguish IO/connection failures, - // attribute to appropriate spill output - @Override - public void operationComplete(ChannelFuture future) { - if(future.isSuccess()){ - filePart.transferSuccessful(); - } - filePart.releaseExternalResources(); - if (pullServerService != null) { - pullServerService.completeFileChunk(filePart, requestUri, startTime); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java deleted file mode 100644 index 31db15c..0000000 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java +++ /dev/null @@ -1,245 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver; - -import com.google.common.collect.Lists; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.pullserver.retriever.DataRetriever; -import org.apache.tajo.pullserver.retriever.FileChunk; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.*; -import org.jboss.netty.handler.codec.frame.TooLongFrameException; -import org.jboss.netty.handler.codec.http.*; -import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.handler.stream.ChunkedFile; -import org.jboss.netty.util.CharsetUtil; - -import java.io.*; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; -import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; -import static org.jboss.netty.handler.codec.http.HttpMethod.GET; -import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; -import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; - -public class HttpDataServerHandler extends SimpleChannelUpstreamHandler { - private final static Log LOG = LogFactory.getLog(HttpDataServerHandler.class); - - Map<ExecutionBlockId, DataRetriever> retrievers = - new ConcurrentHashMap<ExecutionBlockId, DataRetriever>(); - private String userName; - private String appId; - - public HttpDataServerHandler(String userName, String appId) { - this.userName= userName; - this.appId = appId; - } - - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - HttpRequest request = (HttpRequest) e.getMessage(); - if (request.getMethod() != GET) { - sendError(ctx, METHOD_NOT_ALLOWED); - return; - } - - String base = - ContainerLocalizer.USERCACHE + "/" + userName + "/" - + ContainerLocalizer.APPCACHE + "/" - + appId + "/output" + "/"; - - final Map<String, List<String>> params = - new QueryStringDecoder(request.getUri()).getParameters(); - - List<FileChunk> chunks = Lists.newArrayList(); - List<String> taskIds = splitMaps(params.get("ta")); - int sid = Integer.valueOf(params.get("sid").get(0)); - int partitionId = Integer.valueOf(params.get("p").get(0)); - for (String ta : taskIds) { - - File file = new File(base + "/" + sid + "/" + ta + "/output/" + partitionId); - FileChunk chunk = new FileChunk(file, 0, file.length()); - chunks.add(chunk); - } - - FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]); -// try { -// file = retriever.handle(ctx, request); -// } catch (FileNotFoundException fnf) { -// LOG.error(fnf); -// sendError(ctx, NOT_FOUND); -// return; -// } catch (IllegalArgumentException iae) { -// LOG.error(iae); -// sendError(ctx, BAD_REQUEST); -// return; -// } catch (FileAccessForbiddenException fafe) { -// LOG.error(fafe); -// sendError(ctx, FORBIDDEN); -// return; -// } catch (IOException ioe) { -// LOG.error(ioe); -// sendError(ctx, INTERNAL_SERVER_ERROR); -// return; -// } - - // Write the content. - Channel ch = e.getChannel(); - if (file == null) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT); - ch.write(response); - if (!isKeepAlive(request)) { - ch.close(); - } - } else { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - long totalSize = 0; - for (FileChunk chunk : file) { - totalSize += chunk.length(); - } - setContentLength(response, totalSize); - - // Write the initial line and the header. - ch.write(response); - - ChannelFuture writeFuture = null; - - for (FileChunk chunk : file) { - writeFuture = sendFile(ctx, ch, chunk); - if (writeFuture == null) { - sendError(ctx, NOT_FOUND); - return; - } - } - - // Decide whether to close the connection or not. - if (!isKeepAlive(request)) { - // Close the connection when the whole content is written out. - writeFuture.addListener(ChannelFutureListener.CLOSE); - } - } - } - - private ChannelFuture sendFile(ChannelHandlerContext ctx, - Channel ch, - FileChunk file) throws IOException { - RandomAccessFile raf; - try { - raf = new RandomAccessFile(file.getFile(), "r"); - } catch (FileNotFoundException fnfe) { - return null; - } - - ChannelFuture writeFuture; - if (ch.getPipeline().get(SslHandler.class) != null) { - // Cannot use zero-copy with HTTPS. - writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(), - file.length(), 8192)); - } else { - // No encryption - use zero-copy. - final FileRegion region = new DefaultFileRegion(raf.getChannel(), - file.startOffset(), file.length()); - writeFuture = ch.write(region); - writeFuture.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) { - region.releaseExternalResources(); - } - }); - } - - return writeFuture; - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - throws Exception { - Channel ch = e.getChannel(); - Throwable cause = e.getCause(); - if (cause instanceof TooLongFrameException) { - sendError(ctx, BAD_REQUEST); - return; - } - - cause.printStackTrace(); - if (ch.isConnected()) { - sendError(ctx, INTERNAL_SERVER_ERROR); - } - } - - public static String sanitizeUri(String uri) { - // Decode the path. - try { - uri = URLDecoder.decode(uri, "UTF-8"); - } catch (UnsupportedEncodingException e) { - try { - uri = URLDecoder.decode(uri, "ISO-8859-1"); - } catch (UnsupportedEncodingException e1) { - throw new Error(); - } - } - - // Convert file separators. - uri = uri.replace('/', File.separatorChar); - - // Simplistic dumb security check. - // You will have to do something serious in the production environment. - if (uri.contains(File.separator + ".") - || uri.contains("." + File.separator) || uri.startsWith(".") - || uri.endsWith(".")) { - return null; - } - - // Convert to absolute path. - return uri; - } - - private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); - response.setContent(ChannelBuffers.copiedBuffer( - "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8)); - - // Close the connection as soon as the error message is sent. - ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); - } - - private List<String> splitMaps(List<String> qids) { - if (null == qids) { - LOG.error("QueryUnitId is EMPTY"); - return null; - } - - final List<String> ret = new ArrayList<String>(); - for (String qid : qids) { - Collections.addAll(ret, qid.split(",")); - } - return ret; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java deleted file mode 100644 index 4c8bd8b..0000000 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver; - -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.handler.codec.http.HttpContentCompressor; -import org.jboss.netty.handler.codec.http.HttpRequestDecoder; -import org.jboss.netty.handler.codec.http.HttpResponseEncoder; -import org.jboss.netty.handler.stream.ChunkedWriteHandler; - -import static org.jboss.netty.channel.Channels.pipeline; - -public class HttpDataServerPipelineFactory implements ChannelPipelineFactory { - private String userName; - private String appId; - public HttpDataServerPipelineFactory(String userName, String appId) { - this.userName = userName; - this.appId = appId; - } - - public ChannelPipeline getPipeline() throws Exception { - // Create a default pipeline implementation. - ChannelPipeline pipeline = pipeline(); - - // Uncomment the following line if you want HTTPS - // SSLEngine engine = - // SecureChatSslContextFactory.getServerContext().createSSLEngine(); - // engine.setUseClientMode(false); - // pipeline.addLast("ssl", new SslHandler(engine)); - - pipeline.addLast("decoder", new HttpRequestDecoder()); - //pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); - pipeline.addLast("encoder", new HttpResponseEncoder()); - pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); - pipeline.addLast("deflater", new HttpContentCompressor()); - pipeline.addLast("handler", new HttpDataServerHandler(userName, appId)); - return pipeline; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java deleted file mode 100644 index 2cbb101..0000000 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver; - -import com.google.common.collect.Maps; - -import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.net.URLEncoder; -import java.util.Map; - -public class HttpUtil { - public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException { - return getParamsFromQuery(uri.getQuery()); - } - - /** - * It parses a query string into key/value pairs - * - * @param queryString decoded query string - * @return key/value pairs parsed from a given query string - * @throws java.io.UnsupportedEncodingException - */ - public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException { - String [] queries = queryString.split("&"); - - Map<String,String> params = Maps.newHashMap(); - String [] param; - for (String q : queries) { - param = q.split("="); - params.put(param[0], param[1]); - } - - return params; - } - - public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException { - StringBuilder sb = new StringBuilder(); - - boolean first = true; - for (Map.Entry<String,String> param : params.entrySet()) { - if (!first) { - sb.append("&"); - } - sb.append(URLEncoder.encode(param.getKey(), "UTF-8")). - append("="). - append(URLEncoder.encode(param.getValue(), "UTF-8")); - first = false; - } - - return sb.toString(); - } -}
