Repository: tajo Updated Branches: refs/heads/branch-0.11.1 7fb0f25e0 -> 7734e06e5
http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index 831244d..49b0e11 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -19,37 +19,62 @@ package org.apache.tajo.engine.planner.physical; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.logical.ShuffleFileWriteNode; -import org.apache.tajo.storage.HashShuffleAppenderWrapper; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.memory.MemoryRowBlock; +import org.apache.tajo.tuple.memory.RowBlock; +import org.apache.tajo.tuple.memory.RowWriter; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.FileUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; /** * <code>HashShuffleFileWriteExec</code> is a physical executor to store intermediate data into a number of * file outputs associated with shuffle keys. The file outputs are stored on local disks. 
*/ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { - private static Log LOG = LogFactory.getLog(HashShuffleFileWriteExec.class); - private ShuffleFileWriteNode plan; + private static final Log LOG = LogFactory.getLog(HashShuffleFileWriteExec.class); + private static final int MAXIMUM_INITIAL_BUFFER_SIZE = StorageUnit.MB; + private static final int MINIMUM_INITIAL_BUFFER_SIZE = 4 * StorageUnit.KB; + // If buffer usage exceeds this threshold, buffers will be flushed to local storage + private static final float BUFFER_THRESHOLD_FACTOR = 0.8f; + + private final ShuffleFileWriteNode plan; private final TableMeta meta; - private Partitioner partitioner; - private Map<Integer, HashShuffleAppenderWrapper> appenderMap = new HashMap<>(); + private final Partitioner partitioner; private final int numShuffleOutputs; - private final int [] shuffleKeyIds; - private HashShuffleAppenderManager hashShuffleAppenderManager; - private int numHashShuffleBufferTuples; + private final int[] shuffleKeyIds; + private final HashShuffleAppenderManager hashShuffleAppenderManager; + private final int maxBufferSize; + private final int bufferThreshold; + private final int initialBufferSize; + private final DataType[] dataTypes; + + private final Map<Integer, MemoryRowBlock> partitionMemoryMap; + private long writtenBytes = 0; + private long usedBufferSize = 0; + private long totalBufferCapacity = 0; public HashShuffleFileWriteExec(TaskAttemptContext context, final ShuffleFileWriteNode plan, final PhysicalExec child) throws IOException { @@ -71,105 +96,151 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { } this.partitioner = new HashPartitioner(shuffleKeyIds, numShuffleOutputs); this.hashShuffleAppenderManager = context.getHashShuffleAppenderManager(); - this.numHashShuffleBufferTuples = context.getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_BUFFER_SIZE); + this.maxBufferSize = context.getQueryContext().getInt(SessionVars.HASH_SHUFFLE_BUFFER_SIZE) * StorageUnit.MB; + this.bufferThreshold = (int) (maxBufferSize * BUFFER_THRESHOLD_FACTOR); + this.dataTypes = SchemaUtil.toDataTypes(outSchema); + + if(numShuffleOutputs > 0){ + // calculate the initial buffer size from the total partition count;
each buffer will be 4KB ~ 1MB + this.initialBufferSize = Math.min(MAXIMUM_INITIAL_BUFFER_SIZE, + Math.max(maxBufferSize / numShuffleOutputs, MINIMUM_INITIAL_BUFFER_SIZE)); + } else { + this.initialBufferSize = MINIMUM_INITIAL_BUFFER_SIZE; + } + + this.partitionMemoryMap = Maps.newHashMap(); } @Override public void init() throws IOException { super.init(); } - - private HashShuffleAppenderWrapper getAppender(int partId) throws IOException { - HashShuffleAppenderWrapper appender = appenderMap.get(partId); - if (appender == null) { - appender = hashShuffleAppenderManager.getAppender(context.getConf(), - context.getTaskId().getTaskId().getExecutionBlockId(), partId, meta, outSchema); - appenderMap.put(partId, appender); - } - return appender; - } - - Map<Integer, TupleList> partitionTuples = new HashMap<Integer, TupleList>(); - long writtenBytes = 0L; @Override public Tuple next() throws IOException { try { Tuple tuple; int partId; - int tupleCount = 0; long numRows = 0; while (!context.isStopped() && (tuple = child.next()) != null) { - tupleCount++; - numRows++; partId = partitioner.getPartition(tuple); - TupleList partitionTupleList = partitionTuples.get(partId); - if (partitionTupleList == null) { - partitionTupleList = new TupleList(1000); - partitionTuples.put(partId, partitionTupleList); + MemoryRowBlock rowBlock = partitionMemoryMap.get(partId); + if (rowBlock == null) { + rowBlock = new MemoryRowBlock(dataTypes, initialBufferSize, true, plan.getStorageType()); + partitionMemoryMap.put(partId, rowBlock); + totalBufferCapacity += rowBlock.capacity(); } - partitionTupleList.add(tuple); - if (tupleCount >= numHashShuffleBufferTuples) { - for (Map.Entry<Integer, TupleList> entry : partitionTuples.entrySet()) { - int appendPartId = entry.getKey(); - HashShuffleAppenderWrapper appender = getAppender(appendPartId); - int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue()); - writtenBytes += appendedSize; - entry.getValue().clear(); + + RowWriter writer = rowBlock.getWriter(); + long prevUsedMem = rowBlock.usedMem(); + totalBufferCapacity -= rowBlock.capacity(); + + writer.addTuple(tuple); + numRows++; + + totalBufferCapacity += rowBlock.capacity(); // recalculate the resizable buffer capacity + usedBufferSize += (rowBlock.usedMem() - prevUsedMem); + + // if the total buffer capacity grows beyond maxBufferSize, + // all partitions are flushed and the buffers are released + if (totalBufferCapacity > maxBufferSize) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Too low buffer usage.
threshold: %s, total capacity: %s, used: %s", + FileUtil.humanReadableByteCount(maxBufferSize, false), + FileUtil.humanReadableByteCount(totalBufferCapacity, false), + FileUtil.humanReadableByteCount(usedBufferSize, false))); } - tupleCount = 0; + + //flush and release buffer + flushBuffer(partitionMemoryMap, true); + writtenBytes += usedBufferSize; + totalBufferCapacity = usedBufferSize = 0; + + } else if (usedBufferSize > bufferThreshold) { + //flush and reuse buffer + flushBuffer(partitionMemoryMap, false); + writtenBytes += usedBufferSize; + usedBufferSize = 0; } } - // processing remained tuples - for (Map.Entry<Integer, TupleList> entry : partitionTuples.entrySet()) { - int appendPartId = entry.getKey(); - HashShuffleAppenderWrapper appender = getAppender(appendPartId); - int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue()); - writtenBytes += appendedSize; - entry.getValue().clear(); - } + // flush remaining buffers + flushBuffer(partitionMemoryMap, true); + writtenBytes += usedBufferSize; + usedBufferSize = totalBufferCapacity = 0; TableStats aggregated = (TableStats) child.getInputStats().clone(); aggregated.setNumBytes(writtenBytes); aggregated.setNumRows(numRows); context.setResultStats(aggregated); - partitionTuples.clear(); - return null; } catch (RuntimeException e) { LOG.error(e.getMessage(), e); throw new IOException(e); - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); throw new IOException(e); } } + /** + * flush all buffers to local storage + */ + private void flushBuffer(Map<Integer, MemoryRowBlock> partitionMemoryMap, boolean releaseBuffer) + throws IOException, ExecutionException, InterruptedException { + List<Future<MemoryRowBlock>> resultList = Lists.newArrayList(); + ArrayList<Integer> unusedBuffer = Lists.newArrayList(); + + for (Map.Entry<Integer, MemoryRowBlock> entry : partitionMemoryMap.entrySet()) { + int appendPartId = entry.getKey(); + + MemoryRowBlock memoryRowBlock = entry.getValue(); + if (memoryRowBlock.getMemory().isReadable()) { + //flush the buffer + resultList.add(hashShuffleAppenderManager.
+ writePartitions(meta, outSchema, context.getTaskId(), appendPartId, memoryRowBlock, releaseBuffer)); + } else { + if (releaseBuffer) { + memoryRowBlock.release(); + } else { + unusedBuffer.add(appendPartId); + } + } + } + + // wait for flush to storage + for (Future<MemoryRowBlock> future : resultList) { + future.get(); + } + + if (releaseBuffer) { + partitionMemoryMap.clear(); + } else { + // release the unused partition + for (Integer id : unusedBuffer) { + MemoryRowBlock memoryRowBlock = partitionMemoryMap.remove(id); + memoryRowBlock.release(); + } + } + } + @Override public void rescan() throws IOException { - // nothing to do + throw new TajoRuntimeException(new UnsupportedException()); } @Override public void close() throws IOException{ - super.close(); - if (appenderMap != null) { - appenderMap.clear(); - appenderMap = null; - } - - for (TupleList eachList : partitionTuples.values()) { - eachList.clear(); + if (partitionMemoryMap.size() > 0) { + for (RowBlock rowBlock : partitionMemoryMap.values()) { + rowBlock.release(); + } + partitionMemoryMap.clear(); } - partitionTuples.clear(); - partitionTuples = null; - - partitioner = null; - plan = null; progress = 1.0f; + super.close(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java index b652b0a..cabbf4c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java @@ -26,6 +26,7 @@ import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.util.Comparator; +import java.util.List; public abstract class SortExec extends UnaryPhysicalExec { @@ -39,7 +40,7 @@ public abstract class SortExec extends UnaryPhysicalExec { this.comparator = new BaseTupleComparator(inSchema, sortSpecs); } - protected TupleSorter getSorter(TupleList tupleSlots) { + protected TupleSorter getSorter(List tupleSlots) { if (!tupleSlots.isEmpty() && ComparableVector.isVectorizable(sortSpecs)) { return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java index abf2808..39c67d6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java @@ -22,6 +22,7 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import java.util.Collections; +import java.util.List; public interface TupleSorter { @@ -29,10 +30,10 @@ public interface TupleSorter { class DefaultSorter implements TupleSorter { - private final TupleList target; + private final List target; private final TupleComparator comparator; - public DefaultSorter(TupleList target, TupleComparator comparator) { + public DefaultSorter(List target, TupleComparator comparator) { 
this.target = target; this.comparator = comparator; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java index 82f7153..44649cd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java @@ -26,6 +26,7 @@ import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.storage.Tuple; import java.util.Iterator; +import java.util.List; /** * Extract raw level values (primitive or String/byte[]) from each of key columns before sorting @@ -35,7 +36,7 @@ public class VectorizedSorter extends ComparableVector implements IndexedSortabl private final int[] mappings; // index indirection - public VectorizedSorter(TupleList source, SortSpec[] sortKeys, int[] keyIndex) { + public VectorizedSorter(List source, SortSpec[] sortKeys, int[] keyIndex) { super(source.size(), sortKeys, keyIndex); source.toArray(tuples); // wish it's array list mappings = new int[tuples.length]; http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index 3de63fb..1c9d061 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -194,8 +194,11 @@ public class Repartitioner { long maxStats = Long.MIN_VALUE; int maxStatsScanIdx = -1; StringBuilder nonLeafScanNamesBuilder = new StringBuilder(); + + String intermediateDataFormat = schedulerContext.getMasterContext().getConf().getVar(ConfVars.SHUFFLE_FILE_FORMAT); for (int i = 0; i < scans.length; i++) { - if (scans[i].getTableDesc().getMeta().getDataFormat().equalsIgnoreCase("RAW")) { + + if (scans[i].getTableDesc().getMeta().getDataFormat().equalsIgnoreCase(intermediateDataFormat)) { // Intermediate data scan hasNonLeafNode = true; largeScanIndexList.add(i); http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 8d891b8..4862368 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -339,6 +339,10 @@ public class TajoWorker extends CompositeService { return; } + if (hashShuffleAppenderManager != null) { + hashShuffleAppenderManager.shutdown(); + } + if(webServer != null) { try { webServer.stop(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 
a8e874c..eff2c28 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -25,10 +25,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.TaskAttemptId; @@ -548,6 +545,7 @@ public class TaskImpl implements Task { private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta) throws IOException { + Configuration c = new Configuration(systemConf); c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); FileSystem fs = FileSystem.get(c); @@ -561,7 +559,7 @@ public class TaskImpl implements Task { if (f.getLen() == 0) { continue; } - tablet = new FileFragment(name, f.getPath(), 0l, f.getLen()); + tablet = new FileFragment(name, fs.makeQualified(f.getPath()), 0l, f.getLen()); listTablets.add(tablet); } @@ -569,7 +567,7 @@ public class TaskImpl implements Task { synchronized (localChunks) { for (FileChunk chunk : localChunks) { if (name.equals(chunk.getEbId())) { - tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); + tablet = new FileFragment(name, fs.makeQualified(new Path(chunk.getFile().getPath())), chunk.startOffset(), chunk.length()); listTablets.add(tablet); } } @@ -611,10 +609,9 @@ public class TaskImpl implements Task { try { FileChunk fetched = fetcher.get(); if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null - && fetched.getFile() != null) { + && fetched.getFile() != null) { if (fetched.fromRemote() == false) { - localChunks.add(fetched); - LOG.info("Add a new FileChunk to local chunk list"); + localChunks.add(fetched); } break; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java index d39aeb8..5d2ac2d 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/stream/BufferPool.java @@ -41,11 +41,6 @@ public class BufferPool { } static { - /* TODO Enable thread cache - * Create a pooled ByteBuf allocator but disables the thread-local cache. 
- * Because the TaskRunner thread is newly created - * */ - if (TajoConstants.IS_TEST_MODE) { /* Disable pooling buffers for memory usage */ ALLOCATOR = UnpooledByteBufAllocator.DEFAULT; @@ -54,7 +49,7 @@ public class BufferPool { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); } else { TajoConf tajoConf = new TajoConf(); - ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, false), 0); + ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, true), 0); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java index 2c385d8..ed49537 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java @@ -18,6 +18,9 @@ package org.apache.tajo.storage; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileStatus; @@ -30,20 +33,24 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.storage.rawfile.DirectRawFileWriter; +import org.apache.tajo.tuple.memory.MemoryRowBlock; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.Pair; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; public class HashShuffleAppenderManager { private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class); - private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap = - new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>(); + private ConcurrentMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap = Maps.newConcurrentMap(); + private ConcurrentMap<Integer, ExecutorService> executors = Maps.newConcurrentMap(); // for parallel writing + private List<String> temporalPaths = Lists.newArrayList(); + private TajoConf systemConf; private FileSystem defaultFS; private FileSystem localFS; @@ -59,66 +66,88 @@ public class HashShuffleAppenderManager { // initialize DFS and LocalFileSystems defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); localFS = FileSystem.getLocal(systemConf); - pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024; + pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * StorageUnit.MB; + + Iterable<Path> allLocalPath = lDirAllocator.getAllLocalPathsToRead(".", systemConf); + + //add async hash shuffle writer + for (Path path : allLocalPath) { + temporalPaths.add(localFS.makeQualified(path).toString()); + executors.put(temporalPaths.size() - 1, Executors.newSingleThreadExecutor()); + } } - public HashShuffleAppenderWrapper 
getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId, - TableMeta meta, Schema outSchema) throws IOException { - synchronized (appenderMap) { - Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId); + protected int getVolumeId(Path path) { + int i = 0; + for (String rootPath : temporalPaths) { + if (path.toString().startsWith(rootPath)) { + break; + } + i++; + } + Preconditions.checkPositionIndex(i, temporalPaths.size() - 1); + return i; + } + + public synchronized HashShuffleAppenderWrapper getAppender(MemoryRowBlock memoryRowBlock, ExecutionBlockId ebId, + int partId, TableMeta meta, Schema outSchema) + throws IOException { + + Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId); + + if (partitionAppenderMap == null) { + partitionAppenderMap = Maps.newConcurrentMap(); + appenderMap.put(ebId, partitionAppenderMap); + } - if (partitionAppenderMap == null) { - partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>(); - appenderMap.put(ebId, partitionAppenderMap); + PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId); + if (partitionAppenderMeta == null) { + Path dataFile = getDataFile(ebId, partId); + FileSystem fs = dataFile.getFileSystem(systemConf); + if (fs.exists(dataFile)) { + FileStatus status = fs.getFileStatus(dataFile); + LOG.info("File " + dataFile + " already exists, size=" + status.getLen()); } - PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId); - if (partitionAppenderMeta == null) { - Path dataFile = getDataFile(ebId, partId); - FileSystem fs = dataFile.getFileSystem(systemConf); - if (fs.exists(dataFile)) { - FileStatus status = fs.getFileStatus(dataFile); - LOG.info("File " + dataFile + " already exists, size=" + status.getLen()); - } - - if (!fs.exists(dataFile.getParent())) { - fs.mkdirs(dataFile.getParent()); - } - - FileTablespace space = (FileTablespace) TablespaceManager.get(dataFile.toUri()); - FileAppender appender = (FileAppender) space.getAppender(meta, outSchema, dataFile); - appender.enableStats(); - appender.init(); - - partitionAppenderMeta = new PartitionAppenderMeta(); - partitionAppenderMeta.partId = partId; - partitionAppenderMeta.dataFile = dataFile; - partitionAppenderMeta.appender = new HashShuffleAppenderWrapper(ebId, partId, pageSize, appender); - partitionAppenderMeta.appender.init(); - partitionAppenderMap.put(partId, partitionAppenderMeta); - - if (LOG.isDebugEnabled()) { - LOG.debug("Create Hash shuffle file(partId=" + partId + "): " + dataFile); - } + if (!fs.exists(dataFile.getParent())) { + fs.mkdirs(dataFile.getParent()); } - return partitionAppenderMeta.appender; + DirectRawFileWriter appender = + new DirectRawFileWriter(systemConf, null, outSchema, meta, dataFile, memoryRowBlock); + appender.enableStats(); + appender.init(); + + partitionAppenderMeta = new PartitionAppenderMeta(); + partitionAppenderMeta.partId = partId; + partitionAppenderMeta.dataFile = dataFile; + partitionAppenderMeta.appender = + new HashShuffleAppenderWrapper(ebId, partId, pageSize, appender, getVolumeId(dataFile)); + partitionAppenderMeta.appender.init(); + partitionAppenderMap.put(partId, partitionAppenderMeta); + + if (LOG.isDebugEnabled()) { + LOG.debug("Create Hash shuffle file(partId=" + partId + "): " + dataFile); + } } + + return partitionAppenderMeta.appender; } public static int getPartParentId(int partId, TajoConf tajoConf) { - return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS); + return 
partId % tajoConf.getIntVar(ConfVars.SHUFFLE_HASH_PARENT_DIRS); } private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException { try { // the base dir for an output dir String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle"; - Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf)); + Path baseDirPath = lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf); //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")"); // If EB has many partition, too many shuffle file are in single directory. - return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId); + return localFS.makeQualified( + StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId)); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new IOException(e); @@ -126,10 +155,7 @@ public class HashShuffleAppenderManager { } public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException { - Map<Integer, PartitionAppenderMeta> partitionAppenderMap = null; - synchronized (appenderMap) { - partitionAppenderMap = appenderMap.remove(ebId); - } + Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.remove(ebId); if (partitionAppenderMap == null) { LOG.info("Close HashShuffleAppenderWrapper:" + ebId + ", not a hash shuffle"); @@ -137,7 +163,7 @@ public class HashShuffleAppenderManager { } // Send Intermediate data to QueryMaster. - List<HashShuffleIntermediate> intermediateEntries = new ArrayList<HashShuffleIntermediate>(); + List<HashShuffleIntermediate> intermediateEntries = Lists.newArrayList(); for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) { try { eachMeta.appender.close(); @@ -158,16 +184,43 @@ public class HashShuffleAppenderManager { } public void finalizeTask(TaskAttemptId taskId) { - synchronized (appenderMap) { - Map<Integer, PartitionAppenderMeta> partitionAppenderMap = + Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(taskId.getTaskId().getExecutionBlockId()); - if (partitionAppenderMap == null) { - return; - } + if (partitionAppenderMap == null) { + return; + } - for (PartitionAppenderMeta eachAppender: partitionAppenderMap.values()) { - eachAppender.appender.taskFinished(taskId); + for (PartitionAppenderMeta eachAppender: partitionAppenderMap.values()) { + eachAppender.appender.taskFinished(taskId); + } + } + + /** + * Asynchronously write partitions. 
+ */ + public Future<MemoryRowBlock> writePartitions(TableMeta meta, Schema schema, final TaskAttemptId taskId, int partId, + final MemoryRowBlock rowBlock, + final boolean release) throws IOException { + + final HashShuffleAppenderWrapper appender = + getAppender(rowBlock, taskId.getTaskId().getExecutionBlockId(), partId, meta, schema); + ExecutorService executor = executors.get(appender.getVolumeId()); + return executor.submit(new Callable<MemoryRowBlock>() { + @Override + public MemoryRowBlock call() throws Exception { + appender.writeRowBlock(taskId, rowBlock); + + if (release) rowBlock.release(); + else rowBlock.clear(); + + return rowBlock; } + }); + } + + public void shutdown() { + for (ExecutorService service : executors.values()) { + service.shutdownNow(); } } @@ -180,7 +233,7 @@ public class HashShuffleAppenderManager { private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes; //[<page start offset, length>] - private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + private List<Pair<Long, Integer>> pages = Lists.newArrayList(); public HashShuffleIntermediate(int partId, long volume, List<Pair<Long, Integer>> pages, http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java index a88f811..0408395 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java @@ -18,17 +18,19 @@ package org.apache.tajo.storage; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.storage.rawfile.DirectRawFileWriter; +import org.apache.tajo.tuple.memory.MemoryRowBlock; import org.apache.tajo.util.Pair; import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,15 +38,16 @@ import java.util.concurrent.atomic.AtomicBoolean; public class HashShuffleAppenderWrapper implements Closeable { private static Log LOG = LogFactory.getLog(HashShuffleAppenderWrapper.class); - private FileAppender appender; + private DirectRawFileWriter appender; private AtomicBoolean closed = new AtomicBoolean(false); private int partId; + private int volumeId; //<taskId,<page start offset,<task start, task end>>> private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes; //page start offset, length - private List<Pair<Long, Integer>> pages = new ArrayList<>(); + private List<Pair<Long, Integer>> pages = Lists.newArrayList(); private Pair<Long, Integer> currentPage; @@ -56,16 +59,18 @@ public class HashShuffleAppenderWrapper implements Closeable { private ExecutionBlockId ebId; - public HashShuffleAppenderWrapper(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) { + public 
HashShuffleAppenderWrapper(ExecutionBlockId ebId, int partId, int pageSize, + DirectRawFileWriter appender, int volumeId) { this.ebId = ebId; this.partId = partId; this.appender = appender; this.pageSize = pageSize; + this.volumeId = volumeId; } public void init() throws IOException { currentPage = new Pair(0L, 0); - taskTupleIndexes = new HashMap<>(); + taskTupleIndexes = Maps.newHashMap(); rowNumInPage = 0; } @@ -73,41 +78,36 @@ public class HashShuffleAppenderWrapper implements Closeable { * Write multiple tuples. Each tuple is written by a FileAppender which is responsible specified partition. * After writing if a current page exceeds pageSize, pageOffset will be added. * @param taskId - * @param tuples + * @param rowBlock * @return written bytes * @throws java.io.IOException */ - public int addTuples(TaskAttemptId taskId, List<Tuple> tuples) throws IOException { - synchronized(appender) { - if (closed.get()) { - return 0; - } - long currentPos = appender.getOffset(); + public MemoryRowBlock writeRowBlock(TaskAttemptId taskId, MemoryRowBlock rowBlock) throws IOException { + if (closed.get()) { + return rowBlock; + } - for (Tuple eachTuple: tuples) { - appender.addTuple(eachTuple); - } - long posAfterWritten = appender.getOffset(); + appender.writeRowBlock(rowBlock); + appender.flush(); - int writtenBytes = (int)(posAfterWritten - currentPos); + int rows = rowBlock.rows(); + long posAfterWritten = appender.getOffset(); - int nextRowNum = rowNumInPage + tuples.size(); - List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId); - if (taskIndexes == null) { - taskIndexes = new ArrayList<>(); - taskTupleIndexes.put(taskId, taskIndexes); - } - taskIndexes.add( - new Pair<>(currentPage.getFirst(), new Pair<>(rowNumInPage, nextRowNum))); - rowNumInPage = nextRowNum; - - if (posAfterWritten - currentPage.getFirst() > pageSize) { - nextPage(posAfterWritten); - rowNumInPage = 0; - } + int nextRowNum = rowNumInPage + rows; + List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId); + if (taskIndexes == null) { + taskIndexes = Lists.newArrayList(); + taskTupleIndexes.put(taskId, taskIndexes); + } + taskIndexes.add( + new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum))); + rowNumInPage = nextRowNum; - return writtenBytes; + if (posAfterWritten - currentPage.getFirst() > pageSize) { + nextPage(posAfterWritten); + rowNumInPage = 0; } + return rowBlock; } public long getOffset() throws IOException { @@ -129,42 +129,35 @@ public class HashShuffleAppenderWrapper implements Closeable { } public void flush() throws IOException { - synchronized(appender) { - if (closed.get()) { - return; - } - appender.flush(); + if (closed.get()) { + return; } + appender.flush(); } @Override public void close() throws IOException { - synchronized(appender) { - if (closed.get()) { - return; - } - appender.flush(); - offset = appender.getOffset(); - if (offset > currentPage.getFirst()) { - nextPage(offset); - } - appender.close(); - if (LOG.isDebugEnabled()) { - if (!pages.isEmpty()) { - LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size() - + ", lastPage=" + pages.get(pages.size() - 1)); - } else { - LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()); - } + if (closed.getAndSet(true)) { + return; + } + appender.flush(); + offset = appender.getOffset(); + if (offset > currentPage.getFirst()) { + nextPage(offset); + } + 
appender.close(); + if (LOG.isDebugEnabled()) { + if (!pages.isEmpty()) { + LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size() + + ", lastPage=" + pages.get(pages.size() - 1)); + } else { + LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()); } - closed.set(true); } } public TableStats getStats() { - synchronized(appender) { - return appender.getStats(); - } + return appender.getStats(); } public List<Pair<Long, Integer>> getPages() { @@ -172,7 +165,7 @@ public class HashShuffleAppenderWrapper implements Closeable { } public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() { - List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<>(); + List<Pair<Long, Pair<Integer, Integer>>> merged = Lists.newArrayList(); for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) { merged.addAll(eachFailureIndex); @@ -184,4 +177,8 @@ public class HashShuffleAppenderWrapper implements Closeable { public void taskFinished(TaskAttemptId taskId) { taskTupleIndexes.remove(taskId); } + + public int getVolumeId() { + return volumeId; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index 87dd0df..26bd135 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -18,7 +18,6 @@ package org.apache.tajo.storage; -import com.google.protobuf.Message; import io.netty.buffer.ByteBuf; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,14 +36,15 @@ import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.rawfile.DirectRawFileWriter; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.BitArray; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.FileChannel; public class RawFile { @@ -100,7 +100,7 @@ public class RawFile { } if(buf == null) { - buf = BufferPool.directBuffer(conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE)); + buf = BufferPool.directBuffer(conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE)).order(ByteOrder.LITTLE_ENDIAN); buffer = buf.nioBuffer(0, buf.capacity()); } @@ -283,96 +283,94 @@ public class RawFile { } switch (columnTypes[i].getType()) { - case BOOLEAN : - outTuple.put(i, DatumFactory.createBool(buffer.get())); - break; - - case BIT : - outTuple.put(i, DatumFactory.createBit(buffer.get())); - break; - - case CHAR : - int realLen = readRawVarint32(); - byte[] buf = new byte[realLen]; - buffer.get(buf); - outTuple.put(i, DatumFactory.createChar(buf)); - break; - - case INT2 : - outTuple.put(i, DatumFactory.createInt2(buffer.getShort())); - break; - - case INT4 : - outTuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32()))); - break; - - case INT8 : - outTuple.put(i, 
DatumFactory.createInt8(decodeZigZag64(readRawVarint64()))); - break; - - case FLOAT4 : - outTuple.put(i, DatumFactory.createFloat4(buffer.getFloat())); - break; - - case FLOAT8 : - outTuple.put(i, DatumFactory.createFloat8(buffer.getDouble())); - break; - - case TEXT : { - int len = readRawVarint32(); - byte [] strBytes = new byte[len]; - buffer.get(strBytes); - outTuple.put(i, DatumFactory.createText(strBytes)); - break; - } + case BOOLEAN: + outTuple.put(i, DatumFactory.createBool(buffer.get())); + break; - case BLOB : { - int len = readRawVarint32(); - byte [] rawBytes = new byte[len]; - buffer.get(rawBytes); - outTuple.put(i, DatumFactory.createBlob(rawBytes)); - break; - } + case BIT: + outTuple.put(i, DatumFactory.createBit(buffer.get())); + break; - case PROTOBUF: { - int len = readRawVarint32(); - byte [] rawBytes = new byte[len]; - buffer.get(rawBytes); + case CHAR: + int realLen = readRawVarint32(); + byte[] buf = new byte[realLen]; + buffer.get(buf); + outTuple.put(i, DatumFactory.createChar(buf)); + break; - outTuple.put(i, ProtobufDatumFactory.createDatum(columnTypes[i], rawBytes)); - break; - } + case INT2: + outTuple.put(i, DatumFactory.createInt2(buffer.getShort())); + break; - case INET4 : - byte [] ipv4Bytes = new byte[4]; - buffer.get(ipv4Bytes); - outTuple.put(i, DatumFactory.createInet4(ipv4Bytes)); - break; - - case DATE: { - int val = buffer.getInt(); - if (val < Integer.MIN_VALUE + 1) { - outTuple.put(i, DatumFactory.createNullDatum()); - } else { - outTuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val)); - } - break; + case INT4: + outTuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32()))); + break; + + case INT8: + outTuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64()))); + break; + + case FLOAT4: + outTuple.put(i, DatumFactory.createFloat4(buffer.getFloat())); + break; + + case FLOAT8: + outTuple.put(i, DatumFactory.createFloat8(buffer.getDouble())); + break; + + case TEXT: { + int len = readRawVarint32(); + byte[] strBytes = new byte[len]; + buffer.get(strBytes); + outTuple.put(i, DatumFactory.createText(strBytes)); + break; + } + + case BLOB: { + int len = readRawVarint32(); + byte[] rawBytes = new byte[len]; + buffer.get(rawBytes); + outTuple.put(i, DatumFactory.createBlob(rawBytes)); + break; + } + + case PROTOBUF: { + int len = readRawVarint32(); + byte[] rawBytes = new byte[len]; + buffer.get(rawBytes); + + outTuple.put(i, ProtobufDatumFactory.createDatum(columnTypes[i], rawBytes)); + break; + } + + case INET4: + outTuple.put(i, DatumFactory.createInet4(buffer.getInt())); + break; + + case DATE: { + int val = buffer.getInt(); + if (val < Integer.MIN_VALUE + 1) { + outTuple.put(i, DatumFactory.createNullDatum()); + } else { + outTuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val)); } - case TIME: - case TIMESTAMP: { - long val = buffer.getLong(); - if (val < Long.MIN_VALUE + 1) { - outTuple.put(i, DatumFactory.createNullDatum()); - } else { - outTuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val)); - } - break; + break; + } + case TIME: + case TIMESTAMP: { + long val = buffer.getLong(); + if (val < Long.MIN_VALUE + 1) { + outTuple.put(i, DatumFactory.createNullDatum()); + } else { + outTuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val)); } - case NULL_TYPE: - outTuple.put(i, NullDatum.get()); - break; + break; + } + case NULL_TYPE: + outTuple.put(i, NullDatum.get()); + break; - default: + default: } } @@ -401,6 +399,7 @@ public class RawFile { buffer.clear(); forceFillBuffer = 
true; filePosition = fragment.getStartKey(); + recordCount = 0; channel.position(filePosition); eos = false; } @@ -462,331 +461,11 @@ public class RawFile { } } - public static class RawFileAppender extends FileAppender { - private FileChannel channel; - private RandomAccessFile randomAccessFile; - private DataType[] columnTypes; - - private ByteBuffer buffer; - private ByteBuf buf; - private BitArray nullFlags; - private int headerSize = 0; - private static final int RECORD_SIZE = 4; - private long pos; - - private TableStatistics stats; - - public RawFileAppender(Configuration conf, TaskAttemptId taskAttemptId, - Schema schema, TableMeta meta, Path workDir) throws IOException { - super(conf, taskAttemptId, schema, meta, workDir); - } - - public void init() throws IOException { - File file; - try { - if (path.toUri().getScheme() != null) { - file = new File(path.toUri()); - } else { - file = new File(path.toString()); - } - } catch (IllegalArgumentException iae) { - throw new IOException(iae); - } - - randomAccessFile = new RandomAccessFile(file, "rw"); - channel = randomAccessFile.getChannel(); - pos = 0; - - columnTypes = new DataType[schema.size()]; - for (int i = 0; i < schema.size(); i++) { - columnTypes[i] = schema.getColumn(i).getDataType(); - } - - buf = BufferPool.directBuffer(conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE)); - buffer = buf.nioBuffer(0, buf.capacity()); - - // comput the number of bytes, representing the null flags - - nullFlags = new BitArray(schema.size()); - headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); - - if (tableStatsEnabled) { - this.stats = new TableStatistics(this.schema, columnStatsEnabled); - } - - super.init(); - } - - @Override - public long getOffset() throws IOException { - return pos; - } - - private void flushBuffer() throws IOException { - buffer.flip(); - channel.write(buffer); - buffer.clear(); - } - - private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten) - throws IOException { - - // if the buffer reaches the limit, - // write the bytes from 0 to the previous record. - if (buffer.remaining() < sizeToBeWritten) { - - int limit = buffer.position(); - buffer.limit(recordOffset); - buffer.flip(); - channel.write(buffer); - buffer.position(recordOffset); - buffer.limit(limit); - buffer.compact(); - - //increase the write-buffer - if(buffer.remaining() < sizeToBeWritten) { - buf.setIndex(buffer.position(), buffer.limit()); - buf.ensureWritable(sizeToBeWritten); - buffer = buf.nioBuffer(0, buf.capacity()); - buffer.position(buf.readerIndex()); - } - return true; - } else { - return false; - } - } - - /** - * Encode a ZigZag-encoded 32-bit value. ZigZag encodes signed integers - * into values that can be efficiently encoded with varint. (Otherwise, - * negative values must be sign-extended to 64 bits to be varint encoded, - * thus always taking 10 bytes on the wire.) - * - * @param n A signed 32-bit integer. - * @return An unsigned 32-bit integer, stored in a signed int because - * Java has no explicit unsigned support. - */ - public static int encodeZigZag32(final int n) { - // Note: the right-shift must be arithmetic - return (n << 1) ^ (n >> 31); - } - - /** - * Encode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers - * into values that can be efficiently encoded with varint. (Otherwise, - * negative values must be sign-extended to 64 bits to be varint encoded, - * thus always taking 10 bytes on the wire.) - * - * @param n A signed 64-bit integer. 
- * @return An unsigned 64-bit integer, stored in a signed int because - * Java has no explicit unsigned support. - */ - public static long encodeZigZag64(final long n) { - // Note: the right-shift must be arithmetic - return (n << 1) ^ (n >> 63); - } - - /** - * Encode and write a varint. {@code value} is treated as - * unsigned, so it won't be sign-extended if negative. - */ - public void writeRawVarint32(int value) throws IOException { - while (true) { - if ((value & ~0x7F) == 0) { - buffer.put((byte) value); - return; - } else { - buffer.put((byte) ((value & 0x7F) | 0x80)); - value >>>= 7; - } - } - } - - /** - * Compute the number of bytes that would be needed to encode a varint. - * {@code value} is treated as unsigned, so it won't be sign-extended if - * negative. - */ - public static int computeRawVarint32Size(final int value) { - if ((value & (0xffffffff << 7)) == 0) return 1; - if ((value & (0xffffffff << 14)) == 0) return 2; - if ((value & (0xffffffff << 21)) == 0) return 3; - if ((value & (0xffffffff << 28)) == 0) return 4; - return 5; - } - - /** Encode and write a varint. */ - public void writeRawVarint64(long value) throws IOException { - while (true) { - if ((value & ~0x7FL) == 0) { - buffer.put((byte) value); - return; - } else { - buffer.put((byte) ((value & 0x7F) | 0x80)); - value >>>= 7; - } - } - } - - @Override - public void addTuple(Tuple t) throws IOException { - - if (buffer.remaining() < headerSize) { - flushBuffer(); - } - - // skip the row header - int recordOffset = buffer.position(); - buffer.position(recordOffset + headerSize); - // reset the null flags - nullFlags.clear(); - for (int i = 0; i < schema.size(); i++) { - // it is to calculate min/max values, and it is only used for the intermediate file. - if (tableStatsEnabled) { - stats.analyzeField(i, t); - } - - if (t.isBlankOrNull(i)) { - nullFlags.set(i); - continue; - } - - // 10 is the maximum bytes size of all types - if (flushBufferAndReplace(recordOffset, 10)) { - recordOffset = 0; - } - - switch(columnTypes[i].getType()) { - case NULL_TYPE: - nullFlags.set(i); - continue; - - case BOOLEAN: - case BIT: - buffer.put(t.getByte(i)); - break; - - case INT2 : - buffer.putShort(t.getInt2(i)); - break; - - case INT4 : - writeRawVarint32(encodeZigZag32(t.getInt4(i))); - break; - - case INT8 : - writeRawVarint64(encodeZigZag64(t.getInt8(i))); - break; - - case FLOAT4 : - buffer.putFloat(t.getFloat4(i)); - break; - - case FLOAT8 : - buffer.putDouble(t.getFloat8(i)); - break; - - case CHAR: - case TEXT: { - byte [] strBytes = t.getBytes(i); - if (flushBufferAndReplace(recordOffset, strBytes.length + computeRawVarint32Size(strBytes.length))) { - recordOffset = 0; - } - writeRawVarint32(strBytes.length); - buffer.put(strBytes); - break; - } + public static class RawFileAppender extends DirectRawFileWriter { - case DATE: - buffer.putInt(t.getInt4(i)); - break; - - case TIME: - case TIMESTAMP: - buffer.putLong(t.getInt8(i)); - break; - - case BLOB : { - byte [] rawBytes = t.getBytes(i); - if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) { - recordOffset = 0; - } - writeRawVarint32(rawBytes.length); - buffer.put(rawBytes); - break; - } - - case PROTOBUF: { - byte [] rawBytes = t.getBytes(i); - if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) { - recordOffset = 0; - } - writeRawVarint32(rawBytes.length); - buffer.put(rawBytes); - break; - } - - case INET4 : - buffer.put(t.getBytes(i)); - break; - - default: - throw 
new IOException("Cannot support data type: " + columnTypes[i].getType()); - } - } - - // write a record header - int bufferPos = buffer.position(); - buffer.position(recordOffset); - buffer.putInt(bufferPos - recordOffset); - byte [] flags = nullFlags.toArray(); - buffer.putShort((short) flags.length); - buffer.put(flags); - - pos += bufferPos - recordOffset; - buffer.position(bufferPos); - - if (tableStatsEnabled) { - stats.incrementRow(); - } - } - - @Override - public void flush() throws IOException { - if(buffer != null){ - flushBuffer(); - } - } - - @Override - public void close() throws IOException { - flush(); - if (tableStatsEnabled) { - stats.setNumBytes(getOffset()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path); - } - - if(buf != null){ - buffer.clear(); - buffer = null; - - buf.release(); - buf = null; - } - - IOUtils.cleanup(LOG, channel, randomAccessFile); - } - - @Override - public TableStats getStats() { - if (tableStatsEnabled) { - stats.setNumBytes(pos); - return stats.getTableStat(); - } else { - return null; - } + public RawFileAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema, + TableMeta meta, Path workDir) throws IOException { + super(conf, taskAttemptId, schema, meta, workDir, null); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java index 0172484..550de63 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java @@ -35,7 +35,6 @@ import org.apache.tajo.tuple.RowBlockReader; import org.apache.tajo.tuple.memory.MemoryRowBlock; import org.apache.tajo.tuple.memory.RowBlock; import org.apache.tajo.tuple.memory.UnSafeTuple; -import org.apache.tajo.tuple.memory.ZeroCopyTuple; import org.apache.tajo.unit.StorageUnit; import java.io.File; @@ -45,6 +44,9 @@ import java.io.IOException; public class DirectRawFileScanner extends FileScanner implements SeekableScanner { private static final Log LOG = LogFactory.getLog(DirectRawFileScanner.class); + public static final String READ_BUFFER_SIZE = "tajo.storage.raw.io.read-buffer.bytes"; + public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB; + private SeekableInputChannel channel; private boolean eos = false; @@ -53,7 +55,7 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner private long filePosition; private long endOffset; - private ZeroCopyTuple unSafeTuple = new UnSafeTuple(); + private UnSafeTuple unSafeTuple = new UnSafeTuple(); private RowBlock tupleBuffer; private RowBlockReader reader; @@ -61,15 +63,19 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner super(conf, schema, meta, fragment); } + @Override public void init() throws IOException { initChannel(); - tupleBuffer = new MemoryRowBlock(SchemaUtil.toDataTypes(schema), 64 * StorageUnit.KB, true); - - reader = tupleBuffer.getReader(); - - fetchNeeded = !next(tupleBuffer); + if (tupleBuffer == null) { + tupleBuffer = new 
MemoryRowBlock(SchemaUtil.toDataTypes(schema), + conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE)); + } else { + tupleBuffer.clear(); + } + fetchNeeded = true; + eos = false; super.init(); } @@ -121,12 +127,12 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner public void seek(long offset) throws IOException { channel.seek(offset); filePosition = channel.position(); - tupleBuffer.getMemory().clear(); + tupleBuffer.clear(); fetchNeeded = true; } public boolean next(RowBlock rowblock) throws IOException { - long reamin = reader.remainForRead(); + long reamin = reader == null ? 0 : reader.remainForRead(); boolean ret = rowblock.copyFromChannel(channel); reader = rowblock.getReader(); filePosition += rowblock.getMemory().writerPosition() - reamin; @@ -136,7 +142,7 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner private boolean fetchNeeded = true; @Override - public Tuple next() throws IOException { + public UnSafeTuple next() throws IOException { if(eos) { return null; } @@ -164,9 +170,9 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner public void reset() throws IOException { // reload initial buffer filePosition = fragment.getStartKey(); + recordCount = 0; seek(filePosition); eos = false; - reader.reset(); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/7734e06e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java index 23ef059..9cbb7a0 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java @@ -26,11 +26,14 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.BuiltinStorages; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.serder.PlanProto.ShuffleType; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.FileAppender; @@ -38,6 +41,9 @@ import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.TableStatistics; import org.apache.tajo.storage.Tuple; import org.apache.tajo.tuple.memory.MemoryRowBlock; +import org.apache.tajo.tuple.memory.OffHeapRowBlockUtils.TupleConverter; +import org.apache.tajo.tuple.memory.RowWriter; +import org.apache.tajo.tuple.memory.UnSafeTuple; import org.apache.tajo.unit.StorageUnit; import java.io.File; @@ -46,23 +52,36 @@ import java.io.RandomAccessFile; import java.nio.channels.FileChannel; public class DirectRawFileWriter extends FileAppender { - public static final String FILE_EXTENSION = "draw"; - private static final int BUFFER_SIZE = 64 * StorageUnit.KB; private static final Log LOG = LogFactory.getLog(DirectRawFileWriter.class); - private FileChannel channel; - private 
RandomAccessFile randomAccessFile; - private FSDataOutputStream fos; - private boolean isLocal; - private long pos; + public static final String WRITE_BUFFER_SIZE = "tajo.storage.raw.io.write-buffer.bytes"; + public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB; + private static final float OVERFLOW_RATIO = 1.1f; + protected FileChannel channel; - private TableStatistics stats; - private ShuffleType shuffleType; - private MemoryRowBlock memoryRowBlock; + protected RandomAccessFile randomAccessFile; + protected FSDataOutputStream fos; + protected long pos; + protected TableStatistics stats; + + protected TupleConverter tupleConverter; + protected MemoryRowBlock rowBlock; + protected boolean analyzeField; + protected boolean hasExternalBuf; + protected boolean isLocal; + + public DirectRawFileWriter(Configuration conf, TaskAttemptId taskAttemptId, + final Schema schema, final TableMeta meta, final Path path) + throws IOException { + this(conf, taskAttemptId, schema, meta, path, null); + } public DirectRawFileWriter(Configuration conf, TaskAttemptId taskAttemptId, - final Schema schema, final TableMeta meta, final Path path) throws IOException { + final Schema schema, final TableMeta meta, final Path path, + MemoryRowBlock rowBlock) throws IOException { super(conf, taskAttemptId, schema, meta, path); + this.rowBlock = rowBlock; + this.hasExternalBuf = rowBlock != null; } @Override @@ -91,19 +110,89 @@ public class DirectRawFileWriter extends FileAppender { if (tableStatsEnabled) { this.stats = new TableStatistics(this.schema, columnStatsEnabled); - this.shuffleType = PlannerUtil.getShuffleType( + if (ShuffleType.RANGE_SHUFFLE == PlannerUtil.getShuffleType( meta.getOption(StorageConstants.SHUFFLE_TYPE, - PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE))); + PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE)))) { + this.analyzeField = true; + } + } + + if (rowBlock == null) { + int bufferSize = (int) (conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE) * OVERFLOW_RATIO); + rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema), bufferSize, true, meta.getDataFormat()); } - memoryRowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema), BUFFER_SIZE, true); + tupleConverter = initConverter(); + pos = 0; super.init(); } + public TupleConverter initConverter() { + switch (meta.getDataFormat()) { + case BuiltinStorages.DRAW: + return getDrawConverter(); + case BuiltinStorages.RAW: + return getRawConverter(); + default: + throw new TajoInternalError(new UnsupportedException()); + } + } + + private TupleConverter getDrawConverter() { + return new TupleConverter() { + + @Override + public void convert(Tuple tuple, RowWriter writer) { + if (analyzeField) { + if (tuple instanceof UnSafeTuple) { + + for (int i = 0; i < writer.dataTypes().length; i++) { + // it is to calculate min/max values, and it is only used for the intermediate file. + stats.analyzeField(i, tuple); + } + // write direct to memory + writer.addTuple(tuple); + } else { + writer.startRow(); + + for (int i = 0; i < writer.dataTypes().length; i++) { + // it is to calculate min/max values, and it is only used for the intermediate file. 
+ stats.analyzeField(i, tuple); + writeField(i, tuple, writer); + } + writer.endRow(); + } + } else { + // write direct to memory + writer.addTuple(tuple); + } + } + }; + } + + private TupleConverter getRawConverter() { + return new TupleConverter() { + + @Override + public void convert(Tuple tuple, RowWriter writer) { + writer.startRow(); + + for (int i = 0; i < writer.dataTypes().length; i++) { + // it is to calculate min/max values, and it is only used for the intermediate file. + if (analyzeField) { + stats.analyzeField(i, tuple); + } + writeField(i, tuple, writer); + } + writer.endRow(); + } + }; + } + @Override public long getOffset() throws IOException { - return pos + memoryRowBlock.getMemory().writerPosition(); + return hasExternalBuf ? pos : pos + rowBlock.getMemory().writerPosition(); } public void writeRowBlock(MemoryRowBlock rowBlock) throws IOException { @@ -113,33 +202,27 @@ public class DirectRawFileWriter extends FileAppender { pos += rowBlock.getMemory().writeTo(fos); } - rowBlock.getMemory().clear(); - if (tableStatsEnabled) { - stats.incrementRows(rowBlock.rows() - stats.getNumRows()); + stats.incrementRows(rowBlock.rows()); } } @Override public void addTuple(Tuple t) throws IOException { - if (shuffleType == ShuffleType.RANGE_SHUFFLE) { - // it is to calculate min/max values, and it is only used for the intermediate file. - for (int i = 0; i < schema.size(); i++) { - stats.analyzeField(i, t); - } - } - memoryRowBlock.getWriter().addTuple(t); + tupleConverter.convert(t, rowBlock.getWriter()); - if(memoryRowBlock.getMemory().readableBytes() >= BUFFER_SIZE) { - writeRowBlock(memoryRowBlock); + if(rowBlock.usedMem() > DEFAULT_BUFFER_SIZE) { + writeRowBlock(rowBlock); + rowBlock.clear(); } } @Override public void flush() throws IOException { - if(memoryRowBlock.getMemory().isReadable()) { - writeRowBlock(memoryRowBlock); + if(!hasExternalBuf && rowBlock.getMemory().isReadable()) { + writeRowBlock(rowBlock); + rowBlock.clear(); } } @@ -155,7 +238,9 @@ public class DirectRawFileWriter extends FileAppender { } IOUtils.cleanup(LOG, channel, randomAccessFile, fos); - memoryRowBlock.release(); + if(!hasExternalBuf && rowBlock != null) { + rowBlock.release(); + } } @Override

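Note on the HashShuffleFileWriteExec diff above: instead of counting buffered tuples (the old SHUFFLE_HASH_APPENDER_BUFFER_SIZE limit), the executor now tracks two byte counters over its per-partition MemoryRowBlocks — total resizable buffer capacity and bytes actually used — and flushes on either of two conditions: capacity beyond the session's HASH_SHUFFLE_BUFFER_SIZE (flush everything, release the buffers) or usage beyond the 0.8 threshold (flush, keep the buffers for reuse). The sketch below is a minimal, pure-JDK model of that policy, not Tajo code; GrowableBuffer and the 64MB limit are illustrative stand-ins for MemoryRowBlock and the session variable.

import java.util.HashMap;
import java.util.Map;

// Minimal model of the flush policy in the new HashShuffleFileWriteExec.
public class HashShufflePolicySketch {
  static final long MAX_BUFFER_SIZE = 64L * 1024 * 1024;                // assumed stand-in for HASH_SHUFFLE_BUFFER_SIZE
  static final long BUFFER_THRESHOLD = (long) (MAX_BUFFER_SIZE * 0.8f); // cf. BUFFER_THRESHOLD_FACTOR = 0.8f

  // Stand-in for MemoryRowBlock: capacity can grow past the bytes used.
  static final class GrowableBuffer {
    long capacity = 4 * 1024; // cf. MINIMUM_INITIAL_BUFFER_SIZE
    long used = 0;
    void write(int nBytes) {
      used += nBytes;
      while (used > capacity) capacity *= 2; // resizable buffer growth
    }
  }

  private final Map<Integer, GrowableBuffer> partitionBuffers = new HashMap<>();
  private long usedBufferSize = 0;
  private long totalBufferCapacity = 0;

  public void add(int partId, int rowBytes) {
    GrowableBuffer buf = partitionBuffers.get(partId);
    if (buf == null) {
      buf = new GrowableBuffer();
      partitionBuffers.put(partId, buf);
      totalBufferCapacity += buf.capacity; // count the initial allocation
    }
    totalBufferCapacity -= buf.capacity;
    buf.write(rowBytes);
    totalBufferCapacity += buf.capacity;   // capacity may have grown on this write
    usedBufferSize += rowBytes;

    if (totalBufferCapacity > MAX_BUFFER_SIZE) {
      flush(true);                         // flush every partition and release the buffers
      totalBufferCapacity = usedBufferSize = 0;
    } else if (usedBufferSize > BUFFER_THRESHOLD) {
      flush(false);                        // flush but keep the buffers for reuse
      usedBufferSize = 0;
    }
  }

  private void flush(boolean release) {
    // the real executor hands each MemoryRowBlock to
    // HashShuffleAppenderManager.writePartitions(...) at this point
    if (release) partitionBuffers.clear();
    else partitionBuffers.values().forEach(b -> b.used = 0);
  }
}

Tracking capacity separately from usage matters because the row blocks grow by resizing, so a skewed partition can pin far more memory than it has actually filled; the capacity check bounds that, while the usage threshold bounds ordinary growth.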
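The HashShuffleAppenderManager diff follows a complementary pattern: one single-thread ExecutorService per local directory, with each flush routed to the executor that owns the target volume, so writes to different disks proceed in parallel while writes to the same disk stay serialized; TajoWorker tears the pool down on stop. A compact sketch of that pattern under the same caveat — the class, the byte[] payload, and volumeIdOf are illustrative; only getVolumeId, writePartitions, and shutdown exist in the commit.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of the per-volume writer pool in HashShuffleAppenderManager.
public class PerVolumeWriterSketch {
  private final List<String> temporalPaths = new ArrayList<>();
  private final List<ExecutorService> executors = new ArrayList<>();

  public PerVolumeWriterSketch(List<String> localDirs) {
    for (String dir : localDirs) {
      temporalPaths.add(dir);
      executors.add(Executors.newSingleThreadExecutor()); // one writer thread per disk
    }
  }

  // cf. getVolumeId(Path): resolve a file to the volume whose root path prefixes it
  int volumeIdOf(String filePath) {
    for (int i = 0; i < temporalPaths.size(); i++) {
      if (filePath.startsWith(temporalPaths.get(i))) return i;
    }
    throw new IllegalArgumentException("no volume for " + filePath);
  }

  // cf. writePartitions(...): submit the write so same-volume writes serialize
  public Future<byte[]> writeAsync(String filePath, byte[] buffer) {
    return executors.get(volumeIdOf(filePath)).submit(() -> {
      // the real code calls appender.writeRowBlock(taskId, rowBlock) here,
      // then releases or clears the row block depending on the release flag
      return buffer;
    });
  }

  // cf. shutdown(), invoked from TajoWorker's stop path in the diff above
  public void shutdown() {
    for (ExecutorService service : executors) {
      service.shutdownNow();
    }
  }
}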