Repository: tajo Updated Branches: refs/heads/index_support ad78d2f62 -> 35453bbda
TAJO-1295: Remove legacy worker.dataserver package and its unit tests. Closes #345 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/bc478ba8 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/bc478ba8 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/bc478ba8 Branch: refs/heads/index_support Commit: bc478ba834e9bba768155faa53f918e495a74671 Parents: c45d0ef Author: Hyunsik Choi <[email protected]> Authored: Sun Jan 11 02:04:29 2015 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Sun Jan 11 02:04:29 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../apache/tajo/worker/InterDataRetriever.java | 113 ------ .../tajo/worker/PartitionRetrieverHandler.java | 44 --- .../tajo/worker/RangeRetrieverHandler.java | 163 -------- .../FileAccessForbiddenException.java | 40 -- .../tajo/worker/dataserver/HttpDataServer.java | 87 ----- .../dataserver/HttpDataServerHandler.java | 199 ---------- .../HttpDataServerPipelineFactory.java | 55 --- .../apache/tajo/worker/dataserver/HttpUtil.java | 69 ---- .../retriever/AdvancedDataRetriever.java | 128 ------- .../dataserver/retriever/DataRetriever.java | 29 -- .../retriever/DirectoryRetriever.java | 56 --- .../worker/dataserver/retriever/FileChunk.java | 51 --- .../dataserver/retriever/RetrieverHandler.java | 33 -- .../planner/physical/TestPhysicalPlanner.java | 103 +---- .../tajo/worker/TestRangeRetrieverHandler.java | 381 ------------------- .../worker/dataserver/TestHttpDataServer.java | 172 --------- .../tajo/worker/dataserver/TestHttpUtil.java | 49 --- 18 files changed, 4 insertions(+), 1771 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 5f427a9..5ff3143 100644 --- a/CHANGES +++ b/CHANGES @@ -284,6 +284,9 @@ Release 0.9.1 - unreleased TASKS + TAJO-1295: Remove legacy worker.dataserver package and its unit tests. + (hyunsik) + TAJO-1296: Remove obsolete classes from tajo.master.container package. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java deleted file mode 100644 index 5b2ad0f..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - */ -package org.apache.tajo.worker; - -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.apache.tajo.TaskId; -import org.apache.tajo.worker.dataserver.FileAccessForbiddenException; -import org.apache.tajo.worker.dataserver.retriever.DataRetriever; -import org.apache.tajo.worker.dataserver.retriever.FileChunk; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Map; -import java.util.Set; - -@Deprecated -public class InterDataRetriever implements DataRetriever { - private final Log LOG = LogFactory.getLog(InterDataRetriever.class); - private final Set<TaskId> registered = Sets.newHashSet(); - private final Map<String, String> map = Maps.newConcurrentMap(); - - public InterDataRetriever() { - } - - public void register(TaskId id, String baseURI) { - synchronized (registered) { - if (!registered.contains(id)) { - map.put(id.toString(), baseURI); - registered.add(id); - } - } - } - - public void unregister(TaskId id) { - synchronized (registered) { - if (registered.contains(id)) { - map.remove(id.toString()); - registered.remove(id); - } - } - } - - /* (non-Javadoc) - * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest) - */ - @Override - public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request) - throws IOException { - - int start = request.getUri().indexOf('?'); - if (start < 0) { - throw new IllegalArgumentException("Wrong request: " + request.getUri()); - } - - String queryStr = request.getUri().substring(start + 1); - LOG.info("QUERY: " + queryStr); - String [] queries = queryStr.split("&"); - - String qid = null; - String fn = null; - String [] kv; - for (String query : queries) { - kv = query.split("="); - if (kv[0].equals("qid")) { - qid = kv[1]; - } else if (kv[0].equals("fn")) { - fn = kv[1]; - } - } - - String baseDir = map.get(qid); - if (baseDir == null) { - throw new FileNotFoundException("No such qid: " + qid); - } - - File file = new File(baseDir + "/" + fn); - if (file.isHidden() || !file.exists()) { - throw new FileNotFoundException("No such file: " + baseDir + "/" - + file.getName()); - } - if (!file.isFile()) { - throw new FileAccessForbiddenException("No such file: " - + baseDir + "/" + file.getName()); - } - - return new FileChunk[] {new FileChunk(file, 0, file.length())}; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java deleted file mode 100644 index 36e7353..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker; - -import org.apache.tajo.worker.dataserver.retriever.FileChunk; -import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -public class PartitionRetrieverHandler implements RetrieverHandler { - private final String baseDir; - - public PartitionRetrieverHandler(String baseDir) { - this.baseDir = baseDir; - } - - @Override - public FileChunk get(Map<String, List<String>> kvs) throws IOException { - // nothing to verify the file because AdvancedDataRetriever checks - // its validity of the file. - File file = new File(baseDir + "/" + kvs.get("fn").get(0)); - - return new FileChunk(file, 0, file.length()); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java deleted file mode 100644 index 2b58196..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.storage.BaseTupleComparator; -import org.apache.tajo.storage.RowStoreUtil; -import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.index.bst.BSTIndex; -import org.apache.tajo.worker.dataserver.retriever.FileChunk; -import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * - * It retrieves the file chunk ranged between start and end keys. - * The start key is inclusive, but the end key is exclusive. - * - * Internally, there are four cases: - * <ul> - * <li>out of scope: the index range does not overlapped with the query range.</li> - * <li>overlapped: the index range is partially overlapped with the query range. </li> - * <li>included: the index range is included in the start and end keys</li> - * <li>covered: the index range covers the query range (i.e., start and end keys).</li> - * </ul> - */ -public class RangeRetrieverHandler implements RetrieverHandler { - private static final Log LOG = LogFactory.getLog(RangeRetrieverHandler.class); - private final File file; - private final BSTIndex.BSTIndexReader idxReader; - private final Schema schema; - private final BaseTupleComparator comp; - private final RowStoreDecoder decoder; - - public RangeRetrieverHandler(File outDir, Schema schema, BaseTupleComparator comp) throws IOException { - this.file = outDir; - BSTIndex index = new BSTIndex(new TajoConf()); - this.schema = schema; - this.comp = comp; - FileSystem fs = FileSystem.getLocal(new Configuration()); - Path indexPath = fs.makeQualified(new Path(outDir.getCanonicalPath(), "index")); - this.idxReader = - index.getIndexReader(indexPath, this.schema, this.comp); - this.idxReader.open(); - LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " - + idxReader.getLastKey()); - this.decoder = RowStoreUtil.createDecoder(schema); - } - - @Override - public FileChunk get(Map<String, List<String>> kvs) throws IOException { - // nothing to verify the file because AdvancedDataRetriever checks - // its validity of the file. - File data = new File(this.file, "data/data"); - byte [] startBytes = Base64.decodeBase64(kvs.get("start").get(0)); - Tuple start = decoder.toTuple(startBytes); - byte [] endBytes; - Tuple end; - endBytes = Base64.decodeBase64(kvs.get("end").get(0)); - end = decoder.toTuple(endBytes); - boolean last = kvs.containsKey("final"); - - if(!comp.isAscendingFirstKey()) { - Tuple tmpKey = start; - start = end; - end = tmpKey; - } - - LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end + - (last ? ", last=true" : "") + ")"); - - if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero - LOG.info("There is no contents"); - return null; - } - - if (comp.compare(end, idxReader.getFirstKey()) < 0 || - comp.compare(idxReader.getLastKey(), start) < 0) { - LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + - "], but request start:" + start + ", end: " + end); - return null; - } - - long startOffset; - long endOffset; - try { - startOffset = idxReader.find(start); - } catch (IOException ioe) { - LOG.error("State Dump (the requested range: " - + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: " - + idxReader.getLastKey()); - throw ioe; - } - try { - endOffset = idxReader.find(end); - if (endOffset == -1) { - endOffset = idxReader.find(end, true); - } - } catch (IOException ioe) { - LOG.error("State Dump (the requested range: " - + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: " - + idxReader.getLastKey()); - throw ioe; - } - - // if startOffset == -1 then case 2-1 or case 3 - if (startOffset == -1) { // this is a hack - // if case 2-1 or case 3 - try { - startOffset = idxReader.find(start, true); - } catch (IOException ioe) { - LOG.error("State Dump (the requested range: " - + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: " - + idxReader.getLastKey()); - throw ioe; - } - } - - if (startOffset == -1) { - throw new IllegalStateException("startOffset " + startOffset + " is negative \n" + - "State Dump (the requested range: " - + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: " - + idxReader.getLastKey()); - } - - // if greater than indexed values - if (last || (endOffset == -1 && comp.compare(idxReader.getLastKey(), end) < 0)) { - endOffset = data.length(); - } - - FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset); - LOG.info("Retrieve File Chunk: " + chunk); - return chunk; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java deleted file mode 100644 index 6c93e4f..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.dataserver; - -import java.io.IOException; - -public class FileAccessForbiddenException extends IOException { - private static final long serialVersionUID = -3383272565826389213L; - - public FileAccessForbiddenException() { - } - - public FileAccessForbiddenException(String message) { - super(message); - } - - public FileAccessForbiddenException(Throwable cause) { - super(cause); - } - - public FileAccessForbiddenException(String message, Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java deleted file mode 100644 index 523d6e1..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.dataserver; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.net.NetUtils; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.ChannelGroupFuture; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; -import org.apache.tajo.worker.dataserver.retriever.DataRetriever; - -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; - -public class HttpDataServer { - private final static Log LOG = LogFactory.getLog(HttpDataServer.class); - - private final InetSocketAddress addr; - private InetSocketAddress bindAddr; - private ServerBootstrap bootstrap = null; - private ChannelFactory factory = null; - private ChannelGroup channelGroup = null; - - public HttpDataServer(final InetSocketAddress addr, - final DataRetriever retriever) { - this.addr = addr; - this.factory = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), - Runtime.getRuntime().availableProcessors() * 2); - - // Configure the server. - this.bootstrap = new ServerBootstrap(factory); - // Set up the event pipeline factory. - this.bootstrap.setPipelineFactory( - new HttpDataServerPipelineFactory(retriever)); - this.channelGroup = new DefaultChannelGroup(); - } - - public HttpDataServer(String bindaddr, DataRetriever retriever) { - this(NetUtils.createSocketAddr(bindaddr), retriever); - } - - public void start() { - // Bind and start to accept incoming connections. - Channel channel = bootstrap.bind(addr); - channelGroup.add(channel); - this.bindAddr = (InetSocketAddress) channel.getLocalAddress(); - LOG.info("HttpDataServer starts up (" - + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort() - + ")"); - } - - public InetSocketAddress getBindAddress() { - return this.bindAddr; - } - - public void stop() { - ChannelGroupFuture future = channelGroup.close(); - future.awaitUninterruptibly(); - factory.releaseExternalResources(); - - LOG.info("HttpDataServer shutdown (" - + this.bindAddr.getAddress().getHostAddress() + ":" - + this.bindAddr.getPort() + ")"); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java deleted file mode 100644 index 6b9eea8..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.dataserver; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.*; -import org.jboss.netty.handler.codec.frame.TooLongFrameException; -import org.jboss.netty.handler.codec.http.DefaultHttpResponse; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.handler.stream.ChunkedFile; -import org.jboss.netty.util.CharsetUtil; -import org.apache.tajo.worker.dataserver.retriever.DataRetriever; -import org.apache.tajo.worker.dataserver.retriever.FileChunk; - -import java.io.*; -import java.net.URLDecoder; - -import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; -import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; -import static org.jboss.netty.handler.codec.http.HttpMethod.GET; -import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; -import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; - -public class HttpDataServerHandler extends SimpleChannelUpstreamHandler { - private final static Log LOG = LogFactory.getLog(HttpDataServer.class); - private final DataRetriever retriever; - - public HttpDataServerHandler(DataRetriever retriever) { - this.retriever = retriever; - } - - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - HttpRequest request = (HttpRequest) e.getMessage(); - if (request.getMethod() != GET) { - sendError(ctx, METHOD_NOT_ALLOWED); - return; - } - - FileChunk [] file; - try { - file = retriever.handle(ctx, request); - } catch (FileNotFoundException fnf) { - LOG.error(fnf); - sendError(ctx, NOT_FOUND); - return; - } catch (IllegalArgumentException iae) { - LOG.error(iae); - sendError(ctx, BAD_REQUEST); - return; - } catch (FileAccessForbiddenException fafe) { - LOG.error(fafe); - sendError(ctx, FORBIDDEN); - return; - } catch (IOException ioe) { - LOG.error(ioe); - sendError(ctx, INTERNAL_SERVER_ERROR); - return; - } - - // Write the content. - Channel ch = e.getChannel(); - if (file == null) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT); - ch.write(response); - if (!isKeepAlive(request)) { - ch.close(); - } - } else { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - long totalSize = 0; - for (FileChunk chunk : file) { - totalSize += chunk.length(); - } - setContentLength(response, totalSize); - - // Write the initial line and the header. - ch.write(response); - - ChannelFuture writeFuture = null; - - for (FileChunk chunk : file) { - writeFuture = sendFile(ctx, ch, chunk); - if (writeFuture == null) { - sendError(ctx, NOT_FOUND); - return; - } - } - - // Decide whether to close the connection or not. - if (!isKeepAlive(request)) { - // Close the connection when the whole content is written out. - writeFuture.addListener(ChannelFutureListener.CLOSE); - } - } - } - - private ChannelFuture sendFile(ChannelHandlerContext ctx, Channel ch, FileChunk file) throws IOException { - RandomAccessFile raf; - try { - raf = new RandomAccessFile(file.getFile(), "r"); - } catch (FileNotFoundException fnfe) { - return null; - } - - ChannelFuture writeFuture; - if (ch.getPipeline().get(SslHandler.class) != null) { - // Cannot use zero-copy with HTTPS. - writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(), file.length(), 8192)); - } else { - // No encryption - use zero-copy. - final FileRegion region = new DefaultFileRegion(raf.getChannel(), file.startOffset(), file.length()); - writeFuture = ch.write(region); - writeFuture.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) { - region.releaseExternalResources(); - } - }); - } - - return writeFuture; - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - throws Exception { - Channel ch = e.getChannel(); - Throwable cause = e.getCause(); - if (cause instanceof TooLongFrameException) { - sendError(ctx, BAD_REQUEST); - return; - } - - cause.printStackTrace(); - if (ch.isConnected()) { - sendError(ctx, INTERNAL_SERVER_ERROR); - } - } - - public static String sanitizeUri(String uri) { - // Decode the path. - try { - uri = URLDecoder.decode(uri, "UTF-8"); - } catch (UnsupportedEncodingException e) { - try { - uri = URLDecoder.decode(uri, "ISO-8859-1"); - } catch (UnsupportedEncodingException e1) { - throw new Error(); - } - } - - // Convert file separators. - uri = uri.replace('/', File.separatorChar); - - // Simplistic dumb security check. - // You will have to do something serious in the production environment. - if (uri.contains(File.separator + ".") - || uri.contains("." + File.separator) || uri.startsWith(".") - || uri.endsWith(".")) { - return null; - } - - // Convert to absolute path. - return uri; - } - - private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); - response.setContent(ChannelBuffers.copiedBuffer( - "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8)); - - // Close the connection as soon as the error message is sent. - ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java deleted file mode 100644 index 0a47f6b..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.dataserver; - -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.handler.codec.http.HttpRequestDecoder; -import org.jboss.netty.handler.codec.http.HttpResponseEncoder; -import org.jboss.netty.handler.stream.ChunkedWriteHandler; -import org.apache.tajo.worker.dataserver.retriever.DataRetriever; - -import static org.jboss.netty.channel.Channels.pipeline; - -public class HttpDataServerPipelineFactory implements ChannelPipelineFactory { - private final DataRetriever ret; - - public HttpDataServerPipelineFactory(DataRetriever ret) { - this.ret = ret; - } - - public ChannelPipeline getPipeline() throws Exception { - // Create a default pipeline implementation. - ChannelPipeline pipeline = pipeline(); - - // Uncomment the following line if you want HTTPS - // SSLEngine engine = - // SecureChatSslContextFactory.getServerContext().createSSLEngine(); - // engine.setUseClientMode(false); - // pipeline.addLast("ssl", new SslHandler(engine)); - - pipeline.addLast("decoder", new HttpRequestDecoder()); - //pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); - pipeline.addLast("encoder", new HttpResponseEncoder()); - pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); - //pipeline.addLast("deflater", new HttpContentCompressor()); - pipeline.addLast("handler", new HttpDataServerHandler(ret)); - return pipeline; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java deleted file mode 100644 index e688c39..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.dataserver; - -import com.google.common.collect.Maps; - -import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.net.URLEncoder; -import java.util.Map; - -public class HttpUtil { - public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException { - return getParamsFromQuery(uri.getQuery()); - } - - /** - * It parses a query string into key/value pairs - * - * @param queryString decoded query string - * @return key/value pairs parsed from a given query string - * @throws UnsupportedEncodingException - */ - public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException { - String [] queries = queryString.split("&"); - - Map<String,String> params = Maps.newHashMap(); - String [] param; - for (String q : queries) { - param = q.split("="); - params.put(param[0], param[1]); - } - - return params; - } - - public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException { - StringBuilder sb = new StringBuilder(); - - boolean first = true; - for (Map.Entry<String,String> param : params.entrySet()) { - if (!first) { - sb.append("&"); - } - sb.append(URLEncoder.encode(param.getKey(), "UTF-8")). - append("="). - append(URLEncoder.encode(param.getValue(), "UTF-8")); - first = false; - } - - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java deleted file mode 100644 index 9c15d0c..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.dataserver.retriever; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.TaskId; -import org.apache.tajo.util.TajoIdUtils; -import org.apache.tajo.worker.dataserver.FileAccessForbiddenException; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.handler.codec.http.QueryStringDecoder; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -public class AdvancedDataRetriever implements DataRetriever { - private final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class); - private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap(); - - public AdvancedDataRetriever() { - } - - public void register(TaskAttemptId id, RetrieverHandler handler) { - synchronized (handlerMap) { - if (!handlerMap.containsKey(id.toString())) { - handlerMap.put(id.toString(), handler); - } - } - } - - public void unregister(TaskAttemptId id) { - synchronized (handlerMap) { - if (handlerMap.containsKey(id.toString())) { - handlerMap.remove(id.toString()); - } - } - } - - /* (non-Javadoc) - * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest) - */ - @Override - public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request) - throws IOException { - - final Map<String, List<String>> params = - new QueryStringDecoder(request.getUri()).getParameters(); - - if (!params.containsKey("qid")) { - throw new FileNotFoundException("No such qid: " + params.containsKey("qid")); - } - - if (params.containsKey("sid")) { - List<FileChunk> chunks = Lists.newArrayList(); - List<String> qids = splitMaps(params.get("qid")); - for (String qid : qids) { - String[] ids = qid.split("_"); - ExecutionBlockId suid = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0)); - TaskId quid = new TaskId(suid, Integer.parseInt(ids[0])); - TaskAttemptId attemptId = new TaskAttemptId(quid, - Integer.parseInt(ids[1])); - RetrieverHandler handler = handlerMap.get(attemptId.toString()); - FileChunk chunk = handler.get(params); - chunks.add(chunk); - } - return chunks.toArray(new FileChunk[chunks.size()]); - } else { - RetrieverHandler handler = handlerMap.get(params.get("qid").get(0)); - FileChunk chunk = handler.get(params); - if (chunk == null) { - if (params.containsKey("qid")) { // if there is no content corresponding to the query - return null; - } else { // if there is no - throw new FileNotFoundException("No such a file corresponding to " + params.get("qid")); - } - } - - File file = chunk.getFile(); - if (file.isHidden() || !file.exists()) { - throw new FileNotFoundException("No such file: " + file.getAbsolutePath()); - } - if (!file.isFile()) { - throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not file"); - } - - return new FileChunk[] {chunk}; - } - } - - private List<String> splitMaps(List<String> qids) { - if (null == qids) { - LOG.error("QueryId is EMPTY"); - return null; - } - - final List<String> ret = new ArrayList<String>(); - for (String qid : qids) { - Collections.addAll(ret, qid.split(",")); - } - return ret; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java deleted file mode 100644 index b26ba74..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.dataserver.retriever; - -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.http.HttpRequest; - -import java.io.IOException; - -public interface DataRetriever { - FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request) - throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java deleted file mode 100644 index 62dabbd..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.dataserver.retriever; - -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.apache.tajo.worker.dataserver.FileAccessForbiddenException; -import org.apache.tajo.worker.dataserver.HttpDataServerHandler; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; - -public class DirectoryRetriever implements DataRetriever { - public String baseDir; - - public DirectoryRetriever(String baseDir) { - this.baseDir = baseDir; - } - - @Override - public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request) - throws IOException { - final String path = HttpDataServerHandler.sanitizeUri(request.getUri()); - if (path == null) { - throw new IllegalArgumentException("Wrong path: " +path); - } - - File file = new File(baseDir, path); - if (file.isHidden() || !file.exists()) { - throw new FileNotFoundException("No such file: " + baseDir + "/" + path); - } - if (!file.isFile()) { - throw new FileAccessForbiddenException("No such file: " - + baseDir + "/" + path); - } - - return new FileChunk[] {new FileChunk(file, 0, file.length())}; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java deleted file mode 100644 index 4f11168..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.dataserver.retriever; - -import java.io.File; -import java.io.FileNotFoundException; - -public class FileChunk { - private final File file; - private final long startOffset; - private final long length; - - public FileChunk(File file, long startOffset, long length) throws FileNotFoundException { - this.file = file; - this.startOffset = startOffset; - this.length = length; - } - - public File getFile() { - return this.file; - } - - public long startOffset() { - return this.startOffset; - } - - public long length() { - return this.length; - } - - public String toString() { - return " (start=" + startOffset() + ", length=" + length + ") " - + file.getAbsolutePath(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java deleted file mode 100644 index e5479cc..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.dataserver.retriever; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -public interface RetrieverHandler { - /** - * - * @param kvs url-decoded key/value pairs - * @return a desired part of a file - * @throws IOException - */ - public FileChunk get(Map<String, List<String>> kvs) throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index d3ab1fd..6c606b1 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -19,9 +19,7 @@ package org.apache.tajo.engine.planner.physical; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -44,7 +42,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.session.Session; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; @@ -53,27 +50,22 @@ import org.apache.tajo.plan.expr.AggregationFunctionCallEval; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.serder.PlanProto.ShuffleType; import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.session.Session; import org.apache.tajo.storage.*; -import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.storage.index.bst.BSTIndex; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; -import org.apache.tajo.worker.RangeRetrieverHandler; import org.apache.tajo.worker.TaskAttemptContext; -import org.apache.tajo.worker.dataserver.retriever.FileChunk; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Set; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; @@ -1044,99 +1036,6 @@ public class TestPhysicalPlanner { }; @Test - public final void testIndexedStoreExec() throws IOException, PlanningException { - FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(), - new Path(employee.getPath()), Integer.MAX_VALUE); - - Path workDir = CommonTestingUtil.getTestDir("target/test-data/testIndexedStoreExec"); - TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), - LocalTajoTestingUtility.newTaskAttemptId(masterPlan), - new FileFragment[] {frags[0]}, workDir); - ctx.setEnforcer(new Enforcer()); - Expr context = analyzer.parse(SORT_QUERY[0]); - LogicalPlan plan = planner.createPlan(defaultContext, context); - LogicalNode rootNode = optimizer.optimize(plan); - - SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT); - DataChannel channel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(), - ShuffleType.RANGE_SHUFFLE); - channel.setShuffleKeys(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray()); - ctx.setDataChannel(channel); - - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); - PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - - Tuple tuple; - exec.init(); - exec.next(); - exec.close(); - - Schema keySchema = new Schema(); - keySchema.addColumn("?empId", Type.INT4); - SortSpec[] sortSpec = new SortSpec[1]; - sortSpec[0] = new SortSpec(keySchema.getColumn(0), true, false); - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpec); - BSTIndex bst = new BSTIndex(conf); - BSTIndex.BSTIndexReader reader = bst.getIndexReader(new Path(workDir, "output/index"), - keySchema, comp); - reader.open(); - Path outputPath = StorageUtil.concatPath(workDir, "output", "output"); - TableMeta meta = CatalogUtil.newTableMeta(channel.getStoreType(), new KeyValueSet()); - SeekableScanner scanner = - FileStorageManager.getSeekableScanner(conf, meta, exec.getSchema(), outputPath); - scanner.init(); - - int cnt = 0; - - while(scanner.next() != null) { - cnt++; - } - scanner.reset(); - - assertEquals(100 ,cnt); - - Tuple keytuple = new VTuple(1); - for(int i = 1 ; i < 100 ; i ++) { - keytuple.put(0, DatumFactory.createInt4(i)); - long offsets = reader.find(keytuple); - scanner.seek(offsets); - tuple = scanner.next(); - - assertTrue("[seek check " + (i) + " ]", ("name_" + i).equals(tuple.get(0).asChars())); - assertTrue("[seek check " + (i) + " ]" , i == tuple.get(1).asInt4()); - } - - - // The below is for testing RangeRetrieverHandler. - RowStoreEncoder encoder = RowStoreUtil.createEncoder(keySchema); - RangeRetrieverHandler handler = new RangeRetrieverHandler( - new File(new Path(workDir, "output").toUri()), keySchema, comp); - Map<String,List<String>> kvs = Maps.newHashMap(); - Tuple startTuple = new VTuple(1); - startTuple.put(0, DatumFactory.createInt4(50)); - kvs.put("start", Lists.newArrayList( - new String(Base64.encodeBase64( - encoder.toBytes(startTuple), false)))); - Tuple endTuple = new VTuple(1); - endTuple.put(0, DatumFactory.createInt4(80)); - kvs.put("end", Lists.newArrayList( - new String(Base64.encodeBase64( - encoder.toBytes(endTuple), false)))); - FileChunk chunk = handler.get(kvs); - - scanner.seek(chunk.startOffset()); - keytuple = scanner.next(); - assertEquals(50, keytuple.get(1).asInt4()); - - long endOffset = chunk.startOffset() + chunk.length(); - while((keytuple = scanner.next()) != null && scanner.getNextOffset() <= endOffset) { - assertTrue(keytuple.get(1).asInt4() <= 80); - } - - scanner.close(); - } - - @Test public final void testSortEnforcer() throws IOException, PlanningException { FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java deleted file mode 100644 index 200ba31..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java +++ /dev/null @@ -1,381 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.LocalTajoTestingUtility; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; -import org.apache.tajo.algebra.Expr; -import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.engine.parser.SQLAnalyzer; -import org.apache.tajo.engine.planner.*; -import org.apache.tajo.engine.planner.enforce.Enforcer; -import org.apache.tajo.plan.LogicalOptimizer; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.LogicalPlanner; -import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.engine.planner.physical.ExternalSortExec; -import org.apache.tajo.engine.planner.physical.PhysicalExec; -import org.apache.tajo.engine.planner.physical.ProjectionExec; -import org.apache.tajo.engine.planner.physical.RangeShuffleFileWriteExec; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.index.bst.BSTIndex; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.worker.dataserver.retriever.FileChunk; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestRangeRetrieverHandler { - private TajoTestingCluster util; - private TajoConf conf; - private CatalogService catalog; - private SQLAnalyzer analyzer; - private LogicalPlanner planner; - private LogicalOptimizer optimizer; - private StorageManager sm; - private Schema schema; - private static int TEST_TUPLE = 10000; - private FileSystem fs; - private Path testDir; - - @Before - public void setUp() throws Exception { - util = new TajoTestingCluster(); - conf = util.getConfiguration(); - testDir = CommonTestingUtil.getTestDir(); - fs = testDir.getFileSystem(conf); - util.startCatalogCluster(); - catalog = util.getMiniCatalogCluster().getCatalog(); - catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); - catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); - sm = StorageManager.getFileStorageManager(conf, testDir); - - analyzer = new SQLAnalyzer(); - planner = new LogicalPlanner(catalog); - optimizer = new LogicalOptimizer(conf); - - schema = new Schema(); - schema.addColumn("empid", Type.INT4); - schema.addColumn("age", Type.INT4); - } - - @After - public void tearDown() { - util.shutdownCatalogCluster(); - } - - public String [] SORT_QUERY = { - "select empId, age from employee order by empId, age", - "select empId, age from employee order by empId desc, age desc" - }; - - @Test - public void testGet() throws Exception { - Tuple firstTuple = null; - Tuple lastTuple; - - TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV); - - Path tableDir = StorageUtil.concatPath(testDir, "testGet", "table.csv"); - fs.mkdirs(tableDir.getParent()); - Appender appender = ((FileStorageManager)sm).getAppender(employeeMeta, schema, tableDir); - appender.init(); - - Tuple tuple = new VTuple(schema.size()); - for (int i = 0; i < TEST_TUPLE; i++) { - tuple.put( - new Datum[] { - DatumFactory.createInt4(i), - DatumFactory.createInt4(i + 5) - }); - appender.addTuple(tuple); - - if (firstTuple == null) { - firstTuple = new VTuple(tuple); - } - } - lastTuple = new VTuple(tuple); - appender.flush(); - appender.close(); - - TableDesc employee = new TableDesc( - CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, employeeMeta, - tableDir.toUri()); - catalog.createTable(employee); - - FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employeeMeta, tableDir, Integer.MAX_VALUE); - - TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), - LocalTajoTestingUtility.newTaskAttemptId(), - new FileFragment[] {frags[0]}, testDir); - ctx.setEnforcer(new Enforcer()); - Expr expr = analyzer.parse(SORT_QUERY[0]); - LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); - LogicalNode rootNode = optimizer.optimize(plan); - - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); - PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - - ExternalSortExec sort = null; - if (exec instanceof ProjectionExec) { - ProjectionExec projExec = (ProjectionExec) exec; - sort = projExec.getChild(); - } else if (exec instanceof ExternalSortExec) { - sort = (ExternalSortExec) exec; - } else { - assertTrue(false); - } - - SortSpec[] sortSpecs = sort.getPlan().getSortKeys(); - RangeShuffleFileWriteExec idxStoreExec = new RangeShuffleFileWriteExec(ctx, sort, sort.getSchema(), - sort.getSchema(), sortSpecs); - - exec = idxStoreExec; - exec.init(); - exec.next(); - exec.close(); - - Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs); - BSTIndex bst = new BSTIndex(conf); - BSTIndex.BSTIndexReader reader = bst.getIndexReader( - new Path(testDir, "output/index"), keySchema, comp); - reader.open(); - - TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet()); - SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, - StorageUtil.concatPath(testDir, "output", "output")); - - scanner.init(); - int cnt = 0; - while(scanner.next() != null) { - cnt++; - } - scanner.reset(); - - assertEquals(TEST_TUPLE ,cnt); - - Tuple keytuple = new VTuple(2); - for(int i = 1 ; i < TEST_TUPLE ; i ++) { - keytuple.put(0, DatumFactory.createInt4(i)); - keytuple.put(1, DatumFactory.createInt4(i + 5)); - long offsets = reader.find(keytuple); - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i) + " ]" , i == tuple.get(0).asInt4()); - //assertTrue("[seek check " + (i) + " ]" , ("name_" + i).equals(tuple.get(1).asChars())); - } - - TupleRange totalRange = new TupleRange(sortSpecs, firstTuple, lastTuple); - UniformRangePartition partitioner = new UniformRangePartition(totalRange, sortSpecs, true); - TupleRange [] partitions = partitioner.partition(7); - - // The below is for testing RangeRetrieverHandler. - RangeRetrieverHandler handler = new RangeRetrieverHandler( - new File((new Path(testDir, "output")).toUri()), keySchema, comp); - - List<Long []> offsets = new ArrayList<Long []>(); - - for (int i = 0; i < partitions.length; i++) { - FileChunk chunk = getFileChunk(handler, keySchema, partitions[i], i == (partitions.length - 1)); - offsets.add(new Long[] {chunk.startOffset(), chunk.length()}); - } - scanner.close(); - - Long[] previous = null; - for (Long [] offset : offsets) { - if (offset[0] == 0 && previous == null) { - previous = offset; - continue; - } - assertTrue(previous[0] + previous[1] == offset[0]); - previous = offset; - } - long fileLength = new File((new Path(testDir, "index").toUri())).length(); - assertTrue(previous[0] + previous[1] == fileLength); - } - - @Test - public void testGetFromDescendingOrder() throws Exception { - Tuple firstTuple = null; - Tuple lastTuple; - - TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); - Path tablePath = StorageUtil.concatPath(testDir, "testGetFromDescendingOrder", "table.csv"); - fs.mkdirs(tablePath.getParent()); - Appender appender = ((FileStorageManager)sm).getAppender(meta, schema, tablePath); - appender.init(); - Tuple tuple = new VTuple(schema.size()); - for (int i = (TEST_TUPLE - 1); i >= 0 ; i--) { - tuple.put( - new Datum[] { - DatumFactory.createInt4(i), - DatumFactory.createInt4(i + 5) - }); - appender.addTuple(tuple); - - if (firstTuple == null) { - firstTuple = new VTuple(tuple); - } - } - lastTuple = new VTuple(tuple); - appender.flush(); - appender.close(); - - TableDesc employee = new TableDesc( - CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta, tablePath.toUri()); - catalog.createTable(employee); - - FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE); - - TaskAttemptContext - ctx = new TaskAttemptContext(new QueryContext(conf), - LocalTajoTestingUtility.newTaskAttemptId(), - new FileFragment[] {frags[0]}, testDir); - ctx.setEnforcer(new Enforcer()); - Expr expr = analyzer.parse(SORT_QUERY[1]); - LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); - LogicalNode rootNode = optimizer.optimize(plan); - - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); - PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); - - ExternalSortExec sort = null; - if (exec instanceof ProjectionExec) { - ProjectionExec projExec = (ProjectionExec) exec; - sort = projExec.getChild(); - } else if (exec instanceof ExternalSortExec) { - sort = (ExternalSortExec) exec; - } else { - assertTrue(false); - } - - SortSpec[] sortSpecs = sort.getPlan().getSortKeys(); - RangeShuffleFileWriteExec idxStoreExec = new RangeShuffleFileWriteExec(ctx, sort, - sort.getSchema(), sort.getSchema(), sortSpecs); - - exec = idxStoreExec; - exec.init(); - exec.next(); - exec.close(); - - Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); - BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs); - BSTIndex bst = new BSTIndex(conf); - BSTIndex.BSTIndexReader reader = bst.getIndexReader( - new Path(testDir, "output/index"), keySchema, comp); - reader.open(); - TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet()); - SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, outputMeta, schema, - StorageUtil.concatPath(testDir, "output", "output")); - scanner.init(); - int cnt = 0; - while(scanner.next() != null) { - cnt++; - } - scanner.reset(); - - assertEquals(TEST_TUPLE ,cnt); - - Tuple keytuple = new VTuple(2); - for(int i = (TEST_TUPLE - 1) ; i >= 0; i --) { - keytuple.put(0, DatumFactory.createInt4(i)); - keytuple.put(1, DatumFactory.createInt4(i + 5)); - long offsets = reader.find(keytuple); - scanner.seek(offsets); - tuple = scanner.next(); - assertTrue("[seek check " + (i) + " ]" , i == tuple.get(0).asInt4()); - } - - TupleRange totalRange = new TupleRange(sortSpecs, firstTuple, lastTuple); - UniformRangePartition partitioner = new UniformRangePartition(totalRange, sortSpecs, true); - TupleRange [] partitions = partitioner.partition(25); - - File dataFile = new File((new Path(testDir, "output")).toUri()); - - // The below is for testing RangeRetrieverHandler. - RangeRetrieverHandler handler = new RangeRetrieverHandler( - dataFile, keySchema, comp); - - List<Long []> offsets = new ArrayList<Long []>(); - - for (int i = 0; i < partitions.length; i++) { - FileChunk chunk = getFileChunk(handler, keySchema, partitions[i], i == 0); - offsets.add(new Long[] {chunk.startOffset(), chunk.length()}); - } - scanner.close(); - - long fileLength = new File(dataFile, "data/data").length(); - Long[] previous = null; - for (Long [] offset : offsets) { - if (previous == null) { - assertTrue(offset[0] + offset[1] == fileLength); - previous = offset; - continue; - } - - assertTrue(offset[0] + offset[1] == previous[0]); - previous = offset; - } - } - - private FileChunk getFileChunk(RangeRetrieverHandler handler, Schema keySchema, - TupleRange range, boolean last) throws IOException { - Map<String,List<String>> kvs = Maps.newHashMap(); - RowStoreEncoder encoder = RowStoreUtil.createEncoder(keySchema); - kvs.put("start", Lists.newArrayList( - new String(Base64.encodeBase64( - encoder.toBytes(range.getStart()), - false)))); - kvs.put("end", Lists.newArrayList( - new String(Base64.encodeBase64( - encoder.toBytes(range.getEnd()), false)))); - - if (last) { - kvs.put("final", Lists.newArrayList("true")); - } - return handler.get(kvs); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java b/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java deleted file mode 100644 index 25a2fbc..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.dataserver; - -import org.apache.hadoop.net.NetUtils; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.LocalTajoTestingUtility; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.TaskId; -import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.worker.InterDataRetriever; -import org.apache.tajo.worker.dataserver.retriever.DataRetriever; -import org.apache.tajo.worker.dataserver.retriever.DirectoryRetriever; -import org.junit.Before; -import org.junit.Test; - -import java.io.*; -import java.net.InetSocketAddress; -import java.net.URL; -import java.util.Random; - -import static org.junit.Assert.assertTrue; - -public class TestHttpDataServer { - private String TEST_DATA = "target/test-data/TestHttpDataServer"; - - @Before - public void setUp() throws Exception { - CommonTestingUtil.getTestDir(TEST_DATA); - } - - @Test - public final void testHttpDataServer() throws Exception { - Random rnd = new Random(); - FileWriter writer = new FileWriter(TEST_DATA+"/"+"testHttp"); - String watermark = "test_"+rnd.nextInt(); - writer.write(watermark+"\n"); - writer.flush(); - writer.close(); - - DataRetriever ret = new DirectoryRetriever(TEST_DATA); - HttpDataServer server = new HttpDataServer( - NetUtils.createSocketAddr("127.0.0.1:0"), ret); - server.start(); - - InetSocketAddress addr = server.getBindAddress(); - URL url = new URL("http://127.0.0.1:"+addr.getPort() - + "/testHttp"); - BufferedReader in = new BufferedReader(new InputStreamReader( - url.openStream())); - String line; - boolean found = false; - while ((line = in.readLine()) != null) { - if (line.equals(watermark)) - found = true; - } - assertTrue(found); - in.close(); - server.stop(); - } - - @Test - public final void testInterDataRetriver() throws Exception { - MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null); - ExecutionBlockId schid = plan.newExecutionBlockId(); - TaskId qid1 = QueryIdFactory.newTaskId(schid); - TaskId qid2 = QueryIdFactory.newTaskId(schid); - - File qid1Dir = new File(TEST_DATA + "/" + qid1.toString() + "/out"); - qid1Dir.mkdirs(); - File qid2Dir = new File(TEST_DATA + "/" + qid2.toString() + "/out"); - qid2Dir.mkdirs(); - - Random rnd = new Random(); - FileWriter writer = new FileWriter(qid1Dir+"/"+"testHttp"); - String watermark1 = "test_"+rnd.nextInt(); - writer.write(watermark1); - writer.flush(); - writer.close(); - - writer = new FileWriter(qid2Dir+"/"+"testHttp"); - String watermark2 = "test_"+rnd.nextInt(); - writer.write(watermark2); - writer.flush(); - writer.close(); - - InterDataRetriever ret = new InterDataRetriever(); - HttpDataServer server = new HttpDataServer( - NetUtils.createSocketAddr("127.0.0.1:0"), ret); - server.start(); - - ret.register(qid1, qid1Dir.getPath()); - ret.register(qid2, qid2Dir.getPath()); - - InetSocketAddress addr = server.getBindAddress(); - - assertDataRetrival(qid1, addr.getPort(), watermark1); - assertDataRetrival(qid2, addr.getPort(), watermark2); - - server.stop(); - } - - @Test(expected = FileNotFoundException.class) - public final void testNoSuchFile() throws Exception { - MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null); - ExecutionBlockId schid = plan.newExecutionBlockId(); - TaskId qid1 = QueryIdFactory.newTaskId(schid); - TaskId qid2 = QueryIdFactory.newTaskId(schid); - - File qid1Dir = new File(TEST_DATA + "/" + qid1.toString() + "/out"); - qid1Dir.mkdirs(); - File qid2Dir = new File(TEST_DATA + "/" + qid2.toString() + "/out"); - qid2Dir.mkdirs(); - - Random rnd = new Random(); - FileWriter writer = new FileWriter(qid1Dir+"/"+"testHttp"); - String watermark1 = "test_"+rnd.nextInt(); - writer.write(watermark1); - writer.flush(); - writer.close(); - - writer = new FileWriter(qid2Dir+"/"+"testHttp"); - String watermark2 = "test_"+rnd.nextInt(); - writer.write(watermark2); - writer.flush(); - writer.close(); - - InterDataRetriever ret = new InterDataRetriever(); - HttpDataServer server = new HttpDataServer( - NetUtils.createSocketAddr("127.0.0.1:0"), ret); - server.start(); - - ret.register(qid1, qid1Dir.getPath()); - InetSocketAddress addr = server.getBindAddress(); - assertDataRetrival(qid1, addr.getPort(), watermark1); - ret.unregister(qid1); - assertDataRetrival(qid1, addr.getPort(), watermark1); - } - - private static void assertDataRetrival(TaskId id, int port, - String watermark) throws IOException { - URL url = new URL("http://127.0.0.1:"+port - + "/?qid=" + id.toString() + "&fn=testHttp"); - BufferedReader in = new BufferedReader(new InputStreamReader( - url.openStream())); - String line; - boolean found = false; - while ((line = in.readLine()) != null) { - if (line.equals(watermark)) - found = true; - } - assertTrue(found); - in.close(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpUtil.java b/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpUtil.java deleted file mode 100644 index bb2eb82..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpUtil.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.dataserver; - -import com.google.common.collect.Maps; -import org.junit.Test; - -import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -public class TestHttpUtil { - private URI uri = URI.create("http://127.0.0.1:80/?key1=val1&key2=val2"); - - @Test - public void testGetParams() throws UnsupportedEncodingException { - Map<String,String> params = HttpUtil.getParamsFromQuery(uri.getQuery()); - assertEquals(2, params.size()); - assertEquals("val1", params.get("key1")); - assertEquals("val2", params.get("key2")); - } - - @Test - public void testBuildQuery() throws UnsupportedEncodingException { - Map<String,String> params = Maps.newTreeMap(); - params.put("key1", "val1"); - params.put("key2", "val2"); - String query = HttpUtil.buildQuery(params); - assertEquals(uri.getQuery(), query); - } -}
