TAJO-1229: rename tajo-yarn-pullserver to tajo-pullserver. Closes #284
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b5aa7804 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b5aa7804 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b5aa7804 Branch: refs/heads/master Commit: b5aa780460fcfbf657541ee6c94d41b34b1b24b9 Parents: facd1dd Author: Hyunsik Choi <[email protected]> Authored: Mon Dec 8 17:27:16 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon Dec 8 17:27:16 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + pom.xml | 2 +- tajo-pullserver/pom.xml | 146 ++++ .../tajo/pullserver/FadvisedChunkedFile.java | 81 ++ .../tajo/pullserver/FadvisedFileRegion.java | 170 ++++ .../FileAccessForbiddenException.java | 40 + .../tajo/pullserver/FileCloseListener.java | 53 ++ .../tajo/pullserver/HttpDataServerHandler.java | 245 ++++++ .../HttpDataServerPipelineFactory.java | 56 ++ .../org/apache/tajo/pullserver/HttpUtil.java | 69 ++ .../tajo/pullserver/PullServerAuxService.java | 654 +++++++++++++++ .../apache/tajo/pullserver/PullServerUtil.java | 90 +++ .../apache/tajo/pullserver/TajoPullServer.java | 73 ++ .../tajo/pullserver/TajoPullServerService.java | 808 +++++++++++++++++++ .../retriever/AdvancedDataRetriever.java | 126 +++ .../pullserver/retriever/DataRetriever.java | 29 + .../retriever/DirectoryRetriever.java | 56 ++ .../tajo/pullserver/retriever/FileChunk.java | 81 ++ .../pullserver/retriever/RetrieverHandler.java | 33 + tajo-yarn-pullserver/pom.xml | 146 ---- .../tajo/pullserver/FadvisedChunkedFile.java | 81 -- .../tajo/pullserver/FadvisedFileRegion.java | 170 ---- .../FileAccessForbiddenException.java | 40 - .../tajo/pullserver/FileCloseListener.java | 53 -- .../tajo/pullserver/HttpDataServerHandler.java | 245 ------ .../HttpDataServerPipelineFactory.java | 56 -- .../org/apache/tajo/pullserver/HttpUtil.java | 69 -- .../tajo/pullserver/PullServerAuxService.java | 654 --------------- 
.../apache/tajo/pullserver/PullServerUtil.java | 90 --- .../apache/tajo/pullserver/TajoPullServer.java | 73 -- .../tajo/pullserver/TajoPullServerService.java | 808 ------------------- .../retriever/AdvancedDataRetriever.java | 126 --- .../pullserver/retriever/DataRetriever.java | 29 - .../retriever/DirectoryRetriever.java | 56 -- .../tajo/pullserver/retriever/FileChunk.java | 81 -- .../pullserver/retriever/RetrieverHandler.java | 33 - 36 files changed, 2814 insertions(+), 2811 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index acc72b9..c84992b 100644 --- a/CHANGES +++ b/CHANGES @@ -154,6 +154,9 @@ Release 0.9.1 - unreleased TASKS + TAJO-1229: rename tajo-yarn-pullserver to tajo-pullserver. + (hyunsik) + TAJO-1157: Required Java version in tutorial doc needs to be updated. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3dca9c0..62e03f7 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ <module>tajo-client</module> <module>tajo-jdbc</module> <module>tajo-storage</module> - <module>tajo-yarn-pullserver</module> + <module>tajo-pullserver</module> <module>tajo-dist</module> <module>tajo-thirdparty/asm</module> </modules> http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml new file mode 100644 index 0000000..a7644a1 --- /dev/null +++ b/tajo-pullserver/pom.xml @@ -0,0 +1,146 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>tajo-project</artifactId> + <groupId>org.apache.tajo</groupId> + <version>0.9.1-SNAPSHOT</version> + <relativePath>../tajo-project</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + <name>Tajo Core PullServer</name> + <artifactId>tajo-pullserver</artifactId> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-rpc</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-catalog-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-nodemanager</artifactId> + 
<scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-shuffle</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>docs</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <execution> + <!-- build javadoc jars per jar for publishing to maven --> + <id>module-javadocs</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <destDir>${project.build.directory}</destDir> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-project-info-reports-plugin</artifactId> + <version>2.4</version> + <configuration> + <dependencyLocationsEnabled>false</dependencyLocationsEnabled> + </configuration> + </plugin> + </plugins> + </reporting> + +</project> 
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java new file mode 100644 index 0000000..b0b8d18 --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.pullserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.jboss.netty.handler.stream.ChunkedFile; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.RandomAccessFile; + +public class FadvisedChunkedFile extends ChunkedFile { + + private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class); + + private final boolean manageOsCache; + private final int readaheadLength; + private final ReadaheadPool readaheadPool; + private final FileDescriptor fd; + private final String identifier; + + private ReadaheadPool.ReadaheadRequest readaheadRequest; + + public FadvisedChunkedFile(RandomAccessFile file, long position, long count, + int chunkSize, boolean manageOsCache, int readaheadLength, + ReadaheadPool readaheadPool, String identifier) throws IOException { + super(file, position, count, chunkSize); + this.manageOsCache = manageOsCache; + this.readaheadLength = readaheadLength; + this.readaheadPool = readaheadPool; + this.fd = file.getFD(); + this.identifier = identifier; + } + + @Override + public Object nextChunk() throws Exception { + if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) { + readaheadRequest = readaheadPool + .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength, + getEndOffset(), readaheadRequest); + } + return super.nextChunk(); + } + + @Override + public void close() throws Exception { + if (readaheadRequest != null) { + readaheadRequest.cancel(); + } + if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) { + try { + PullServerUtil.posixFadviseIfPossible(identifier, + fd, + getStartOffset(), getEndOffset() - getStartOffset(), + NativeIO.POSIX.POSIX_FADV_DONTNEED); + } catch (Throwable t) { + LOG.warn("Failed to manage OS cache for " + identifier, 
t); + } + } + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java new file mode 100644 index 0000000..18cf4b6 --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.pullserver; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.jboss.netty.channel.DefaultFileRegion; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; + +public class FadvisedFileRegion extends DefaultFileRegion { + + private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class); + + private final boolean manageOsCache; + private final int readaheadLength; + private final ReadaheadPool readaheadPool; + private final FileDescriptor fd; + private final String identifier; + private final long count; + private final long position; + private final int shuffleBufferSize; + private final boolean shuffleTransferToAllowed; + private final FileChannel fileChannel; + + private ReadaheadPool.ReadaheadRequest readaheadRequest; + public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024; + + public FadvisedFileRegion(RandomAccessFile file, long position, long count, + boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, + String identifier) throws IOException { + this(file, position, count, manageOsCache, readaheadLength, readaheadPool, + identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true); + } + + public FadvisedFileRegion(RandomAccessFile file, long position, long count, + boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool, + String identifier, int shuffleBufferSize, + boolean shuffleTransferToAllowed) throws IOException { + super(file.getChannel(), position, count); + this.manageOsCache = manageOsCache; + this.readaheadLength = readaheadLength; + this.readaheadPool = readaheadPool; + this.fd = file.getFD(); + this.identifier = 
identifier; + this.fileChannel = file.getChannel(); + this.count = count; + this.position = position; + this.shuffleBufferSize = shuffleBufferSize; + this.shuffleTransferToAllowed = shuffleTransferToAllowed; + } + + @Override + public long transferTo(WritableByteChannel target, long position) + throws IOException { + if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) { + readaheadRequest = readaheadPool.readaheadStream(identifier, fd, + getPosition() + position, readaheadLength, + getPosition() + getCount(), readaheadRequest); + } + + if(this.shuffleTransferToAllowed) { + return super.transferTo(target, position); + } else { + return customShuffleTransfer(target, position); + } + } + + /** + * This method transfers data using local buffer. It transfers data from + * a disk to a local buffer in memory, and then it transfers data from the + * buffer to the target. This is used only if transferTo is disallowed in + * the configuration file. super.TransferTo does not perform well on Windows + * due to a small IO request generated. customShuffleTransfer can control + * the size of the IO requests by changing the size of the intermediate + * buffer. 
+ */ + @VisibleForTesting + long customShuffleTransfer(WritableByteChannel target, long position) + throws IOException { + long actualCount = this.count - position; + if (actualCount < 0 || position < 0) { + throw new IllegalArgumentException( + "position out of range: " + position + + " (expected: 0 - " + (this.count - 1) + ')'); + } + if (actualCount == 0) { + return 0L; + } + + long trans = actualCount; + int readSize; + ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize); + + while(trans > 0L && + (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) { + //adjust counters and buffer limit + if(readSize < trans) { + trans -= readSize; + position += readSize; + byteBuffer.flip(); + } else { + //We can read more than we need if the actualCount is not multiple + //of the byteBuffer size and file is big enough. In that case we cannot + //use flip method but we need to set buffer limit manually to trans. + byteBuffer.limit((int)trans); + byteBuffer.position(0); + position += trans; + trans = 0; + } + + //write data to the target + while(byteBuffer.hasRemaining()) { + target.write(byteBuffer); + } + + byteBuffer.clear(); + } + + return actualCount - trans; + } + + + @Override + public void releaseExternalResources() { + if (readaheadRequest != null) { + readaheadRequest.cancel(); + } + super.releaseExternalResources(); + } + + /** + * Call when the transfer completes successfully so we can advise the OS that + * we don't need the region to be cached anymore. 
+ */ + public void transferSuccessful() { + if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0) { + try { + PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(), + NativeIO.POSIX.POSIX_FADV_DONTNEED); + } catch (Throwable t) { + LOG.warn("Failed to manage OS cache for " + identifier, t); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java new file mode 100644 index 0000000..c703f6f --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.pullserver; + +import java.io.IOException; + +public class FileAccessForbiddenException extends IOException { + private static final long serialVersionUID = -3383272565826389213L; + + public FileAccessForbiddenException() { + } + + public FileAccessForbiddenException(String message) { + super(message); + } + + public FileAccessForbiddenException(Throwable cause) { + super(cause); + } + + public FileAccessForbiddenException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java new file mode 100644 index 0000000..236db89 --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.pullserver; + +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; + +public class FileCloseListener implements ChannelFutureListener { + + private FadvisedFileRegion filePart; + private String requestUri; + private TajoPullServerService pullServerService; + private long startTime; + + public FileCloseListener(FadvisedFileRegion filePart, + String requestUri, + long startTime, + TajoPullServerService pullServerService) { + this.filePart = filePart; + this.requestUri = requestUri; + this.pullServerService = pullServerService; + this.startTime = startTime; + } + + // TODO error handling; distinguish IO/connection failures, + // attribute to appropriate spill output + @Override + public void operationComplete(ChannelFuture future) { + if(future.isSuccess()){ + filePart.transferSuccessful(); + } + filePart.releaseExternalResources(); + if (pullServerService != null) { + pullServerService.completeFileChunk(filePart, requestUri, startTime); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java new file mode 100644 index 0000000..31db15c --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver; + +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.pullserver.retriever.DataRetriever; +import org.apache.tajo.pullserver.retriever.FileChunk; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.*; +import org.jboss.netty.handler.codec.frame.TooLongFrameException; +import org.jboss.netty.handler.codec.http.*; +import org.jboss.netty.handler.ssl.SslHandler; +import org.jboss.netty.handler.stream.ChunkedFile; +import org.jboss.netty.util.CharsetUtil; + +import java.io.*; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; +import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; +import static org.jboss.netty.handler.codec.http.HttpMethod.GET; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +public class HttpDataServerHandler extends SimpleChannelUpstreamHandler { + private final static Log LOG = LogFactory.getLog(HttpDataServerHandler.class); 
+ + Map<ExecutionBlockId, DataRetriever> retrievers = + new ConcurrentHashMap<ExecutionBlockId, DataRetriever>(); + private String userName; + private String appId; + + public HttpDataServerHandler(String userName, String appId) { + this.userName= userName; + this.appId = appId; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + HttpRequest request = (HttpRequest) e.getMessage(); + if (request.getMethod() != GET) { + sendError(ctx, METHOD_NOT_ALLOWED); + return; + } + + String base = + ContainerLocalizer.USERCACHE + "/" + userName + "/" + + ContainerLocalizer.APPCACHE + "/" + + appId + "/output" + "/"; + + final Map<String, List<String>> params = + new QueryStringDecoder(request.getUri()).getParameters(); + + List<FileChunk> chunks = Lists.newArrayList(); + List<String> taskIds = splitMaps(params.get("ta")); + int sid = Integer.valueOf(params.get("sid").get(0)); + int partitionId = Integer.valueOf(params.get("p").get(0)); + for (String ta : taskIds) { + + File file = new File(base + "/" + sid + "/" + ta + "/output/" + partitionId); + FileChunk chunk = new FileChunk(file, 0, file.length()); + chunks.add(chunk); + } + + FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]); +// try { +// file = retriever.handle(ctx, request); +// } catch (FileNotFoundException fnf) { +// LOG.error(fnf); +// sendError(ctx, NOT_FOUND); +// return; +// } catch (IllegalArgumentException iae) { +// LOG.error(iae); +// sendError(ctx, BAD_REQUEST); +// return; +// } catch (FileAccessForbiddenException fafe) { +// LOG.error(fafe); +// sendError(ctx, FORBIDDEN); +// return; +// } catch (IOException ioe) { +// LOG.error(ioe); +// sendError(ctx, INTERNAL_SERVER_ERROR); +// return; +// } + + // Write the content. 
+ Channel ch = e.getChannel(); + if (file == null) { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT); + ch.write(response); + if (!isKeepAlive(request)) { + ch.close(); + } + } else { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + long totalSize = 0; + for (FileChunk chunk : file) { + totalSize += chunk.length(); + } + setContentLength(response, totalSize); + + // Write the initial line and the header. + ch.write(response); + + ChannelFuture writeFuture = null; + + for (FileChunk chunk : file) { + writeFuture = sendFile(ctx, ch, chunk); + if (writeFuture == null) { + sendError(ctx, NOT_FOUND); + return; + } + } + + // Decide whether to close the connection or not. + if (!isKeepAlive(request)) { + // Close the connection when the whole content is written out. + writeFuture.addListener(ChannelFutureListener.CLOSE); + } + } + } + + private ChannelFuture sendFile(ChannelHandlerContext ctx, + Channel ch, + FileChunk file) throws IOException { + RandomAccessFile raf; + try { + raf = new RandomAccessFile(file.getFile(), "r"); + } catch (FileNotFoundException fnfe) { + return null; + } + + ChannelFuture writeFuture; + if (ch.getPipeline().get(SslHandler.class) != null) { + // Cannot use zero-copy with HTTPS. + writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(), + file.length(), 8192)); + } else { + // No encryption - use zero-copy. 
+ final FileRegion region = new DefaultFileRegion(raf.getChannel(), + file.startOffset(), file.length()); + writeFuture = ch.write(region); + writeFuture.addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture future) { + region.releaseExternalResources(); + } + }); + } + + return writeFuture; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + throws Exception { + Channel ch = e.getChannel(); + Throwable cause = e.getCause(); + if (cause instanceof TooLongFrameException) { + sendError(ctx, BAD_REQUEST); + return; + } + + cause.printStackTrace(); + if (ch.isConnected()) { + sendError(ctx, INTERNAL_SERVER_ERROR); + } + } + + public static String sanitizeUri(String uri) { + // Decode the path. + try { + uri = URLDecoder.decode(uri, "UTF-8"); + } catch (UnsupportedEncodingException e) { + try { + uri = URLDecoder.decode(uri, "ISO-8859-1"); + } catch (UnsupportedEncodingException e1) { + throw new Error(); + } + } + + // Convert file separators. + uri = uri.replace('/', File.separatorChar); + + // Simplistic dumb security check. + // You will have to do something serious in the production environment. + if (uri.contains(File.separator + ".") + || uri.contains("." + File.separator) || uri.startsWith(".") + || uri.endsWith(".")) { + return null; + } + + // Convert to absolute path. + return uri; + } + + private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); + response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.setContent(ChannelBuffers.copiedBuffer( + "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8)); + + // Close the connection as soon as the error message is sent. 
+ ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); + } + + private List<String> splitMaps(List<String> qids) { + if (null == qids) { + LOG.error("QueryUnitId is EMPTY"); + return null; + } + + final List<String> ret = new ArrayList<String>(); + for (String qid : qids) { + Collections.addAll(ret, qid.split(",")); + } + return ret; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java new file mode 100644 index 0000000..4c8bd8b --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.pullserver; + +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.handler.codec.http.HttpContentCompressor; +import org.jboss.netty.handler.codec.http.HttpRequestDecoder; +import org.jboss.netty.handler.codec.http.HttpResponseEncoder; +import org.jboss.netty.handler.stream.ChunkedWriteHandler; + +import static org.jboss.netty.channel.Channels.pipeline; + +public class HttpDataServerPipelineFactory implements ChannelPipelineFactory { + private String userName; + private String appId; + public HttpDataServerPipelineFactory(String userName, String appId) { + this.userName = userName; + this.appId = appId; + } + + public ChannelPipeline getPipeline() throws Exception { + // Create a default pipeline implementation. + ChannelPipeline pipeline = pipeline(); + + // Uncomment the following line if you want HTTPS + // SSLEngine engine = + // SecureChatSslContextFactory.getServerContext().createSSLEngine(); + // engine.setUseClientMode(false); + // pipeline.addLast("ssl", new SslHandler(engine)); + + pipeline.addLast("decoder", new HttpRequestDecoder()); + //pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); + pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); + pipeline.addLast("deflater", new HttpContentCompressor()); + pipeline.addLast("handler", new HttpDataServerHandler(userName, appId)); + return pipeline; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java new file mode 100644 index 0000000..2cbb101 --- /dev/null +++ 
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;

/**
 * Small helpers for building and parsing HTTP query strings.
 */
public class HttpUtil {

  /**
   * Extracts the key/value parameters from the query part of a URI.
   *
   * @param uri the URI whose (already decoded) query string is parsed
   * @return key/value pairs parsed from the URI's query string
   * @throws java.io.UnsupportedEncodingException
   */
  public static Map<String, String> getParams(URI uri) throws UnsupportedEncodingException {
    return getParamsFromQuery(uri.getQuery());
  }

  /**
   * It parses a query string into key/value pairs.
   *
   * <p>A parameter without a value (e.g. {@code "final"}) is mapped to the
   * empty string instead of raising an {@link ArrayIndexOutOfBoundsException},
   * and only the first {@code '='} separates key from value so values that
   * themselves contain {@code '='} are preserved intact.
   *
   * @param queryString decoded query string
   * @return key/value pairs parsed from a given query string
   * @throws java.io.UnsupportedEncodingException
   */
  public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
    String[] queries = queryString.split("&");

    Map<String, String> params = new HashMap<String, String>();
    for (String q : queries) {
      // Split on the first '=' only; a value like "a=b" must survive whole.
      String[] param = q.split("=", 2);
      // Valueless keys are recorded with an empty value rather than crashing.
      params.put(param[0], param.length > 1 ? param[1] : "");
    }

    return params;
  }

  /**
   * Builds a URL-encoded query string ("k1=v1&k2=v2...") from the given map.
   * Keys and values are encoded as UTF-8 via {@link URLEncoder}.
   *
   * @param params key/value pairs to serialize
   * @return the encoded query string (empty for an empty map)
   * @throws java.io.UnsupportedEncodingException
   */
  public static String buildQuery(Map<String, String> params) throws UnsupportedEncodingException {
    StringBuilder sb = new StringBuilder();

    boolean first = true;
    for (Map.Entry<String, String> param : params.entrySet()) {
      if (!first) {
        sb.append("&");
      }
      sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
          append("=").
          append(URLEncoder.encode(param.getValue(), "UTF-8"));
      first = false;
    }

    return sb.toString();
  }
}
package org.apache.tajo.pullserver;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.tajo.QueryId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.util.TajoIdUtils;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.http.*;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
 * A YARN {@link AuxiliaryService} that serves Tajo shuffle ("pull") data over
 * HTTP using a Netty 3 server. Worker tasks request intermediate output via
 * GET requests; the service locates the files under the worker's temporal
 * directory and streams them back (zero-copy when SSL is off).
 */
public class PullServerAuxService extends AuxiliaryService {

  private static final Log LOG = LogFactory.getLog(PullServerAuxService.class);

  // Config key/default: use posix_fadvise to drop pages from the OS cache
  // after they have been sent (see FadvisedFileRegion/FadvisedChunkedFile).
  public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;

  // Config key/default: how many bytes to read ahead of the send position.
  public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;

  private int port;
  private ChannelFactory selector;
  // All accepted channels plus the server channel, for bulk close on stop().
  private final ChannelGroup accepted = new DefaultChannelGroup();
  private HttpPipelineFactory pipelineFact;
  private int sslFileBufferSize;

  private ApplicationId appId;
  private QueryId queryId;
  private FileSystem localFS;

  /**
   * Should the shuffle use posix_fadvise calls to manage the OS cache during
   * sendfile
   */
  private boolean manageOsCache;
  private int readaheadLength;
  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();


  public static final String PULLSERVER_SERVICEID = "tajo.pullserver";

  // appId string -> user name of the application that registered it.
  // NOTE(review): static state shared across service instances — appears
  // intentional for a per-NodeManager singleton; confirm before reusing.
  private static final Map<String,String> userRsrc =
    new ConcurrentHashMap<String,String>();
  private static String userName;

  // NOTE: "SUFFLE" typo is preserved — it is a public constant and the
  // matching configuration key string, so renaming would break users.
  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
    "tajo.pullserver.ssl.file.buffer.size";

  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;

  /** Hadoop metrics2 counters for shuffle traffic; also used as a per-write
   *  completion listener to tally successes/failures. */
  @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
  static class ShuffleMetrics implements ChannelFutureListener {
    @Metric({"OutputBytes","PullServer output in bytes"})
    MutableCounterLong shuffleOutputBytes;
    @Metric({"Failed","# of failed shuffle outputs"})
    MutableCounterInt shuffleOutputsFailed;
    @Metric({"Succeeded","# of succeeded shuffle outputs"})
    MutableCounterInt shuffleOutputsOK;
    @Metric({"Connections","# of current shuffle connections"})
    MutableGaugeInt shuffleConnections;

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      if (future.isSuccess()) {
        shuffleOutputsOK.incr();
      } else {
        shuffleOutputsFailed.incr();
      }
      shuffleConnections.decr();
    }
  }

  final ShuffleMetrics metrics;

  PullServerAuxService(MetricsSystem ms) {
    super("httpshuffle");
    metrics = ms.register(new ShuffleMetrics());
  }

  @SuppressWarnings("UnusedDeclaration")
  public PullServerAuxService() {
    this(DefaultMetricsSystem.instance());
  }

  /**
   * Serialize the shuffle port into a ByteBuffer for use later on.
   * @param port the port to be sent to the ApplciationMaster
   * @return the serialized form of the port.
   */
  public static ByteBuffer serializeMetaData(int port) throws IOException {
    //TODO these bytes should be versioned
    DataOutputBuffer port_dob = new DataOutputBuffer();
    port_dob.writeInt(port);
    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
  }

  /**
   * A helper function to deserialize the metadata returned by PullServerAuxService.
   * @param meta the metadata returned by the PullServerAuxService
   * @return the port the PullServer Handler is listening on to serve shuffle data.
   */
  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
    //TODO this should be returning a class not just an int
    DataInputByteBuffer in = new DataInputByteBuffer();
    in.reset(meta);
    return in.readInt();
  }

  /** Records the application/user so requests can be mapped to its query. */
  @Override
  public void initializeApplication(ApplicationInitializationContext appInitContext) {
    // TODO these bytes should be versioned
    // TODO: Once SHuffle is out of NM, this can use MR APIs
    this.appId = appInitContext.getApplicationId();
    this.queryId = TajoIdUtils.parseQueryId(appId.toString());
    this.userName = appInitContext.getUser();
    userRsrc.put(this.appId.toString(), this.userName);
  }

  @Override
  public void stopApplication(ApplicationTerminationContext appStopContext) {
    userRsrc.remove(appStopContext.getApplicationId().toString());
  }

  /**
   * Reads shuffle tuning knobs and builds the Netty channel factory.
   * NOTE(review): any Throwable here is logged and swallowed, leaving the
   * service half-initialized; this mirrors the original behavior.
   */
  @Override
  public synchronized void init(Configuration conf) {
    try {
      manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
          DEFAULT_SHUFFLE_MANAGE_OS_CACHE);

      readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
          DEFAULT_SHUFFLE_READAHEAD_BYTES);

      ThreadFactory bossFactory = new ThreadFactoryBuilder()
          .setNameFormat("PullServerAuxService Netty Boss #%d")
          .build();
      ThreadFactory workerFactory = new ThreadFactoryBuilder()
          .setNameFormat("PullServerAuxService Netty Worker #%d")
          .build();

      selector = new NioServerSocketChannelFactory(
          Executors.newCachedThreadPool(bossFactory),
          Executors.newCachedThreadPool(workerFactory));

      localFS = new LocalFileSystem();
      // Copy the conf so later mutations (e.g. the bound port) stay local.
      super.init(new Configuration(conf));
    } catch (Throwable t) {
      LOG.error(t);
    }
  }

  // TODO change AbstractService to throw InterruptedException
  /**
   * Binds the HTTP server. The configured port may be 0 (ephemeral); the
   * actual bound port is written back into the config and pushed into the
   * handler so it can report it.
   */
  @Override
  public synchronized void start() {
    Configuration conf = getConfig();
    ServerBootstrap bootstrap = new ServerBootstrap(selector);
    try {
      pipelineFact = new HttpPipelineFactory(conf);
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
    bootstrap.setPipelineFactory(pipelineFact);
    port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
        ConfVars.PULLSERVER_PORT.defaultIntVal);
    Channel ch = bootstrap.bind(new InetSocketAddress(port));
    accepted.add(ch);
    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
    conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
    pipelineFact.PullServer.setPort(port);
    LOG.info(getName() + " listening on port " + port);
    super.start();

    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
        DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
  }

  public int getPort() {
    return port;
  }

  /** Closes all channels, releases Netty resources, and closes the local FS. */
  @Override
  public synchronized void stop() {
    try {
      accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
      // releaseExternalResources() delegates to the shared channel factory.
      ServerBootstrap bootstrap = new ServerBootstrap(selector);
      bootstrap.releaseExternalResources();
      pipelineFact.destroy();

      localFS.close();
    } catch (Throwable t) {
      LOG.error(t);
    } finally {
      super.stop();
    }
  }

  /** @return the serialized bound port, handed to the AM via YARN. */
  @Override
  public synchronized ByteBuffer getMetaData() {
    try {
      return serializeMetaData(port);
    } catch (IOException e) {
      LOG.error("Error during getMeta", e);
      // TODO add API to AuxiliaryServices to report failures
      return null;
    }
  }

  /**
   * Pipeline factory for the shuffle server: optional SSL, HTTP codec with
   * aggregation, chunked writes, and the {@link PullServer} handler at the tail.
   */
  class HttpPipelineFactory implements ChannelPipelineFactory {

    final PullServer PullServer;
    private SSLFactory sslFactory;

    public HttpPipelineFactory(Configuration conf) throws Exception {
      PullServer = new PullServer(conf);
      if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
          ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
        sslFactory.init();
      }
    }

    public void destroy() {
      if (sslFactory != null) {
        sslFactory.destroy();
      }
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {
      ChannelPipeline pipeline = Channels.pipeline();
      if (sslFactory != null) {
        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
      }
      pipeline.addLast("decoder", new HttpRequestDecoder());
      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
      pipeline.addLast("encoder", new HttpResponseEncoder());
      pipeline.addLast("chunking", new ChunkedWriteHandler());
      pipeline.addLast("shuffle", PullServer);
      return pipeline;
      // TODO factor security manager into pipeline
      // TODO factor out encode/decode to permit binary shuffle
      // TODO factor out decode of index to permit alt. models
    }
  }

  /**
   * The request handler. Understands GET requests with parameters:
   * type ("r" range / "h" hash / "s" scattered hash), sid (subquery id),
   * ta (comma-separated task ids), p (partition id), and for range shuffles
   * start/end (Base64 row keys) and final.
   */
  class PullServer extends SimpleChannelUpstreamHandler {
    private final Configuration conf;
    private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
    private int port;

    public PullServer(Configuration conf) {
      this.conf = conf;
      this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, ConfVars.PULLSERVER_PORT.defaultIntVal);
    }

    public void setPort(int port) {
      this.port = port;
    }

    /** Expands comma-separated entries into a flat list; null stays null. */
    private List<String> splitMaps(List<String> mapq) {
      if (null == mapq) {
        return null;
      }
      final List<String> ret = new ArrayList<String>();
      for (String s : mapq) {
        Collections.addAll(ret, s.split(","));
      }
      return ret;
    }

    /**
     * Validates the request, resolves the requested chunks on local disk,
     * and streams them back. Missing/invalid parameters yield 400; an empty
     * result yields 204; a missing file yields 404.
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
        throws Exception {

      HttpRequest request = (HttpRequest) e.getMessage();
      if (request.getMethod() != GET) {
        sendError(ctx, METHOD_NOT_ALLOWED);
        return;
      }

      // Parsing the URL into key-values
      final Map<String, List<String>> params =
          new QueryStringDecoder(request.getUri()).getParameters();
      final List<String> types = params.get("type");
      final List<String> taskIdList = params.get("ta");
      final List<String> subQueryIds = params.get("sid");
      final List<String> partitionIds = params.get("p");

      if (types == null || taskIdList == null || subQueryIds == null
          || partitionIds == null) {
        sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
            BAD_REQUEST);
        return;
      }

      if (types.size() != 1 || subQueryIds.size() != 1) {
        sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
            BAD_REQUEST);
        return;
      }

      final List<FileChunk> chunks = Lists.newArrayList();

      String repartitionType = types.get(0);
      String sid = subQueryIds.get(0);
      String partitionId = partitionIds.get(0);
      List<String> taskIds = splitMaps(taskIdList);

      // the working dir of tajo worker for each query
      String queryBaseDir = queryId + "/output" + "/";

      LOG.info("PullServer request param: repartitionType=" + repartitionType +
          ", sid=" + sid + ", partitionId=" + partitionId + ", taskIds=" + taskIdList);

      String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname);
      if (taskLocalDir == null ||
          taskLocalDir.equals("")) {
        // Logged only; the request proceeds and will fail on path resolution.
        LOG.error("Tajo local directory should be specified.");
      }
      LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir);

      // if a subquery requires a range partitioning
      if (repartitionType.equals("r")) {
        String ta = taskIds.get(0);
        Path path = localFS.makeQualified(
            lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/"
                + ta + "/output/", conf));

        String startKey = params.get("start").get(0);
        String endKey = params.get("end").get(0);
        boolean last = params.get("final") != null;

        FileChunk chunk;
        try {
          chunk = getFileCunks(path, startKey, endKey, last);
        } catch (Throwable t) {
          LOG.error("ERROR Request: " + request.getUri(), t);
          sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
          return;
        }
        if (chunk != null) {
          chunks.add(chunk);
        }

        // if a subquery requires a hash repartition or a scattered hash repartition
      } else if (repartitionType.equals("h") || repartitionType.equals("s")) {
        for (String ta : taskIds) {
          Path path = localFS.makeQualified(
              lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
                  ta + "/output/" + partitionId, conf));
          File file = new File(path.toUri());
          FileChunk chunk = new FileChunk(file, 0, file.length());
          chunks.add(chunk);
        }
      } else {
        // Unknown type: logged but no HTTP error is sent back (original behavior).
        LOG.error("Unknown repartition type: " + repartitionType);
        return;
      }

      // Write the content.
      Channel ch = e.getChannel();
      if (chunks.size() == 0) {
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
        ch.write(response);
        if (!isKeepAlive(request)) {
          ch.close();
        }
      } else {
        FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
        long totalSize = 0;
        for (FileChunk chunk : file) {
          totalSize += chunk.length();
        }
        setContentLength(response, totalSize);

        // Write the initial line and the header.
        ch.write(response);

        ChannelFuture writeFuture = null;

        for (FileChunk chunk : file) {
          writeFuture = sendFile(ctx, ch, chunk);
          if (writeFuture == null) {
            sendError(ctx, NOT_FOUND);
            return;
          }
        }

        // Decide whether to close the connection or not.
        if (!isKeepAlive(request)) {
          // Close the connection when the whole content is written out.
          writeFuture.addListener(ChannelFutureListener.CLOSE);
        }
      }
    }

    /**
     * Streams one chunk: zero-copy FileRegion on plain HTTP, chunked reads
     * when an SslHandler is in the pipeline (sendfile cannot be encrypted).
     * Returns null if the backing file is missing.
     */
    private ChannelFuture sendFile(ChannelHandlerContext ctx,
                                   Channel ch,
                                   FileChunk file) throws IOException {
      RandomAccessFile spill;
      try {
        spill = new RandomAccessFile(file.getFile(), "r");
      } catch (FileNotFoundException e) {
        LOG.info(file.getFile() + " not found");
        return null;
      }
      ChannelFuture writeFuture;
      if (ch.getPipeline().get(SslHandler.class) == null) {
        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
            file.startOffset(), file.length(), manageOsCache, readaheadLength,
            readaheadPool, file.getFile().getAbsolutePath());
        writeFuture = ch.write(partition);
        writeFuture.addListener(new FileCloseListener(partition, null, 0, null));
      } else {
        // HTTPS cannot be done with zero copy.
        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
            file.startOffset(), file.length(), sslFileBufferSize,
            manageOsCache, readaheadLength, readaheadPool,
            file.getFile().getAbsolutePath());
        writeFuture = ch.write(chunk);
      }
      metrics.shuffleConnections.incr();
      metrics.shuffleOutputBytes.incr(file.length()); // optimistic
      return writeFuture;
    }

    private void sendError(ChannelHandlerContext ctx,
        HttpResponseStatus status) {
      sendError(ctx, "", status);
    }

    /** Sends a plain-text error response and closes the connection. */
    private void sendError(ChannelHandlerContext ctx, String message,
        HttpResponseStatus status) {
      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
      response.setContent(
          ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));

      // Close the connection as soon as the error message is sent.
      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
      Channel ch = e.getChannel();
      Throwable cause = e.getCause();
      if (cause instanceof TooLongFrameException) {
        sendError(ctx, BAD_REQUEST);
        return;
      }

      LOG.error("PullServer error: ", cause);
      if (ch.isConnected()) {
        LOG.error("PullServer error " + e);
        sendError(ctx, INTERNAL_SERVER_ERROR);
      }
    }
  }

  /**
   * Resolves the byte range of a range-shuffle output file covering
   * [startKey, endKey] using the task's BST index.
   *
   * NOTE: the method name typo ("Cunks") is kept — it is public API.
   *
   * @param outDir   task output directory containing "index" and "output"
   * @param startKey Base64-encoded start row key
   * @param endKey   Base64-encoded end row key
   * @param last     true if this is the final (open-ended) range request
   * @return the matching chunk of the data file, or null if the index is
   *         empty or the requested range is out of scope
   * @throws IOException if the index lookup fails
   */
  public FileChunk getFileCunks(Path outDir,
                                String startKey,
                                String endKey,
                                boolean last) throws IOException {
    BSTIndex index = new BSTIndex(new TajoConf());
    BSTIndex.BSTIndexReader idxReader =
        index.getIndexReader(new Path(outDir, "index"));
    idxReader.open();
    Schema keySchema = idxReader.getKeySchema();
    TupleComparator comparator = idxReader.getComparator();

    LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
        + idxReader.getLastKey());

    File data = new File(URI.create(outDir.toUri() + "/output"));
    byte [] startBytes = Base64.decodeBase64(startKey);
    byte [] endBytes = Base64.decodeBase64(endKey);

    RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
    Tuple start;
    Tuple end;
    try {
      start = decoder.toTuple(startBytes);
    } catch (Throwable t) {
      throw new IllegalArgumentException("StartKey: " + startKey +
          ", decoded byte size: " + startBytes.length, t);
    }

    try {
      end = decoder.toTuple(endBytes);
    } catch (Throwable t) {
      throw new IllegalArgumentException("EndKey: " + endKey +
          ", decoded byte size: " + endBytes.length, t);
    }

    // For descending sort order the caller's [start, end] must be swapped
    // so that "start" is always the smaller key under the comparator.
    if(!comparator.isAscendingFirstKey()) {
      Tuple tmpKey = start;
      start = end;
      end = tmpKey;
    }

    LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
        (last ? ", last=true" : "") + ")");

    if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
      LOG.info("There is no contents");
      return null;
    }

    if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
        comparator.compare(idxReader.getLastKey(), start) < 0) {
      LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
        "], but request start:" + start + ", end: " + end);
      return null;
    }

    long startOffset;
    long endOffset;
    try {
      startOffset = idxReader.find(start);
    } catch (IOException ioe) {
      LOG.error("State Dump (the requested range: "
          + "[" + start + ", " + end +")" + ", idx min: "
          + idxReader.getFirstKey() + ", idx max: "
          + idxReader.getLastKey());
      throw ioe;
    }
    try {
      endOffset = idxReader.find(end);
      if (endOffset == -1) {
        // Exact end key not indexed: fall back to the next greater entry.
        endOffset = idxReader.find(end, true);
      }
    } catch (IOException ioe) {
      LOG.error("State Dump (the requested range: "
          + "[" + start + ", " + end +")" + ", idx min: "
          + idxReader.getFirstKey() + ", idx max: "
          + idxReader.getLastKey());
      throw ioe;
    }

    // if startOffset == -1 then case 2-1 or case 3
    if (startOffset == -1) { // this is a hack
      // if case 2-1 or case 3
      try {
        startOffset = idxReader.find(start, true);
      } catch (IOException ioe) {
        LOG.error("State Dump (the requested range: "
            + "[" + start + ", " + end +")" + ", idx min: "
            + idxReader.getFirstKey() + ", idx max: "
            + idxReader.getLastKey());
        throw ioe;
      }
    }

    if (startOffset == -1) {
      throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
          "State Dump (the requested range: " +
          "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: " +
          idxReader.getLastKey());
    }

    // if greater than indexed values
    if (last || (endOffset == -1
        && comparator.compare(idxReader.getLastKey(), end) < 0)) {
      endOffset = data.length();
    }

    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
    LOG.info("Retrieve File Chunk: " + chunk);
    return chunk;
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver; + +import org.apache.commons.lang.reflect.MethodUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.nativeio.NativeIO; + +import java.io.FileDescriptor; +import java.lang.reflect.Method; + +public class PullServerUtil { + private static final Log LOG = LogFactory.getLog(PullServerUtil.class); + + private static boolean nativeIOPossible = false; + private static Method posixFadviseIfPossible; + + static { + if (NativeIO.isAvailable() && loadNativeIO()) { + nativeIOPossible = true; + } else { + LOG.warn("Unable to load hadoop nativeIO"); + } + } + + public static boolean isNativeIOPossible() { + return nativeIOPossible; + } + + /** + * Call posix_fadvise on the given file descriptor. See the manpage + * for this syscall for more information. On systems where this + * call is not available, does nothing. 
+ */ + public static void posixFadviseIfPossible(String identifier, java.io.FileDescriptor fd, + long offset, long len, int flags) { + if (nativeIOPossible) { + try { + posixFadviseIfPossible.invoke(null, identifier, fd, offset, len, flags); + } catch (Throwable t) { + nativeIOPossible = false; + LOG.warn("Failed to manage OS cache for " + identifier, t); + } + } + } + + /* load hadoop native method if possible */ + private static boolean loadNativeIO() { + boolean loaded = true; + if (nativeIOPossible) return loaded; + + Class[] parameters = {String.class, FileDescriptor.class, Long.TYPE, Long.TYPE, Integer.TYPE}; + try { + Method getCacheManipulator = MethodUtils.getAccessibleMethod(NativeIO.POSIX.class, "getCacheManipulator", new Class[0]); + Class posixClass; + if (getCacheManipulator != null) { + Object posix = MethodUtils.invokeStaticMethod(NativeIO.POSIX.class, "getCacheManipulator", null); + posixClass = posix.getClass(); + } else { + posixClass = NativeIO.POSIX.class; + } + posixFadviseIfPossible = MethodUtils.getAccessibleMethod(posixClass, "posixFadviseIfPossible", parameters); + } catch (Throwable e) { + loaded = false; + LOG.warn("Failed to access posixFadviseIfPossible :" + e.getMessage()); + } + + if (posixFadviseIfPossible == null) { + loaded = false; + } + return loaded; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java new file mode 100644 index 0000000..d030eed --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.CompositeService; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.pullserver.PullServerAuxService.PullServer; +import org.apache.tajo.util.StringUtils; + +public class TajoPullServer extends CompositeService { + private static final Log LOG = LogFactory.getLog(TajoPullServer.class); + + private TajoPullServerService pullService; + private TajoConf systemConf; + + public TajoPullServer() { + super(TajoPullServer.class.getName()); + } + + @Override + public void init(Configuration conf) { + this.systemConf = (TajoConf)conf; + pullService = new TajoPullServerService(); + addService(pullService); + + super.init(conf); + } + + public void startPullServer(TajoConf systemConf) { + init(systemConf); + start(); + } + + public void start() { + super.start(); + + } + + public static void main(String[] args) throws Exception { + StringUtils.startupShutdownMessage(PullServer.class, args, LOG); + + if (!TajoPullServerService.isStandalone()) { + LOG.fatal("TAJO_PULLSERVER_STANDALONE env variable 
is not 'true'"); + return; + } + + TajoConf tajoConf = new TajoConf(); + tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME)); + + (new TajoPullServer()).startPullServer(tajoConf); + } +}
