Repository: tajo Updated Branches: refs/heads/branch-0.11.1 8ddefef85 -> e3443c6df
http://git-wip-us.apache.org/repos/asf/tajo/blob/e3443c6d/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 61c30dd..bad2510 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -25,17 +25,19 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.TaskRequest; @@ -53,6 +55,7 @@ import org.apache.tajo.plan.serder.PlanProto.ShuffleType; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.pullserver.retriever.FileChunk; +import org.apache.tajo.querymaster.Repartitioner; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; @@ -92,11 +95,13 @@ public class TaskImpl implements Task { private long endTime; private List<FileChunk> localChunks; + private List<FileChunk> remoteChunks; // TODO - to be refactored private ShuffleType shuffleType = null; private Schema finalSchema = null; private TupleComparator sortComp = null; + private final int maxUrlLength; public TaskImpl(final TaskRequest request, final ExecutionBlockContext executionBlockContext) throws IOException { @@ -122,6 +127,7 @@ public class TaskImpl implements Task { this.context.setDataChannel(request.getDataChannel()); this.context.setEnforcer(request.getEnforcer()); this.context.setState(TaskAttemptState.TA_PENDING); + this.maxUrlLength = systemConf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH); } public void initPlan() throws IOException { @@ -148,14 +154,15 @@ public class TaskImpl implements Task { } this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>()); + this.remoteChunks = Collections.synchronizedList(new ArrayList<FileChunk>()); LOG.info(String.format("* Task %s is initialized. InterQuery: %b, Shuffle: %s, Fragments: %d, Fetches:%d, " + "Local dir: %s", request.getId(), interQuery, shuffleType, request.getFragments().size(), request.getFetches().size(), taskDir)); if(LOG.isDebugEnabled()) { - for (FetchImpl f : request.getFetches()) { - LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs()); + for (FetchProto f : request.getFetches()) { + LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + Repartitioner.createSimpleURIs(maxUrlLength, f)); } } @@ -250,6 +257,21 @@ public class TaskImpl implements Task { @Override public void fetch(ExecutorService fetcherExecutor) { + // Sort the execution order of fetch runners to increase the cache hit in pull server + fetcherRunners.sort(new Comparator<Fetcher>() { + @Override + public int compare(Fetcher f1, Fetcher f2) { + String strUri = f1.getURI().toString(); + int index = strUri.lastIndexOf("&ta"); + String taskIdStr1 = strUri.substring(index + "&ta".length()); + + strUri = f2.getURI().toString(); + index = strUri.lastIndexOf("&ta"); + String taskIdStr2 = strUri.substring(index + "&ta".length()); + return taskIdStr1.compareTo(taskIdStr2); + } + }); + for (Fetcher f : fetcherRunners) { fetcherExecutor.submit(new FetchRunner(context, f)); } @@ -375,8 +397,7 @@ public class TaskImpl implements Task { if (broadcastTableNames.contains(inputTable)) { continue; } - File tableDir = new File(context.getFetchIn(), inputTable); - FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta()); + FileFragment[] frags = localizeFetchedData(inputTable); context.updateAssignedFragments(inputTable, frags); } } @@ -540,24 +561,22 @@ public class TaskImpl implements Task { return false; } - private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta) + private FileFragment[] localizeFetchedData(String name) throws IOException { Configuration c = new Configuration(systemConf); c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); FileSystem fs = FileSystem.get(c); - Path tablePath = new Path(file.getAbsolutePath()); - List<FileFragment> listTablets = new ArrayList<FileFragment>(); + List<FileFragment> listTablets = new ArrayList<>(); FileFragment tablet; - FileStatus[] fileLists = fs.listStatus(tablePath); - for (FileStatus f : fileLists) { - if (f.getLen() == 0) { - continue; + for (FileChunk chunk : remoteChunks) { + if (name.equals(chunk.getEbId())) { + tablet = new FileFragment(name, fs.makeQualified(new Path(chunk.getFile().getPath())), chunk.startOffset(), + chunk.length()); + listTablets.add(tablet); } - tablet = new FileFragment(name, fs.makeQualified(f.getPath()), 0l, f.getLen()); - listTablets.add(tablet); } // Special treatment for locally pseudo fetched chunks @@ -604,11 +623,16 @@ public class TaskImpl implements Task { LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); } try { - FileChunk fetched = fetcher.get(); - if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null - && fetched.getFile() != null) { - if (fetched.fromRemote() == false) { - localChunks.add(fetched); + List<FileChunk> fetched = fetcher.get(); + if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) { + for (FileChunk eachFetch : fetched) { + if (eachFetch.getFile() != null) { + if (!eachFetch.fromRemote()) { + localChunks.add(eachFetch); + } else { + remoteChunks.add(eachFetch); + } + } } break; } @@ -658,7 +682,7 @@ public class TaskImpl implements Task { } private List<Fetcher> getFetchRunners(TaskAttemptContext ctx, - List<FetchImpl> fetches) throws IOException { + List<FetchProto> fetches) throws IOException { if (fetches.size() > 0) { Path inputDir = executionBlockContext.getLocalDirAllocator(). @@ -668,50 +692,59 @@ public class TaskImpl implements Task { int localStoreChunkCount = 0; File storeDir; File defaultStoreFile; - FileChunk storeChunk = null; + List<FileChunk> storeChunkList = new ArrayList<>(); List<Fetcher> runnerList = Lists.newArrayList(); - for (FetchImpl f : fetches) { + for (FetchProto f : fetches) { storeDir = new File(inputDir.toString(), f.getName()); if (!storeDir.exists()) { if (!storeDir.mkdirs()) throw new IOException("Failed to create " + storeDir); } - for (URI uri : f.getURIs()) { + for (URI uri : Repartitioner.createFullURIs(maxUrlLength, f)) { + storeChunkList.clear(); defaultStoreFile = new File(storeDir, "in_" + i); InetAddress address = InetAddress.getByName(uri.getHost()); WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo(); if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) { - storeChunk = getLocalStoredFileChunk(uri, systemConf); + List<FileChunk> localChunkCandidates = getLocalStoredFileChunk(uri, systemConf); - // When a range request is out of range, storeChunk will be NULL. This case is normal state. - // So, we should skip and don't need to create storeChunk. - if (storeChunk == null || storeChunk.length() == 0) { - continue; - } + for (FileChunk localChunk : localChunkCandidates) { + // When a range request is out of range, storeChunk will be NULL. This case is normal state. + // So, we should skip and don't need to create storeChunk. + if (localChunk == null || localChunk.length() == 0) { + continue; + } - if (storeChunk.getFile() != null && storeChunk.startOffset() > -1) { - storeChunk.setFromRemote(false); - localStoreChunkCount++; - } else { - storeChunk = new FileChunk(defaultStoreFile, 0, -1); - storeChunk.setFromRemote(true); + if (localChunk.getFile() != null && localChunk.startOffset() > -1) { + localChunk.setFromRemote(false); + localStoreChunkCount++; + } else { + localChunk = new FileChunk(defaultStoreFile, 0, -1); + localChunk.setFromRemote(true); + } + localChunk.setEbId(f.getName()); + storeChunkList.add(localChunk); } + } else { - storeChunk = new FileChunk(defaultStoreFile, 0, -1); - storeChunk.setFromRemote(true); + FileChunk remoteChunk = new FileChunk(defaultStoreFile, 0, -1); + remoteChunk.setFromRemote(true); + remoteChunk.setEbId(f.getName()); + storeChunkList.add(remoteChunk); } // If we decide that intermediate data should be really fetched from a remote host, storeChunk // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it - storeChunk.setEbId(f.getName()); - Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); - runnerList.add(fetcher); - i++; - if (LOG.isDebugEnabled()) { - LOG.debug("Create a new Fetcher with storeChunk:" + storeChunk.toString()); + for (FileChunk eachChunk : storeChunkList) { + Fetcher fetcher = new Fetcher(systemConf, uri, eachChunk); + runnerList.add(fetcher); + i++; + if (LOG.isDebugEnabled()) { + LOG.debug("Create a new Fetcher with storeChunk:" + eachChunk.toString()); + } } } } @@ -724,7 +757,7 @@ public class TaskImpl implements Task { } } - private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { + private List<FileChunk> getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { // Parse the URI // Parsing the URL into key-values @@ -749,28 +782,37 @@ public class TaskImpl implements Task { // The working directory of Tajo worker for each query, including stage Path queryBaseDir = TajoPullServerService.getBaseOutputDir(queryId, sid); - List<String> taskIds = TajoPullServerService.splitMaps(taskIdList); - FileChunk chunk; + List<FileChunk> chunkList = new ArrayList<>(); // If the stage requires a range shuffle if (shuffleType.equals("r")) { - Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output"); - if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) { - LOG.warn("Range shuffle - file not exist. " + outputPath); - return null; + final String startKey = params.get("start").get(0); + final String endKey = params.get("end").get(0); + final boolean last = params.get("final") != null; + final List<String> taskIds = TajoPullServerService.splitMaps(taskIdList); + + long before = System.currentTimeMillis(); + for (String eachTaskId : taskIds) { + Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output"); + if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) { + LOG.warn("Range shuffle - file not exist. " + outputPath); + continue; + } + Path path = executionBlockContext.getLocalFS().makeQualified( + executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf)); + + try { + FileChunk chunk = TajoPullServerService.getFileChunks(queryId, sid, path, startKey, endKey, last); + chunkList.add(chunk); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + throw new IOException(t.getCause()); + } } - Path path = executionBlockContext.getLocalFS().makeQualified( - executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf)); - String startKey = params.get("start").get(0); - String endKey = params.get("end").get(0); - boolean last = params.get("final") != null; - - try { - chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last); - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - return null; + long after = System.currentTimeMillis(); + if (LOG.isDebugEnabled()) { + LOG.debug("Index lookup time: " + (after - before) + " ms"); } // If the stage requires a hash shuffle or a scattered hash shuffle @@ -779,8 +821,7 @@ public class TaskImpl implements Task { Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId); if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath.toString(), conf)) { - LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); - return null; + throw new IOException("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); } Path path = executionBlockContext.getLocalFS().makeQualified( executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath.toString(), conf)); @@ -789,17 +830,16 @@ public class TaskImpl implements Task { long readLen = (offset >= 0 && length >= 0) ? length : file.length(); if (startPos >= file.length()) { - LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); - return null; + throw new IOException("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); } - chunk = new FileChunk(file, startPos, readLen); + FileChunk chunk = new FileChunk(file, startPos, readLen); + chunkList.add(chunk); } else { - LOG.error("Unknown shuffle type"); - return null; + throw new IOException("Unknown shuffle type"); } - return chunk; + return chunkList; } public static Path getTaskAttemptDir(TaskAttemptId quid) { http://git-wip-us.apache.org/repos/asf/tajo/blob/e3443c6d/tajo-core/src/main/proto/ResourceProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/ResourceProtos.proto b/tajo-core/src/main/proto/ResourceProtos.proto index a24c840..5bf4929 100644 --- a/tajo-core/src/main/proto/ResourceProtos.proto +++ b/tajo-core/src/main/proto/ResourceProtos.proto @@ -80,15 +80,17 @@ message FetchProto { required ExecutionBlockIdProto executionBlockId = 4; required int32 partitionId = 5; required string name = 6; - optional string rangeParams = 7; - optional bool hasNext = 8 [default = false]; + optional bytes range_start = 7; + optional bytes range_end = 8; + optional bool range_last_inclusive = 9; + optional bool has_next = 10 [default = false]; - //repeated part - repeated int32 taskId = 9 [packed=true]; - repeated int32 attemptId = 10 [packed=true]; + // repeated part + repeated int32 task_id = 11 [packed = true]; + repeated int32 attempt_id = 12 [packed = true]; - optional int64 offset = 11; - optional int64 length = 12; + optional int64 offset = 13; + optional int64 length = 14; } message TaskStatusProto { http://git-wip-us.apache.org/repos/asf/tajo/blob/e3443c6d/tajo-core/src/main/resources/webapps/worker/task.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/task.jsp b/tajo-core/src/main/resources/webapps/worker/task.jsp index f2f903b..3e17c8a 100644 --- a/tajo-core/src/main/resources/webapps/worker/task.jsp +++ b/tajo-core/src/main/resources/webapps/worker/task.jsp @@ -21,26 +21,24 @@ <%@ page import="org.apache.tajo.ExecutionBlockId" %> <%@ page import="org.apache.tajo.QueryId" %> +<%@ page import="org.apache.tajo.ResourceProtos.FetchProto" %> <%@ page import="org.apache.tajo.ResourceProtos.ShuffleFileOutput" %> <%@ page import="org.apache.tajo.TaskId" %> <%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %> <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %> -<%@ page import="org.apache.tajo.querymaster.Query" %> -<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %> -<%@ page import="org.apache.tajo.querymaster.Stage" %> -<%@ page import="org.apache.tajo.querymaster.Task" %> +<%@ page import="org.apache.tajo.querymaster.*" %> <%@ page import="org.apache.tajo.storage.DataLocation" %> <%@ page import="org.apache.tajo.storage.fragment.Fragment" %> <%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.TajoIdUtils" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> -<%@ page import="org.apache.tajo.worker.FetchImpl" %> <%@ page import="org.apache.tajo.worker.TajoWorker" %> <%@ page import="java.net.URI" %> <%@ page import="java.text.SimpleDateFormat" %> <%@ page import="java.util.Map" %> <%@ page import="java.util.Set" %> +<%@ page import="org.apache.tajo.conf.TajoConf.ConfVars" %> <% String paramQueryId = request.getParameter("queryId"); @@ -64,6 +62,9 @@ return; } + int maxUrlLength = tajoWorker.getConfig().getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(), + ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal); + Query query = queryMasterTask.getQuery(); Stage stage = query.getStage(ebid); @@ -110,11 +111,11 @@ String fetchInfo = ""; delim = ""; - for (Map.Entry<String, Set<FetchImpl>> e : task.getFetchMap().entrySet()) { + for (Map.Entry<String, Set<FetchProto>> e : task.getFetchMap().entrySet()) { fetchInfo += delim + "<b>" + e.getKey() + "</b>"; delim = "<br/>"; - for (FetchImpl f : e.getValue()) { - for (URI uri : f.getSimpleURIs()){ + for (FetchProto f : e.getValue()) { + for (URI uri : Repartitioner.createSimpleURIs(maxUrlLength, f)){ fetchInfo += delim + uri; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/e3443c6d/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java index 472b967..fc87a2e 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.exception.ExceptionUtil; import org.apache.tajo.pullserver.retriever.DataRetriever; import org.apache.tajo.pullserver.retriever.FileChunk; @@ -84,7 +85,7 @@ public class HttpDataServerHandler extends SimpleChannelInboundHandler<FullHttpR FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]); // Write the content. - if (file == null) { + if (file.length == 0) { HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); if (!HttpHeaders.isKeepAlive(request)) { ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); @@ -171,7 +172,8 @@ public class HttpDataServerHandler extends SimpleChannelInboundHandler<FullHttpR return; } - LOG.error(cause.getMessage(), cause); + LOG.error(cause.getMessage()); + ExceptionUtil.printStackTraceIfError(LOG, cause); if (ch.isActive()) { sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); } http://git-wip-us.apache.org/repos/asf/tajo/blob/e3443c6d/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index d826127..56d7b5b 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -18,6 +18,7 @@ package org.apache.tajo.pullserver; +import com.google.common.cache.*; import com.google.common.collect.Lists; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.Unpooled; @@ -53,24 +54,28 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.exception.InvalidURLException; import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.NettyUtils; import org.apache.tajo.storage.*; import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; import org.apache.tajo.storage.index.bst.BSTIndex; +import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader; +import org.apache.tajo.util.TajoIdUtils; import java.io.*; import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; public class TajoPullServerService extends AbstractService { @@ -88,6 +93,7 @@ public class TajoPullServerService extends AbstractService { private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private HttpChannelInitializer channelInitializer; private int sslFileBufferSize; + private int maxUrlLength; private ApplicationId appId; private FileSystem localFS; @@ -107,25 +113,56 @@ public class TajoPullServerService extends AbstractService { new ConcurrentHashMap<String,String>(); private String userName; + private static LoadingCache<CacheKey, BSTIndexReader> indexReaderCache = null; + private static int lowCacheHitCheckThreshold; + public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = "tajo.pullserver.ssl.file.buffer.size"; public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024; - private static boolean STANDALONE = false; + private static final boolean STANDALONE; private static final AtomicIntegerFieldUpdater<ProcessingStatus> SLOW_FILE_UPDATER; private static final AtomicIntegerFieldUpdater<ProcessingStatus> REMAIN_FILE_UPDATER; + public static final String CHUNK_LENGTH_HEADER_NAME = "c"; + + static class CacheKey { + private Path path; + private String queryId; + private String ebSeqId; + + public CacheKey(Path path, String queryId, String ebSeqId) { + this.path = path; + this.queryId = queryId; + this.ebSeqId = ebSeqId; + } + + @Override + public boolean equals(Object o) { + if (o instanceof CacheKey) { + CacheKey other = (CacheKey) o; + return Objects.equals(this.path, other.path) + && Objects.equals(this.queryId, other.queryId) + && Objects.equals(this.ebSeqId, other.ebSeqId); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(path, queryId, ebSeqId); + } + } + static { /* AtomicIntegerFieldUpdater can save the memory usage instead of AtomicInteger instance */ SLOW_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "numSlowFile"); REMAIN_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "remainFiles"); String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE"); - if (!StringUtils.isEmpty(standalone)) { - STANDALONE = standalone.equalsIgnoreCase("true"); - } + STANDALONE = !StringUtils.isEmpty(standalone) && standalone.equalsIgnoreCase("true"); } @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo") @@ -193,6 +230,9 @@ public class TajoPullServerService extends AbstractService { localFS = new LocalFileSystem(); + maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(), + ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal); + conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname , conf.getInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal)); super.init(conf); @@ -210,7 +250,7 @@ public class TajoPullServerService extends AbstractService { } ServerBootstrap bootstrap = selector.clone(); - TajoConf tajoConf = (TajoConf)conf; + final TajoConf tajoConf = (TajoConf)conf; try { channelInitializer = new HttpChannelInitializer(tajoConf); } catch (Exception ex) { @@ -219,20 +259,35 @@ public class TajoPullServerService extends AbstractService { bootstrap.childHandler(channelInitializer) .channel(NioServerSocketChannel.class); - port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, - ConfVars.PULLSERVER_PORT.defaultIntVal); + port = tajoConf.getIntVar(ConfVars.PULLSERVER_PORT); ChannelFuture future = bootstrap.bind(new InetSocketAddress(port)) .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE) .syncUninterruptibly(); accepted.add(future.channel()); port = ((InetSocketAddress)future.channel().localAddress()).getPort(); - conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port)); + tajoConf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port)); LOG.info(getName() + " listening on port " + port); sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE); + int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE); + int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT); + + indexReaderCache = CacheBuilder.newBuilder() + .maximumSize(cacheSize) + .expireAfterWrite(cacheTimeout, TimeUnit.MINUTES) + .removalListener(removalListener) + .build( + new CacheLoader<CacheKey, BSTIndexReader>() { + @Override + public BSTIndexReader load(CacheKey key) throws Exception { + return new BSTIndex(tajoConf).getIndexReader(new Path(key.path, "index")); + } + } + ); + lowCacheHitCheckThreshold = (int) (cacheSize * 0.1f); if (STANDALONE) { File pullServerPortFile = getPullServerPortFile(); @@ -312,6 +367,7 @@ public class TajoPullServerService extends AbstractService { } localFS.close(); + indexReaderCache.invalidateAll(); } catch (Throwable t) { LOG.error(t, t); } finally { @@ -348,7 +404,7 @@ public class TajoPullServerService extends AbstractService { int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname, ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal); - pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize)); + pipeline.addLast("codec", new HttpServerCodec(maxUrlLength, 8192, maxChunkSize)); pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16)); pipeline.addLast("chunking", new ChunkedWriteHandler()); pipeline.addLast("shuffle", PullServer); @@ -422,7 +478,6 @@ public class TajoPullServerService extends AbstractService { class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> { private final TajoConf conf; -// private final IndexCache indexCache; private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); @@ -447,22 +502,36 @@ public class TajoPullServerService extends AbstractService { public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { - if (request.getMethod() != HttpMethod.GET) { - sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED); + if (request.getDecoderResult().isFailure()) { + LOG.error("Http decoding failed. ", request.getDecoderResult().cause()); + sendError(ctx, request.getDecoderResult().toString(), HttpResponseStatus.BAD_REQUEST); return; } - ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString()); - processingStatusMap.put(request.getUri().toString(), processingStatus); + if (request.getMethod() == HttpMethod.DELETE) { + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + + clearIndexCache(request.getUri()); + return; + } else if (request.getMethod() != HttpMethod.GET) { + sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED); + return; + } // Parsing the URL into key-values Map<String, List<String>> params = null; try { params = decodeParams(request.getUri()); } catch (Throwable e) { + LOG.error("Failed to decode uri " + request.getUri()); sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST); + return; } + ProcessingStatus processingStatus = new ProcessingStatus(request.getUri()); + processingStatusMap.put(request.getUri(), processingStatus); + String partId = params.get("p").get(0); String queryId = params.get("qid").get(0); String shuffleType = params.get("type").get(0); @@ -491,28 +560,33 @@ public class TajoPullServerService extends AbstractService { // if a stage requires a range shuffle if (shuffleType.equals("r")) { - Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output"); - if (!lDirAlloc.ifExists(outputPath.toString(), conf)) { - LOG.warn(outputPath + "does not exist."); - sendError(ctx, HttpResponseStatus.NO_CONTENT); - return; - } - Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf)); - String startKey = params.get("start").get(0); - String endKey = params.get("end").get(0); - boolean last = params.get("final") != null; - - FileChunk chunk; - try { - chunk = getFileChunks(path, startKey, endKey, last); - } catch (Throwable t) { - LOG.error("ERROR Request: " + request.getUri(), t); - sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST); - return; - } - if (chunk != null) { - chunks.add(chunk); + final String startKey = params.get("start").get(0); + final String endKey = params.get("end").get(0); + final boolean last = params.get("final") != null; + + long before = System.currentTimeMillis(); + for (String eachTaskId : taskIds) { + Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output"); + if (!lDirAlloc.ifExists(outputPath.toString(), conf)) { + LOG.warn(outputPath + "does not exist."); + continue; + } + Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf)); + + FileChunk chunk; + try { + chunk = getFileChunks(queryId, sid, path, startKey, endKey, last); + } catch (Throwable t) { + LOG.error("ERROR Request: " + request.getUri(), t); + sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST); + return; + } + if (chunk != null) { + chunks.add(chunk); + } } + long after = System.currentTimeMillis(); + LOG.info("Index lookup time: " + (after - before) + " ms"); // if a stage requires a hash shuffle or a scattered hash shuffle } else if (shuffleType.equals("h") || shuffleType.equals("s")) { @@ -536,7 +610,9 @@ public class TajoPullServerService extends AbstractService { sendError(ctx, errorMessage, HttpResponseStatus.BAD_REQUEST); return; } - LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length()); + if (LOG.isDebugEnabled()) { + LOG.debug("RequestURL: " + request.getUri() + ", fileLen=" + file.length()); + } FileChunk chunk = new FileChunk(file, startPos, readLen); chunks.add(chunk); } else { @@ -545,8 +621,6 @@ public class TajoPullServerService extends AbstractService { return; } - processingStatus.setNumFiles(chunks.size()); - processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime; // Write the content. if (chunks.size() == 0) { HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); @@ -562,9 +636,13 @@ public class TajoPullServerService extends AbstractService { ChannelFuture writeFuture = null; HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); long totalSize = 0; + StringBuilder sb = new StringBuilder(); for (FileChunk chunk : file) { totalSize += chunk.length(); + sb.append(Long.toString(chunk.length())).append(","); } + sb.deleteCharAt(sb.length() - 1); + HttpHeaders.addHeader(response, CHUNK_LENGTH_HEADER_NAME, sb.toString()); HttpHeaders.setContentLength(response, totalSize); if (HttpHeaders.isKeepAlive(request)) { @@ -580,6 +658,7 @@ public class TajoPullServerService extends AbstractService { return; } } + if (ctx.pipeline().get(SslHandler.class) == null) { writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); } else { @@ -594,6 +673,49 @@ public class TajoPullServerService extends AbstractService { } } + /** + * Upon a request from TajoWorker, this method clears index cache for fetching data of an execution block. + * It is called whenever an execution block is completed. + * + * @param uri query URI which indicates the execution block id + * @throws IOException + * @throws InvalidURLException + */ + private void clearIndexCache(String uri) throws IOException, InvalidURLException { + // Simply parse the given uri + String[] tokens = uri.split("="); + if (tokens.length != 2 || !tokens[0].equals("ebid")) { + throw new IllegalArgumentException("invalid params: " + uri); + } + ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]); + String queryId = ebId.getQueryId().toString(); + String ebSeqId = Integer.toString(ebId.getId()); + List<CacheKey> removed = new ArrayList<>(); + synchronized (indexReaderCache) { + for (Entry<CacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) { + CacheKey key = e.getKey(); + if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) { + e.getValue().forceClose(); + removed.add(e.getKey()); + } + } + indexReaderCache.invalidateAll(removed); + } + removed.clear(); + synchronized (waitForRemove) { + for (Entry<CacheKey, BSTIndexReader> e : waitForRemove.entrySet()) { + CacheKey key = e.getKey(); + if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) { + e.getValue().forceClose(); + removed.add(e.getKey()); + } + } + for (CacheKey eachKey : removed) { + waitForRemove.remove(eachKey); + } + } + } + private ChannelFuture sendFile(ChannelHandlerContext ctx, FileChunk file, String requestUri) throws IOException { @@ -617,7 +739,7 @@ public class TajoPullServerService extends AbstractService { writeFuture = ctx.write(new HttpChunkedInput(chunk)); } } catch (FileNotFoundException e) { - LOG.info(file.getFile() + " not found"); + LOG.fatal(file.getFile() + " not found"); return null; } catch (Throwable e) { if (spill != null) { @@ -656,26 +778,68 @@ public class TajoPullServerService extends AbstractService { } } - public static FileChunk getFileChunks(Path outDir, + // Temporal space to wait for the completion of all index lookup operations + private static final ConcurrentHashMap<CacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>(); + + // RemovalListener is triggered when an item is removed from the index reader cache. + // It closes index readers when they are not used anymore. + // If they are still being used, they are moved to waitForRemove map to wait for other operations' completion. + private static final RemovalListener<CacheKey, BSTIndexReader> removalListener = new RemovalListener<CacheKey, BSTIndexReader>() { + @Override + public void onRemoval(RemovalNotification<CacheKey, BSTIndexReader> removal) { + BSTIndexReader reader = removal.getValue(); + if (reader.getReferenceNum() == 0) { + try { + reader.close(); // tear down properly + } catch (IOException e) { + throw new RuntimeException(e); + } + waitForRemove.remove(removal.getKey()); + } else { + waitForRemove.put(removal.getKey(), reader); + } + } + }; + + public static FileChunk getFileChunks(String queryId, + String ebSeqId, + Path outDir, String startKey, String endKey, - boolean last) throws IOException { - BSTIndex index = new BSTIndex(new TajoConf()); - try (BSTIndex.BSTIndexReader idxReader = index.getIndexReader(new Path(outDir, "index"))) { - Schema keySchema = idxReader.getKeySchema(); - TupleComparator comparator = idxReader.getComparator(); + boolean last) throws IOException, ExecutionException { + + BSTIndexReader idxReader = indexReaderCache.get(new CacheKey(outDir, queryId, ebSeqId)); + idxReader.retain(); + File data; + long startOffset; + long endOffset; + try { if (LOG.isDebugEnabled()) { - LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")"); + if (indexReaderCache.size() > lowCacheHitCheckThreshold && indexReaderCache.stats().hitRate() < 0.5) { + LOG.debug("Too low cache hit rate: " + indexReaderCache.stats()); + } + } + + Tuple indexedFirst = idxReader.getFirstKey(); + Tuple indexedLast = idxReader.getLastKey(); + + if (indexedFirst == null && indexedLast == null) { // if # of rows is zero + if (LOG.isDebugEnabled()) { + LOG.debug("There is no contents"); + } + return null; } - File data = new File(URI.create(outDir.toUri() + "/output")); byte[] startBytes = Base64.decodeBase64(startKey); byte[] endBytes = Base64.decodeBase64(endKey); - RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema); + Tuple start; Tuple end; + Schema keySchema = idxReader.getKeySchema(); + RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema); + try { start = decoder.toTuple(startBytes); } catch (Throwable t) { @@ -690,23 +854,23 @@ public class TajoPullServerService extends AbstractService { + ", decoded byte size: " + endBytes.length, t); } - LOG.info("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end + - (last ? ", last=true" : "") + ")"); - - if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero - LOG.info("There is no contents"); - return null; + data = new File(URI.create(outDir.toUri() + "/output")); + if (LOG.isDebugEnabled()) { + LOG.debug("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end + + (last ? ", last=true" : "") + ")"); } - if (comparator.compare(end, idxReader.getFirstKey()) < 0 || - comparator.compare(idxReader.getLastKey(), start) < 0) { - LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + - "], but request start:" + start + ", end: " + end); + TupleComparator comparator = idxReader.getComparator(); + + if (comparator.compare(end, indexedFirst) < 0 || + comparator.compare(indexedLast, start) < 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Out of Scope (indexed data [" + indexedFirst + ", " + indexedLast + + "], but request start:" + start + ", end: " + end); + } return null; } - long startOffset; - long endOffset; try { idxReader.init(); startOffset = idxReader.find(start); @@ -756,12 +920,14 @@ public class TajoPullServerService extends AbstractService { && comparator.compare(idxReader.getLastKey(), end) < 0)) { endOffset = data.length(); } + } finally { + idxReader.release(); + } - FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset); + FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset); - if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk); - return chunk; - } + if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk); + return chunk; } public static List<String> splitMaps(List<String> mapq) { http://git-wip-us.apache.org/repos/asf/tajo/blob/e3443c6d/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java index 7919ee7..4d1f3f8 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java @@ -42,6 +42,8 @@ import java.nio.channels.FileChannel; import java.util.LinkedList; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; @@ -440,6 +442,9 @@ public class BSTIndex implements IndexMethod { } } + private static final AtomicIntegerFieldUpdater<BSTIndexReader> REFERENCE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(BSTIndexReader.class, "referenceNum"); + /** * BSTIndexReader is thread-safe. */ @@ -468,6 +473,10 @@ public class BSTIndex implements IndexMethod { private RowStoreDecoder rowStoreDecoder; + private AtomicBoolean inited = new AtomicBoolean(false); + + volatile int referenceNum; + /** * * @param fileName @@ -488,6 +497,25 @@ public class BSTIndex implements IndexMethod { open(); } + /** + * Increase the reference number of the index reader. + */ + public void retain() { + REFERENCE_UPDATER.compareAndSet(this, referenceNum, referenceNum + 1); + } + + /** + * Decrease the reference number of the index reader. + * This method must be called before {@link #close()}. + */ + public void release() { + REFERENCE_UPDATER.compareAndSet(this, referenceNum, referenceNum - 1); + } + + public int getReferenceNum() { + return referenceNum; + } + public Schema getKeySchema() { return this.keySchema; } @@ -543,8 +571,10 @@ public class BSTIndex implements IndexMethod { byteBuf.release(); } - public void init() throws IOException { - fillData(); + public synchronized void init() throws IOException { + if (inited.compareAndSet(false, true)) { + fillData(); + } } private void open() @@ -684,6 +714,8 @@ public class BSTIndex implements IndexMethod { } catch (IOException e) { //TODO this block should fix correctly counter--; + if (counter == 0) + LOG.info("counter: " + counter); if (pos != -1) { in.seek(pos); } @@ -765,6 +797,9 @@ public class BSTIndex implements IndexMethod { //http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541 int centerPos = (start + end) >>> 1; + if (arr.length == 0) { + LOG.error("arr.length: 0, loadNum: " + loadNum + ", inited: " + inited.get()); + } while (true) { if (comparator.compare(arr[centerPos], key) > 0) { if (centerPos == 0) { @@ -800,8 +835,23 @@ public class BSTIndex implements IndexMethod { return offset; } + /** + * Close index reader only when it is not used anymore. + */ @Override public void close() throws IOException { + if (referenceNum == 0) { + this.indexIn.close(); + } + } + + /** + * Close index reader even though it is being used. + * + * @throws IOException + */ + public void forceClose() throws IOException { + REFERENCE_UPDATER.compareAndSet(this, referenceNum, 0); this.indexIn.close(); }
