Repository: tajo Updated Branches: refs/heads/branch-0.11.1 c91bfdabc -> a28db5a70
TAJO-1983: Improve memory usage of ExternalSortExec. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a28db5a7 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a28db5a7 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a28db5a7 Branch: refs/heads/branch-0.11.1 Commit: a28db5a70e3d694ad6f81da87ed6336aaa6ea6a0 Parents: c91bfda Author: Jinho Kim <[email protected]> Authored: Thu Nov 26 12:04:08 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Thu Nov 26 12:04:08 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/tuple/BaseTupleBuilder.java | 23 +- .../tuple/memory/CompactRowBlockWriter.java | 30 +- .../tajo/tuple/memory/OffHeapRowBlockUtils.java | 74 ++- .../tuple/memory/OffHeapRowBlockWriter.java | 25 +- .../tajo/tuple/memory/OffHeapRowWriter.java | 16 + .../tajo/tuple/memory/ResizableMemoryBlock.java | 10 +- .../org/apache/tajo/tuple/memory/RowWriter.java | 9 +- .../tajo/tuple/memory/UnSafeTupleList.java | 105 ++++ .../tajo/tuple/memory/TestMemoryRowBlock.java | 40 +- .../physical/TestProgressExternalSortExec.java | 8 +- .../planner/physical/TestUnSafeTuple.java | 104 ++++ .../tajo/querymaster/TestTaskStatusUpdate.java | 4 +- .../planner/physical/ExternalSortExec.java | 482 +++++++++++++------ .../engine/planner/physical/PhysicalExec.java | 4 + .../tajo/worker/ExecutionBlockContext.java | 5 + .../java/org/apache/tajo/worker/Fetcher.java | 17 +- .../java/org/apache/tajo/worker/TaskImpl.java | 29 +- .../tajo/pullserver/TajoPullServerService.java | 24 +- .../tajo/storage/BaseTupleComparator.java | 45 +- .../org/apache/tajo/storage/NullScanner.java | 1 - 21 files changed, 831 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index b2ae4b7..1fc43e8 100644 --- a/CHANGES +++ b/CHANGES @@ -7,6 +7,8 @@ Release 0.11.1 - unreleased IMPROVEMENT + TAJO-1983: Improve memory usage of ExternalSortExec. (jinho) + TAJO-1271: Improve memory usage of Hash-shuffle. (jinho) TAJO-1966: Decrease memory usage of TajoTestingCluster. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java index ebdcc26..d962218 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java @@ -19,6 +19,7 @@ package org.apache.tajo.tuple; import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.exception.ValueOutOfRangeException; import org.apache.tajo.storage.Tuple; import org.apache.tajo.tuple.memory.*; import org.apache.tajo.unit.StorageUnit; @@ -54,6 +55,11 @@ public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, } @Override + public void backward(int length) { + memoryBlock.writerPosition(memoryBlock.writerPosition() - length); + } + + @Override public boolean startRow() { memoryBlock.writerPosition(0); return super.startRow(); @@ -65,13 +71,18 @@ public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, } @Override - public void addTuple(Tuple tuple) { - if (tuple instanceof UnSafeTuple) { - UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, UnSafeTuple.class); - addTuple(unSafeTuple); - } else { - OffHeapRowBlockUtils.convert(tuple, this); + public boolean addTuple(Tuple tuple) { + try { + if (tuple instanceof UnSafeTuple) { + UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, UnSafeTuple.class); + addTuple(unSafeTuple); + } else { + OffHeapRowBlockUtils.convert(tuple, this); + } + } catch (ValueOutOfRangeException e) { + return false; } + return true; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java index a88d2f1..67d5f8c 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/CompactRowBlockWriter.java @@ -25,6 +25,7 @@ import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.datum.TextDatum; import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.exception.ValueOutOfRangeException; import org.apache.tajo.storage.Tuple; import org.apache.tajo.util.BitArray; import org.apache.tajo.util.SizeOf; @@ -182,6 +183,15 @@ public class CompactRowBlockWriter implements RowWriter { rowBlock.getMemory().writerPosition(rowBlock.getMemory().writerPosition() + length); } + /** + * Backward the address; + * + * @param length Length to be backwarded + */ + public void backward(int length) { + rowBlock.getMemory().writerPosition(rowBlock.getMemory().writerPosition() - length); + } + public void ensureSize(int size) { rowBlock.getMemory().ensureSize(size); } @@ -232,7 +242,7 @@ public class CompactRowBlockWriter implements RowWriter { return true; } - + @Override public void endRow() { long rowHeaderPos = recordStartAddr(); // curOffset is equivalent to a byte length of this row. @@ -249,6 +259,15 @@ public class CompactRowBlockWriter implements RowWriter { } @Override + public void cancelRow() { + // curOffset is equivalent to a byte length of current row. + backward(curOffset); + curOffset = 0; + nullFlags.clear(); + curFieldIdx = 0; + } + + @Override public void skipField() { // set null flag nullFlags.set(curFieldIdx); @@ -402,7 +421,12 @@ public class CompactRowBlockWriter implements RowWriter { } @Override - public void addTuple(Tuple tuple) { - OffHeapRowBlockUtils.convert(tuple, this); + public boolean addTuple(Tuple tuple) { + try { + OffHeapRowBlockUtils.convert(tuple, this); + } catch (ValueOutOfRangeException e) { + return false; + } + return true; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java index 1aca22f..3f27763 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockUtils.java @@ -19,10 +19,13 @@ package org.apache.tajo.tuple.memory; import com.google.common.collect.Lists; +import com.google.common.primitives.*; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.exception.ValueOutOfRangeException; import org.apache.tajo.storage.Tuple; import org.apache.tajo.tuple.RowBlockReader; @@ -62,6 +65,11 @@ public class OffHeapRowBlockUtils { return tupleList; } + public static List<UnSafeTuple> sort(UnSafeTupleList list, Comparator<UnSafeTuple> comparator) { + Collections.sort(list, comparator); + return list; + } + public static Tuple[] sortToArray(MemoryRowBlock rowBlock, Comparator<Tuple> comparator) { Tuple[] tuples = new Tuple[rowBlock.rows()]; @@ -85,18 +93,74 @@ public class OffHeapRowBlockUtils { return tuples; } + public static final int compareColumn(UnSafeTuple tuple1, UnSafeTuple tuple2, int index, TajoDataTypes.Type type, + boolean ascending, boolean nullFirst) { + final boolean n1 = tuple1.isBlankOrNull(index); + final boolean n2 = tuple2.isBlankOrNull(index); + if (n1 && n2) { + return 0; + } + + if (n1 ^ n2) { + return nullFirst ? (n1 ? -1 : 1) : (n1 ? 1 : -1); + } + + int compare; + switch (type) { + case BOOLEAN: + compare = Booleans.compare(tuple1.getBool(index), tuple2.getBool(index)); + break; + case BIT: + compare = tuple1.getByte(index) - tuple2.getByte(index); + break; + case INT1: + case INT2: + compare = Shorts.compare(tuple1.getInt2(index), tuple2.getInt2(index)); + break; + case DATE: + case INT4: + compare = Ints.compare(tuple1.getInt4(index), tuple2.getInt4(index)); + break; + case INET4: + compare = UnsignedInts.compare(tuple1.getInt4(index), tuple2.getInt4(index)); + break; + case TIME: + case TIMESTAMP: + case INT8: + compare = Longs.compare(tuple1.getInt8(index), tuple2.getInt8(index)); + break; + case FLOAT4: + compare = Floats.compare(tuple1.getFloat4(index), tuple2.getFloat4(index)); + break; + case FLOAT8: + compare = Doubles.compare(tuple1.getFloat8(index), tuple2.getFloat8(index)); + break; + case CHAR: + case TEXT: + case BLOB: + compare = UnSafeTupleBytesComparator.compare(tuple1.getFieldAddr(index), tuple2.getFieldAddr(index)); + break; + default: + throw new TajoRuntimeException( + new UnsupportedException("unknown data type '" + type.name() + "'")); + } + return ascending ? compare : -compare; + } /** * This class is tuple converter to the RowBlock */ public static class TupleConverter { public void convert(Tuple tuple, RowWriter writer) { - writer.startRow(); - - for (int i = 0; i < writer.dataTypes().length; i++) { - writeField(i, tuple, writer); + try { + writer.startRow(); + for (int i = 0; i < writer.dataTypes().length; i++) { + writeField(i, tuple, writer); + } + } catch (ValueOutOfRangeException e) { + writer.cancelRow(); + throw e; } - writer.endRow(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java index 9f3d8a2..594f927 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java @@ -20,6 +20,7 @@ package org.apache.tajo.tuple.memory; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.exception.ValueOutOfRangeException; import org.apache.tajo.storage.Tuple; import org.apache.tajo.util.TUtil; @@ -48,6 +49,11 @@ public class OffHeapRowBlockWriter extends OffHeapRowWriter { rowBlock.getMemory().writerPosition(rowBlock.getMemory().writerPosition() + length); } + @Override + public void backward(int length) { + rowBlock.getMemory().writerPosition(rowBlock.getMemory().writerPosition() - length); + } + public void ensureSize(int size) { rowBlock.getMemory().ensureSize(size); } @@ -65,13 +71,18 @@ public class OffHeapRowBlockWriter extends OffHeapRowWriter { @Override - public void addTuple(Tuple tuple) { - if (tuple instanceof UnSafeTuple) { - UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, UnSafeTuple.class); - addTuple(unSafeTuple); - rowBlock.setRows(rowBlock.rows() + 1); - } else { - OffHeapRowBlockUtils.convert(tuple, this); + public boolean addTuple(Tuple tuple) { + try { + if (tuple instanceof UnSafeTuple) { + UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, UnSafeTuple.class); + addTuple(unSafeTuple); + rowBlock.setRows(rowBlock.rows() + 1); + } else { + OffHeapRowBlockUtils.convert(tuple, this); + } + } catch (ValueOutOfRangeException e) { + return false; } + return true; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java index f082762..e992aed 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java @@ -94,6 +94,12 @@ public abstract class OffHeapRowWriter implements RowWriter { */ public abstract void forward(int length); + /** + * Backward the address; + * + * @param length Length to be backwarded + */ + public abstract void backward(int length); @Override public void clear() { @@ -130,6 +136,16 @@ public abstract class OffHeapRowWriter implements RowWriter { PlatformDependent.putInt(rowHeaderPos, MemoryRowBlock.NULL_FIELD_OFFSET); rowHeaderPos += SizeOf.SIZE_OF_INT; } + curOffset = 0; + } + + @Override + public void cancelRow() { + // curOffset is equivalent to a byte length of current row. + backward(curOffset); + curOffset = 0; + curFieldOffset = SizeOf.SIZE_OF_INT; + curFieldIdx = 0; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java index 09faff9..6b06486 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ResizableMemoryBlock.java @@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.exception.ValueOutOfRangeException; import org.apache.tajo.storage.BufferPool; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.UnsafeUtil; @@ -128,11 +129,12 @@ public class ResizableMemoryBlock implements MemoryBlock { @Override public void ensureSize(int size) { if (!buffer.isWritable(size)) { - if (!limitSpec.canIncrease(size)) { - throw new RuntimeException("Cannot increase RowBlock anymore."); + int newBlockSize = limitSpec.increasedSize(size); + + if (!limitSpec.canIncrease(buffer.writableBytes() + newBlockSize)) { + throw new ValueOutOfRangeException("Cannot increase RowBlock anymore."); } - int newBlockSize = limitSpec.increasedSize(size); resize(newBlockSize); LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false)); } @@ -142,7 +144,7 @@ public class ResizableMemoryBlock implements MemoryBlock { Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes"); if (newSize > limitSpec.limit()) { - throw new RuntimeException("Resize cannot exceed the capacity limit"); + throw new ValueOutOfRangeException("Resize cannot exceed the capacity limit"); } if (newSize < buffer.writableBytes()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java index 0393714..e055ab7 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/RowWriter.java @@ -30,6 +30,11 @@ import org.apache.tajo.storage.Tuple; * startRow() --> skipField() or putXXX --> endRow() * </pre> * + * If you want to cancel the current row, should be as follows: + * <pre> + * startRow() --> skipField() or putXXX --> cancelRow() + * </pre> + * * The total number of skipField and putXXX invocations must be equivalent to the number of fields. */ public interface RowWriter { @@ -40,6 +45,8 @@ public interface RowWriter { void endRow(); + void cancelRow(); + void skipField(); void clear(); @@ -76,5 +83,5 @@ public interface RowWriter { void putProtoDatum(ProtobufDatum datum); - void addTuple(Tuple tuple); + boolean addTuple(Tuple tuple); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java new file mode 100644 index 0000000..4c4a6cb --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleList.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.memory; + +import com.google.common.collect.Lists; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.unit.StorageUnit; + +import java.util.ArrayList; +import java.util.List; + +/** + * In UnSafeTupleList, input tuples are copied to off-heap memory page whenever the add() method is called. + * The memory pages are automatically added, if memory of a page are exceeded. + * This instance must be released + */ +public class UnSafeTupleList extends ArrayList<UnSafeTuple> { + + private final DataType[] dataTypes; + private List<MemoryRowBlock> rowBlocks; + private MemoryRowBlock currentRowBlock; + private int totalUsedMem; + private int pageSize; + + public UnSafeTupleList(DataType[] dataTypes, int initialArraySize) { + this(dataTypes, initialArraySize, StorageUnit.MB); + + } + + public UnSafeTupleList(DataType[] dataTypes, int initialArraySize, int pageSize) { + super(initialArraySize); + this.dataTypes = dataTypes; + this.pageSize = pageSize; + this.rowBlocks = Lists.newArrayList(); + this.currentRowBlock = new MemoryRowBlock(dataTypes, new FixedSizeLimitSpec(pageSize), true); + this.rowBlocks.add(currentRowBlock); + + } + + @Override + public boolean add(UnSafeTuple tuple) { + return addTuple(tuple); + } + + public boolean addTuple(Tuple tuple) { + + int prevPos = currentRowBlock.getMemory().writerPosition(); + if (currentRowBlock.getWriter().addTuple(tuple)) { + UnSafeTuple unSafeTuple = new UnSafeTuple(); + unSafeTuple.set(currentRowBlock.getMemory(), prevPos, dataTypes); + return super.add(unSafeTuple); + } else { + this.totalUsedMem += currentRowBlock.usedMem(); + this.currentRowBlock = new MemoryRowBlock(dataTypes, new FixedSizeLimitSpec(pageSize), true); + this.rowBlocks.add(currentRowBlock); + return this.addTuple(tuple); + } + } + + /** + * Release the cached pages + */ + public void release() { + for (MemoryRowBlock rowBlock : rowBlocks) { + rowBlock.release(); + } + super.clear(); + rowBlocks.clear(); + totalUsedMem = 0; + } + + /** + * Total used memory + */ + public int usedMem() { + return totalUsedMem + currentRowBlock.usedMem(); + } + + /** + * Release and reset + */ + @Override + public void clear() { + release(); + this.currentRowBlock = new MemoryRowBlock(dataTypes, new FixedSizeLimitSpec(pageSize), true); + this.rowBlocks.add(currentRowBlock); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java index a6003c7..4e77d68 100644 --- a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java +++ b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.exception.ValueOutOfRangeException; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; @@ -39,9 +40,7 @@ import java.util.Comparator; import java.util.List; import static org.apache.tajo.common.TajoDataTypes.Type; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class TestMemoryRowBlock { private static final Log LOG = LogFactory.getLog(TestMemoryRowBlock.class); @@ -115,6 +114,41 @@ public class TestMemoryRowBlock { } @Test + public void testPutAndCancelValidation() { + VTuple vTuple = new VTuple(schema.length); + fillVTuple(0, vTuple); + + //get memory size of 1 row + MemoryRowBlock rowBlock = new MemoryRowBlock(schema); + fillRow(0, rowBlock.getWriter()); + int rowSize = rowBlock.usedMem(); + rowBlock.release(); + + rowBlock = new MemoryRowBlock(schema, new FixedSizeLimitSpec(rowSize / 2, 0.0f), true); + assertFalse(rowBlock.getWriter().addTuple(vTuple)); + try { + OffHeapRowBlockUtils.convert(vTuple, rowBlock.getWriter()); + fail(); + } catch (Exception e) { + assertEquals(ValueOutOfRangeException.class, e.getClass()); + } + rowBlock.release(); + + //allow 1 row + rowBlock = new MemoryRowBlock(schema, new FixedSizeLimitSpec(rowSize, 0.0f), true); + assertTrue(rowBlock.getWriter().addTuple(vTuple)); + assertFalse(rowBlock.getWriter().addTuple(vTuple)); + assertEquals(1, rowBlock.rows()); + + ZeroCopyTuple tuple = new UnSafeTuple(); + RowBlockReader reader = rowBlock.getReader(); + assertTrue(reader.next(tuple)); + validateTupleResult(0, tuple); + assertFalse(reader.next(tuple)); + rowBlock.release(); + } + + @Test public void testNullityValidation() { int rowNum = 1000; http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java index b1f53da..15250a3 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java @@ -110,6 +110,7 @@ public class TestProgressExternalSortExec { catalog.createTable(employee); analyzer = new SQLAnalyzer(); planner = new LogicalPlanner(catalog, TablespaceManager.getInstance()); + employeePath.getFileSystem(conf).deleteOnExit(employeePath); } @After @@ -125,9 +126,8 @@ public class TestProgressExternalSortExec { @Test public void testExternalSortExecProgressWithMemTableScanner() throws Exception { QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf); - int bufferSize = (int) (testDataStats.getNumBytes() * 20) / StorageUnit.MB; //multiply 2 for memory fit + int bufferSize = (int) (testDataStats.getNumBytes() * 2) / StorageUnit.MB; //multiply 2 for memory fit queryContext.setInt(SessionVars.EXTSORT_BUFFER_SIZE, bufferSize); - testProgress(queryContext); } @@ -197,7 +197,7 @@ public class TestProgressExternalSortExec { assertEquals(testDataStats.getNumBytes().longValue(), tableStats.getNumBytes().longValue()); assertEquals(testDataStats.getNumRows().longValue(), cnt); assertEquals(testDataStats.getNumRows().longValue(), tableStats.getNumRows().longValue()); - assertTrue(testDataStats.getNumBytes().longValue() <= tableStats.getReadBytes().longValue()); + assertTrue(testDataStats.getNumBytes() <= tableStats.getReadBytes()); // for rescan test preVal = null; @@ -223,7 +223,7 @@ public class TestProgressExternalSortExec { assertEquals(testDataStats.getNumRows().longValue(), cnt); assertEquals(testDataStats.getNumRows().longValue(), tableStats.getNumRows().longValue()); //'ReadBytes' is actual read bytes - assertTrue(testDataStats.getNumBytes().longValue() <= tableStats.getReadBytes().longValue()); + assertTrue(testDataStats.getNumBytes() <= tableStats.getReadBytes()); conf.setIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT, ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.defaultIntVal); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java new file mode 100644 index 0000000..48170f6 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestUnSafeTuple.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.memory.UnSafeTupleList; +import org.apache.tajo.unit.StorageUnit; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +public class TestUnSafeTuple { + + private static final Random rnd = new Random(-1); + private static Schema schema; + + @BeforeClass + public static void setupClass() { + Column col0 = new Column("col0", Type.BOOLEAN); + Column col1 = new Column("col1", Type.INT4); + Column col2 = new Column("col2", Type.INT8); + Column col3 = new Column("col3", Type.FLOAT4); + Column col4 = new Column("col4", Type.FLOAT8); + + schema = new Schema(new Column[]{col0, col1, col2, col3, col4}); + } + + @Test + public final void testMemoryPageAndValidation() { + + Datum[] datums = new Datum[]{ + DatumFactory.createBool(rnd.nextBoolean()), + DatumFactory.createInt4(rnd.nextInt()), + DatumFactory.createInt8(rnd.nextLong()), + DatumFactory.createFloat4(rnd.nextFloat()), + DatumFactory.createFloat8(rnd.nextDouble())}; + Tuple tuple = new VTuple(datums); + + int pageSize = StorageUnit.KB; + UnSafeTupleList unSafeTupleList = new UnSafeTupleList(SchemaUtil.toDataTypes(schema), 100, StorageUnit.KB); + assertEquals(0, unSafeTupleList.usedMem()); + assertEquals(0, unSafeTupleList.size()); + + unSafeTupleList.addTuple(tuple); + //get the memory bytes of tuple + int tupleSize = unSafeTupleList.usedMem(); + assertEquals(1, unSafeTupleList.size()); + assertEquals(tuple, unSafeTupleList.get(0)); + + unSafeTupleList.clear(); + assertEquals(0, unSafeTupleList.usedMem()); + assertEquals(0, unSafeTupleList.size()); + + //test only 1 page + int testCount = pageSize / tupleSize; + Tuple[] tuples = new Tuple[testCount]; + + for (int i = 0; i < testCount; i++) { + datums = new Datum[]{ + DatumFactory.createBool(rnd.nextBoolean()), + DatumFactory.createInt4(rnd.nextInt()), + DatumFactory.createInt8(rnd.nextLong()), + DatumFactory.createFloat4(rnd.nextFloat()), + DatumFactory.createFloat8(rnd.nextDouble())}; + tuples[i] = new VTuple(datums); + unSafeTupleList.addTuple(tuples[i]); + } + + assertEquals(testCount, unSafeTupleList.size()); + assertEquals(tupleSize * testCount, unSafeTupleList.usedMem()); + + for (int i = 0; i < testCount; i++) { + assertEquals(tuples[i], unSafeTupleList.get(i)); + } + + unSafeTupleList.release(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java index 9f5ecb5..425a7d6 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java @@ -79,7 +79,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { // tpch/lineitem.tbl long[] expectedNumRows = new long[]{5, 2, 2, 2, 2, 2}; long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 236}; - long[] expectedReadBytes = new long[]{604, 604, 162, 0, 138, 0}; + long[] expectedReadBytes = new long[]{604, 604, 236, 0, 138, 0}; QueryId queryId = getQueryId(res); assertStatus(queryId, 3, expectedNumRows, expectedNumBytes, expectedReadBytes); @@ -108,7 +108,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { // in/out * stage(4) long[] expectedNumRows = new long[]{5, 5, 2, 2, 2, 2, 2, 2}; long[] expectedNumBytes = new long[]{20, 75, 8, 34, 109, 34, 34, 64}; - long[] expectedReadBytes = new long[]{20, 20, 8, 8, 109, 0, 34, 0}; + long[] expectedReadBytes = new long[]{20, 20, 8, 8, 64, 0, 34, 0}; QueryId queryId = getQueryId(res); assertStatus(queryId, 4, expectedNumRows, expectedNumBytes, expectedReadBytes); http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 8abafec..69631b5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -18,6 +18,9 @@ package org.apache.tajo.engine.planner.physical; +import com.google.common.base.Preconditions; +import com.google.common.primitives.*; +import com.google.common.util.concurrent.SettableFuture; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.LocalDirAllocator; @@ -26,18 +29,24 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.IOUtils; import org.apache.tajo.BuiltinStorages; import org.apache.tajo.SessionVars; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.datum.TextDatum; import org.apache.tajo.engine.planner.PhysicalPlanningException; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.plan.logical.SortNode; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.storage.rawfile.DirectRawFileWriter; +import org.apache.tajo.tuple.memory.OffHeapRowBlockUtils; +import org.apache.tajo.tuple.memory.UnSafeTuple; +import org.apache.tajo.tuple.memory.UnSafeTupleList; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; @@ -48,9 +57,10 @@ import java.io.IOException; import java.util.Comparator; import java.util.Iterator; import java.util.List; -import java.util.concurrent.*; - -import static org.apache.tajo.storage.RawFile.RawFileAppender; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** * This external sort algorithm can be characterized by the followings: @@ -75,15 +85,19 @@ public class ExternalSortExec extends SortExec { /** the defaultFanout of external sort */ private final int defaultFanout; /** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. */ - private long sortBufferBytesNum; + private int sortBufferBytesNum; /** the number of available cores */ private final int allocatedCoreNum; /** If there are available multiple cores, it tries parallel merge. */ private ExecutorService executorService; /** used for in-memory sort of each chunk. */ - private TupleList inMemoryTable; + private UnSafeTupleList inMemoryTable; + /** for zero copy tuple comparison */ + private Comparator<UnSafeTuple> unSafeComparator; + /** for other type tuple comparison */ + private Comparator<Tuple> primitiveComparator; /** temporal dir */ - private final Path sortTmpDir; + private Path sortTmpDir; /** It enables round-robin disks allocation */ private final LocalDirAllocator localDirAllocator; /** local file system */ @@ -98,12 +112,10 @@ public class ExternalSortExec extends SortExec { /////////////////////////////////////////////////// /** already sorted or not */ private boolean sorted = false; - /** a flag to point whether sorted data resides in memory or not */ - private boolean memoryResident = true; /** the final result */ private Scanner result; /** total bytes of input data */ - private long sortAndStoredBytes; + private long inputBytes; private ExternalSortExec(final TaskAttemptContext context, final SortNode plan) throws PhysicalPlanningException { @@ -115,15 +127,12 @@ public class ExternalSortExec extends SortExec { throw new PhysicalPlanningException(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.varname + " cannot be lower than 2"); } // TODO - sort buffer and core num should be changed to use the allocated container resource. - this.sortBufferBytesNum = context.getQueryContext().getLong(SessionVars.EXTSORT_BUFFER_SIZE) * StorageUnit.MB; + this.sortBufferBytesNum = context.getQueryContext().getInt(SessionVars.EXTSORT_BUFFER_SIZE) * StorageUnit.MB; this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM); - this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum); - this.inMemoryTable = new TupleList(context.getQueryContext().getInt(SessionVars.SORT_LIST_SIZE)); - - this.sortTmpDir = getExecutorTmpDir(); this.localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); this.localFS = new RawLocalFileSystem(); - this.intermediateMeta = CatalogUtil.newTableMeta(BuiltinStorages.RAW); //TODO change to SHUFFLE_FILE_FORMAT + this.intermediateMeta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW); + this.inputStats = new TableStats(); } public ExternalSortExec(final TaskAttemptContext context,final SortNode plan, final ScanNode scanNode, @@ -133,7 +142,7 @@ public class ExternalSortExec extends SortExec { mergedInputFragments = TUtil.newList(); for (CatalogProtos.FragmentProto proto : fragments) { FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto); - mergedInputFragments.add(new Chunk(fragment, scanNode.getTableDesc().getMeta())); + mergedInputFragments.add(new Chunk(inSchema, fragment, scanNode.getTableDesc().getMeta())); } } @@ -143,8 +152,19 @@ public class ExternalSortExec extends SortExec { setChild(child); } + @Override public void init() throws IOException { - inputStats = new TableStats(); + if(allocatedCoreNum > 1) { + this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum); + } + + this.sortTmpDir = getExecutorTmpDir(); + + int initialArraySize = context.getQueryContext().getInt(SessionVars.SORT_LIST_SIZE); + this.inMemoryTable = new UnSafeTupleList(SchemaUtil.toDataTypes(inSchema), initialArraySize); + this.unSafeComparator = new UnSafeComparator(inSchema, sortSpecs); + this.primitiveComparator = new PrimitiveComparator(inSchema, sortSpecs); + super.init(); } @@ -155,29 +175,25 @@ public class ExternalSortExec extends SortExec { /** * Sort a tuple block and store them into a chunk file */ - private Chunk sortAndStoreChunk(int chunkId, TupleList tupleBlock) + private Chunk sortAndStoreChunk(int chunkId, UnSafeTupleList tupleBlock) throws IOException { - int rowNum = tupleBlock.size(); long sortStart = System.currentTimeMillis(); - Iterable<Tuple> sorted = getSorter(tupleBlock).sort(); + OffHeapRowBlockUtils.sort(tupleBlock, unSafeComparator); long sortEnd = System.currentTimeMillis(); long chunkWriteStart = System.currentTimeMillis(); Path outputPath = getChunkPathForWrite(0, chunkId); - final RawFileAppender appender = - new RawFileAppender(context.getConf(), null, inSchema, intermediateMeta, outputPath); - + final DirectRawFileWriter appender = + new DirectRawFileWriter(context.getConf(), null, inSchema, intermediateMeta, outputPath); appender.init(); - for (Tuple t : sorted) { + for (Tuple t : tupleBlock) { appender.addTuple(t); } appender.close(); - tupleBlock.clear(); long chunkWriteEnd = System.currentTimeMillis(); - info(LOG, "Chunk #" + chunkId + " sort and written (" + FileUtil.humanReadableByteCount(appender.getOffset(), false) + " bytes, " + rowNum + " rows, " + "sort time: " + (sortEnd - sortStart) + " msec, " + @@ -185,7 +201,7 @@ public class ExternalSortExec extends SortExec { FileFragment frag = new FileFragment("", outputPath, 0, new File(localFS.makeQualified(outputPath).toUri()).length()); - return new Chunk(frag, intermediateMeta); + return new Chunk(inSchema, frag, intermediateMeta); } /** @@ -196,26 +212,23 @@ public class ExternalSortExec extends SortExec { */ private List<Chunk> sortAndStoreAllChunks() throws IOException { Tuple tuple; - long memoryConsumption = 0; List<Chunk> chunkPaths = TUtil.newList(); int chunkId = 0; long runStartTime = System.currentTimeMillis(); + while (!context.isStopped() && (tuple = child.next()) != null) { // partition sort start - inMemoryTable.add(tuple); - memoryConsumption += MemoryUtil.calculateMemorySize(tuple); + inMemoryTable.addTuple(tuple); - if (memoryConsumption > sortBufferBytesNum) { + if (inMemoryTable.usedMem() > sortBufferBytesNum) { // if input data exceeds main-memory at least once long runEndTime = System.currentTimeMillis(); - info(LOG, chunkId + " run loading time: " + (runEndTime - runStartTime) + " msec"); + info(LOG, "Chunk #" + chunkId + " run loading time: " + (runEndTime - runStartTime) + " msec"); runStartTime = runEndTime; - info(LOG, "Memory consumption exceeds " + sortBufferBytesNum + " bytes"); - memoryResident = false; + info(LOG, "Memory consumption exceeds " + FileUtil.humanReadableByteCount(inMemoryTable.usedMem(), false)); chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable)); - - memoryConsumption = 0; + inMemoryTable.clear(); chunkId++; // When the volume of sorting data once exceed the size of sort buffer, @@ -233,19 +246,15 @@ public class ExternalSortExec extends SortExec { } } - if (!memoryResident && !inMemoryTable.isEmpty()) { // if there are at least one or more input tuples - // check if data exceeds a sort buffer. If so, it store the remain data into a chunk. - long start = System.currentTimeMillis(); - int rowNum = inMemoryTable.size(); - chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable)); - long end = System.currentTimeMillis(); - info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written (" + (end - start) + " msec)"); + if(inMemoryTable.size() > 0) { //if there are at least one or more input tuples + //store the remain data into a memory chunk. + chunkPaths.add(new Chunk(inSchema, inMemoryTable, intermediateMeta)); } // get total loaded (or stored) bytes and total row numbers TableStats childTableStats = child.getInputStats(); if (childTableStats != null) { - sortAndStoredBytes = childTableStats.getNumBytes(); + inputBytes = childTableStats.getNumBytes(); } return chunkPaths; } @@ -267,6 +276,7 @@ public class ExternalSortExec extends SortExec { if (mergedInputFragments != null) { try { this.result = externalMergeAndSort(mergedInputFragments); + this.inputBytes = result.getInputStats().getNumBytes(); } catch (Exception e) { throw new PhysicalPlanningException(e); } @@ -275,19 +285,16 @@ public class ExternalSortExec extends SortExec { long startTimeOfChunkSplit = System.currentTimeMillis(); List<Chunk> chunks = sortAndStoreAllChunks(); long endTimeOfChunkSplit = System.currentTimeMillis(); - info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec"); - - if (memoryResident) { // if all sorted data reside in a main-memory table. - TupleSorter sorter = getSorter(inMemoryTable); - result = new MemTableScanner(sorter.sort(), inMemoryTable.size(), sortAndStoredBytes); - } else { // if input data exceeds main-memory at least once + info(LOG, chunks.size() + " Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec"); + if(chunks.size() == 0) { + this.result = new NullScanner(context.getConf(), inSchema, intermediateMeta, null); + } else { try { this.result = externalMergeAndSort(chunks); } catch (Exception e) { throw new PhysicalPlanningException(e); } - } } @@ -324,8 +331,7 @@ public class ExternalSortExec extends SortExec { return computedFanout; } - private Scanner externalMergeAndSort(List<Chunk> chunks) - throws IOException, ExecutionException, InterruptedException { + private Scanner externalMergeAndSort(List<Chunk> chunks) throws Exception { int level = 0; final List<Chunk> inputFiles = TUtil.newList(chunks); final List<Chunk> outputFiles = TUtil.newList(); @@ -352,8 +358,14 @@ public class ExternalSortExec extends SortExec { // how many files are merged in ith thread? numberOfMergingFiles.add(fanout); // launch a merger runner - futures.add(executorService.submit( - new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout, false))); + if(allocatedCoreNum > 1) { + futures.add(executorService.submit( + new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout, false))); + } else { + final SettableFuture<Chunk> future = SettableFuture.create(); + future.set(new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout, false).call()); + futures.add(future); + } outputFileNum++; startIdx += fanout; @@ -406,14 +418,23 @@ public class ExternalSortExec extends SortExec { */ int numDeletedFiles = 0; for (Chunk chunk : inputFiles) { - if (chunk.getFragment().getTableName().contains(INTERMEDIATE_FILE_PREFIX)) { + if (chunk.isMemory()) { + if (LOG.isDebugEnabled()) { + debug(LOG, "Remove intermediate memory tuples: " + chunk.getMemoryTuples().usedMem()); + } + chunk.getMemoryTuples().release(); + } else if (chunk.getFragment().getTableName().contains(INTERMEDIATE_FILE_PREFIX)) { localFS.delete(chunk.getFragment().getPath(), true); numDeletedFiles++; - if(LOG.isDebugEnabled()) LOG.debug("Delete merged intermediate file: " + chunk.getFragment()); + if (LOG.isDebugEnabled()) { + debug(LOG, "Delete merged intermediate file: " + chunk.getFragment()); + } } } - info(LOG, numDeletedFiles + " merged intermediate files deleted"); + if(LOG.isDebugEnabled()) { + debug(LOG, numDeletedFiles + " merged intermediate files deleted"); + } // switch input files to output files, and then clear outputFiles inputFiles.clear(); @@ -459,11 +480,14 @@ public class ExternalSortExec extends SortExec { final Path outputPath = getChunkPathForWrite(level + 1, nextRunId); info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName()); long mergeStartTime = System.currentTimeMillis(); - final RawFileAppender output = - new RawFileAppender(context.getConf(), null, inSchema, intermediateMeta, outputPath); - output.init(); + final Scanner merger = createKWayMerger(inputFiles, startIdx, mergeFanout); merger.init(); + + final DirectRawFileWriter output = + new DirectRawFileWriter(context.getConf(), null, inSchema, intermediateMeta, outputPath); + output.init(); + Tuple mergeTuple; while((mergeTuple = merger.next()) != null) { output.addTuple(mergeTuple); @@ -476,7 +500,7 @@ public class ExternalSortExec extends SortExec { + " bytes, " + (mergeEndTime - mergeStartTime) + " msec)"); File f = new File(localFS.makeQualified(outputPath).toUri()); FileFragment frag = new FileFragment(INTERMEDIATE_FILE_PREFIX + outputPath.getName(), outputPath, 0, f.length()); - return new Chunk(frag, intermediateMeta); + return new Chunk(inSchema, frag, intermediateMeta); } } @@ -492,23 +516,35 @@ public class ExternalSortExec extends SortExec { */ private Scanner createFinalMerger(List<Chunk> inputs) throws IOException { if (inputs.size() == 1) { - this.result = getFileScanner(inputs.get(0)); + this.result = getScanner(inputs.get(0)); } else { this.result = createKWayMerger(inputs, 0, inputs.size()); } return result; } - private Scanner getFileScanner(Chunk chunk) throws IOException { - return TablespaceManager.getLocalFs().getScanner(chunk.getMeta(), inSchema, chunk.getFragment(), outSchema); + private Scanner getScanner(Chunk chunk) throws IOException { + if (chunk.isMemory()) { + long sortStart = System.currentTimeMillis(); + + OffHeapRowBlockUtils.sort(inMemoryTable, unSafeComparator); + Scanner scanner = new MemTableScanner<>(inMemoryTable, inMemoryTable.size(), inMemoryTable.usedMem()); + if(LOG.isDebugEnabled()) { + debug(LOG, "Memory Chunk sort (" + FileUtil.humanReadableByteCount(inMemoryTable.usedMem(), false) + + " bytes, " + inMemoryTable.size() + " rows, sort time: " + + (System.currentTimeMillis() - sortStart) + " msec)"); + } + return scanner; + } else { + return TablespaceManager.getLocalFs().getScanner(chunk.meta, chunk.schema, chunk.fragment, chunk.schema); + } } private Scanner createKWayMerger(List<Chunk> inputs, final int startChunkId, final int num) throws IOException { final Scanner [] sources = new Scanner[num]; for (int i = 0; i < num; i++) { - sources[i] = getFileScanner(inputs.get(startChunkId + i)); + sources[i] = getScanner(inputs.get(startChunkId + i)); } - return createKWayMergerInternal(sources, 0, num); } @@ -518,27 +554,24 @@ public class ExternalSortExec extends SortExec { final int mid = (int) Math.ceil((float)num / 2); Scanner left = createKWayMergerInternal(sources, startIdx, mid); Scanner right = createKWayMergerInternal(sources, startIdx + mid, num - mid); - if (ComparableVector.isVectorizable(sortSpecs)) { - return new VectorComparePairWiseMerger(inSchema, left, right, comparator); - } - return new PairWiseMerger(inSchema, left, right, comparator); + return new PairWiseMerger(inSchema, left, right, primitiveComparator); } else { return sources[startIdx]; } } - private static class MemTableScanner extends AbstractScanner { - final Iterable<Tuple> iterable; + private static class MemTableScanner<T extends Tuple> extends AbstractScanner { + final Iterable<T> iterable; final long sortAndStoredBytes; final int totalRecords; - Iterator<Tuple> iterator; + Iterator<T> iterator; // for input stats float scannerProgress; int numRecords; TableStats scannerTableStats; - public MemTableScanner(Iterable<Tuple> iterable, int length, long inBytes) { + public MemTableScanner(Iterable<T> iterable, int length, long inBytes) { this.iterable = iterable; this.totalRecords = length; this.sortAndStoredBytes = inBytes; @@ -601,30 +634,6 @@ public class ExternalSortExec extends SortExec { CLOSED } - private static class VectorComparePairWiseMerger extends PairWiseMerger { - - private ComparableVector comparable; - - public VectorComparePairWiseMerger(Schema schema, Scanner leftScanner, Scanner rightScanner, - BaseTupleComparator comparator) throws IOException { - super(schema, leftScanner, rightScanner, null); - comparable = new ComparableVector(2, comparator.getSortSpecs(), comparator.getSortKeyIds()); - } - - @Override - protected Tuple prepare(int index, Tuple tuple) { - if (tuple != null) { - comparable.set(index, tuple); - } - return tuple; - } - - @Override - protected int compare() { - return comparable.compare(0, 1); - } - } - /** * Two-way merger scanner that reads two input sources and outputs one output tuples sorted in some order. */ @@ -638,8 +647,10 @@ public class ExternalSortExec extends SortExec { private Tuple leftTuple; private Tuple rightTuple; + private boolean leftEOF; + private boolean rightEOF; - private final Tuple outTuple; + private Tuple outTuple; private float mergerProgress; private TableStats mergerInputStats; @@ -652,7 +663,6 @@ public class ExternalSortExec extends SortExec { this.leftScan = leftScanner; this.rightScan = rightScanner; this.comparator = comparator; - this.outTuple = new VTuple(schema.size()); } private void setState(State state) { @@ -665,8 +675,6 @@ public class ExternalSortExec extends SortExec { leftScan.init(); rightScan.init(); - prepareTuplesForFirstComparison(); - mergerInputStats = new TableStats(); mergerProgress = 0.0f; @@ -676,31 +684,46 @@ public class ExternalSortExec extends SortExec { } } - private void prepareTuplesForFirstComparison() throws IOException { - leftTuple = prepare(0, leftScan.next()); - rightTuple = prepare(1, rightScan.next()); - } - - protected Tuple prepare(int index, Tuple tuple) { - return tuple; - } - protected int compare() { return comparator.compare(leftTuple, rightTuple); } @Override public Tuple next() throws IOException { - if (leftTuple == null && rightTuple == null) { - return null; + if(!leftEOF && leftTuple == null) { + leftTuple = leftScan.next(); } - if (rightTuple == null || (leftTuple != null && compare() < 0)) { - outTuple.put(leftTuple.getValues()); - leftTuple = prepare(0, leftScan.next()); + + if(!rightEOF && rightTuple == null) { + rightTuple = rightScan.next(); + } + + if (leftTuple != null && rightTuple != null) { + if (compare() < 0) { + outTuple = leftTuple; + leftTuple = null; + } else { + outTuple = rightTuple; + rightTuple = null; + } return outTuple; } - outTuple.put(rightTuple.getValues()); - rightTuple = prepare(1, rightScan.next()); + + if (leftTuple == null) { + leftEOF = true; + + if (rightTuple != null) { + outTuple = rightTuple; + rightTuple = null; + } else { + rightEOF = true; + outTuple = null; + } + } else { + rightEOF = true; + outTuple = leftTuple; + leftTuple = null; + } return outTuple; } @@ -712,8 +735,11 @@ public class ExternalSortExec extends SortExec { leftTuple = null; rightTuple = null; + outTuple = null; + + leftEOF = false; + rightEOF = false; - prepareTuplesForFirstComparison(); } else { throw new IllegalStateException("Illegal State: init() is not allowed in " + state.name()); } @@ -778,27 +804,25 @@ public class ExternalSortExec extends SortExec { if (result != null) { result.close(); - try { - inputStats = (TableStats)result.getInputStats().clone(); - } catch (CloneNotSupportedException e) { - LOG.warn(e.getMessage()); - } - result = null; } if (finalOutputFiles != null) { for (Chunk chunk : finalOutputFiles) { - FileFragment frag = chunk.getFragment(); - File tmpFile = new File(localFS.makeQualified(frag.getPath()).toUri()); - if (frag.getStartKey() == 0 && frag.getLength() == tmpFile.length()) { - localFS.delete(frag.getPath(), true); - LOG.info("Delete file: " + frag); + if (!chunk.isMemory()) { + FileFragment frag = chunk.getFragment(); + File tmpFile = new File(localFS.makeQualified(frag.getPath()).toUri()); + if (frag.getStartKey() == 0 && frag.getLength() == tmpFile.length()) { + localFS.delete(frag.getPath(), true); + if(LOG.isDebugEnabled()) { + debug(LOG, "Delete file: " + frag); + } + } } } } - if(inMemoryTable != null){ - inMemoryTable.clear(); + if(inMemoryTable != null) { + inMemoryTable.release(); inMemoryTable = null; } @@ -831,21 +855,35 @@ public class ExternalSortExec extends SortExec { @Override public TableStats getInputStats() { if (result != null) { - return result.getInputStats(); - } else { - return inputStats; + + TableStats tableStats = result.getInputStats(); + inputStats.setNumRows(tableStats.getNumRows()); + inputStats.setNumBytes(inputBytes); + inputStats.setReadBytes(tableStats.getReadBytes()); } + return inputStats; } private static class Chunk { private FileFragment fragment; private TableMeta meta; + private Schema schema; + private UnSafeTupleList memoryTuples; + private boolean isMemory; - public Chunk(FileFragment fragment, TableMeta meta) { + public Chunk(Schema schema, FileFragment fragment, TableMeta meta) { + this.schema = schema; this.fragment = fragment; this.meta = meta; } + public Chunk(Schema schema, UnSafeTupleList tuples, TableMeta meta) { + this.memoryTuples = tuples; + this.isMemory = true; + this.schema = schema; + this.meta = meta; + } + public FileFragment getFragment() { return fragment; } @@ -853,5 +891,169 @@ public class ExternalSortExec extends SortExec { public TableMeta getMeta() { return meta; } + + public UnSafeTupleList getMemoryTuples() { + return memoryTuples; + } + + public boolean isMemory() { + return isMemory; + } + + public Schema getSchema() { + return schema; + } + } + + /** + * The Comparator class for UnSafeTuples + * + * @see UnSafeTuple + */ + static class UnSafeComparator implements Comparator<UnSafeTuple> { + private final int[] sortKeyIds; + private final TajoDataTypes.Type[] sortKeyTypes; + private final boolean[] asc; + private final boolean[] nullFirsts; + + /** + * @param schema The schema of input tuples + * @param sortKeys The description of sort keys + */ + public UnSafeComparator(Schema schema, SortSpec[] sortKeys) { + Preconditions.checkArgument(sortKeys.length > 0, + "At least one sort key must be specified."); + + this.sortKeyIds = new int[sortKeys.length]; + this.sortKeyTypes = new TajoDataTypes.Type[sortKeys.length]; + this.asc = new boolean[sortKeys.length]; + this.nullFirsts = new boolean[sortKeys.length]; + for (int i = 0; i < sortKeys.length; i++) { + if (sortKeys[i].getSortKey().hasQualifier()) { + this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName()); + } else { + this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName()); + } + + this.asc[i] = sortKeys[i].isAscending(); + this.nullFirsts[i] = sortKeys[i].isNullsFirst(); + this.sortKeyTypes[i] = sortKeys[i].getSortKey().getDataType().getType(); + } + } + + @Override + public int compare(UnSafeTuple tuple1, UnSafeTuple tuple2) { + for (int i = 0; i < sortKeyIds.length; i++) { + int compare = OffHeapRowBlockUtils.compareColumn(tuple1, tuple2, + sortKeyIds[i], sortKeyTypes[i], asc[i], nullFirsts[i]); + + if (compare != 0) { + return compare; + } + } + return 0; + } + } + + /** + * The Comparator class for raw file + */ + static class PrimitiveComparator implements Comparator<Tuple> { + private final int[] sortKeyIds; + private final TajoDataTypes.Type[] sortKeyTypes; + private final boolean[] asc; + private final boolean[] nullFirsts; + + /** + * @param schema The schema of input tuples + * @param sortKeys The description of sort keys + */ + public PrimitiveComparator(Schema schema, SortSpec[] sortKeys) { + Preconditions.checkArgument(sortKeys.length > 0, + "At least one sort key must be specified."); + + this.sortKeyIds = new int[sortKeys.length]; + this.sortKeyTypes = new TajoDataTypes.Type[sortKeys.length]; + this.asc = new boolean[sortKeys.length]; + this.nullFirsts = new boolean[sortKeys.length]; + for (int i = 0; i < sortKeys.length; i++) { + if (sortKeys[i].getSortKey().hasQualifier()) { + this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName()); + } else { + this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName()); + } + + this.asc[i] = sortKeys[i].isAscending(); + this.nullFirsts[i] = sortKeys[i].isNullsFirst(); + this.sortKeyTypes[i] = sortKeys[i].getSortKey().getDataType().getType(); + } + } + + @Override + public int compare(Tuple tuple1, Tuple tuple2) { + for (int i = 0; i < sortKeyIds.length; i++) { + int compare = compareColumn(tuple1, tuple2, + sortKeyIds[i], sortKeyTypes[i], asc[i], nullFirsts[i]); + + if (compare != 0) { + return compare; + } + } + return 0; + } + + public int compareColumn(Tuple tuple1, Tuple tuple2, int index, TajoDataTypes.Type type, + boolean ascending, boolean nullFirst) { + final boolean n1 = tuple1.isBlankOrNull(index); + final boolean n2 = tuple2.isBlankOrNull(index); + if (n1 && n2) { + return 0; + } + + if (n1 ^ n2) { + return nullFirst ? (n1 ? -1 : 1) : (n1 ? 1 : -1); + } + + int compare; + switch (type) { + case BOOLEAN: + compare = Booleans.compare(tuple1.getBool(index), tuple2.getBool(index)); + break; + case BIT: + compare = tuple1.getByte(index) - tuple2.getByte(index); + break; + case INT1: + case INT2: + compare = Shorts.compare(tuple1.getInt2(index), tuple2.getInt2(index)); + break; + case DATE: + case INT4: + compare = Ints.compare(tuple1.getInt4(index), tuple2.getInt4(index)); + break; + case INET4: + compare = UnsignedInts.compare(tuple1.getInt4(index), tuple2.getInt4(index)); + break; + case TIME: + case TIMESTAMP: + case INT8: + compare = Longs.compare(tuple1.getInt8(index), tuple2.getInt8(index)); + break; + case FLOAT4: + compare = Floats.compare(tuple1.getFloat4(index), tuple2.getFloat4(index)); + break; + case FLOAT8: + compare = Doubles.compare(tuple1.getFloat8(index), tuple2.getFloat8(index)); + break; + case CHAR: + case TEXT: + case BLOB: + compare = TextDatum.COMPARATOR.compare(tuple1.getBytes(index), tuple2.getBytes(index)); + break; + default: + throw new TajoRuntimeException( + new UnsupportedException("unknown data type '" + type.name() + "'")); + } + return ascending ? compare : -compare; + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java index 87a19a9..c70e1ff 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java @@ -70,6 +70,10 @@ public abstract class PhysicalExec implements SchemaObject { log.info("["+ context.getTaskId() + "] " + message); } + protected void debug(Log log, String message) { + log.debug("["+ context.getTaskId() + "] " + message); + } + protected void warn(Log log, String message) { log.warn("[" + context.getTaskId() + "] " + message); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 474ca2f..06bba18 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -166,6 +166,11 @@ public class ExecutionBlockContext { return; } + LOG.info("Worker's task counter - total:" + completedTasksNum.intValue() + + ", succeeded: " + succeededTasksNum.intValue() + + ", killed: " + killedTasksNum.intValue() + + ", failed: " + failedTasksNum.intValue()); + try { reporter.stop(); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index 2e1639f..b5abffe 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -26,6 +26,7 @@ import io.netty.handler.codec.http.*; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.ReferenceCountUtil; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IOUtils; @@ -152,7 +153,9 @@ public class Fetcher { request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); - LOG.info("Status: " + getState() + ", URI:" + uri); + if(LOG.isDebugEnabled()) { + LOG.info("Status: " + getState() + ", URI:" + uri); + } // Send the HTTP request. channel.writeAndFlush(request); @@ -168,7 +171,17 @@ public class Fetcher { } this.finishTime = System.currentTimeMillis(); - LOG.info("Fetcher finished:" + (finishTime - startTime) + " ms, " + getState() + ", URI:" + uri); + long elapsedMills = finishTime - startTime; + String transferSpeed; + if(elapsedMills > 1000) { + long bytePerSec = (fileChunk.length() * 1000) / elapsedMills; + transferSpeed = FileUtils.byteCountToDisplaySize(bytePerSec); + } else { + transferSpeed = FileUtils.byteCountToDisplaySize(Math.max(fileChunk.length(), 0)); + } + + LOG.info(String.format("Fetcher :%d ms elapsed. %s/sec, len:%d, state:%s, URL:%s", + elapsedMills, transferSpeed, fileChunk.length(), getState(), uri)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index eff2c28..a622c16 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -110,7 +110,9 @@ public class TaskImpl implements Task { this.descs = Maps.newHashMap(); Path baseDirPath = executionBlockContext.createBaseDir(); - LOG.info("Task basedir is created (" + baseDirPath +")"); + + if(LOG.isDebugEnabled()) LOG.debug("Task basedir is created (" + baseDirPath +")"); + TaskAttemptId taskAttemptId = request.getId(); this.taskDir = StorageUtil.concatPath(baseDirPath, @@ -146,24 +148,21 @@ public class TaskImpl implements Task { } this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>()); - LOG.info("=================================="); - LOG.info("* Stage " + request.getId() + " is initialized"); - LOG.info("* InterQuery: " + interQuery - + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") + - ", Fragments (num: " + request.getFragments().size() + ")" + - ", Fetches (total:" + request.getFetches().size() + ") :"); + + LOG.info(String.format("* Task %s is initialized. InterQuery: %b, Shuffle: %s, Fragments: %d, Fetches:%d, " + + "Local dir: %s", request.getId(), interQuery, shuffleType, request.getFragments().size(), + request.getFetches().size(), taskDir)); if(LOG.isDebugEnabled()) { for (FetchImpl f : request.getFetches()) { LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs()); } } - LOG.info("* Local task dir: " + taskDir); + if(LOG.isDebugEnabled()) { LOG.debug("* plan:\n"); LOG.debug(plan.toString()); } - LOG.info("=================================="); } private void updateDescsForScanNodes(NodeType nodeType) { @@ -191,7 +190,6 @@ public class TaskImpl implements Task { @Override public void init() throws IOException { - LOG.info("Initializing: " + getId()); initPlan(); startScriptExecutors(); @@ -210,8 +208,10 @@ public class TaskImpl implements Task { for (String inputTable : context.getInputTables()) { tableDir = new Path(inputTableBaseDir, inputTable); if (!localFS.exists(tableDir)) { - LOG.info("the directory is created " + tableDir.toUri()); localFS.mkdirs(tableDir); + if(LOG.isDebugEnabled()) { + LOG.debug("the directory is created " + tableDir.toUri()); + } } } } @@ -456,11 +456,8 @@ public class TaskImpl implements Task { queryMasterStub.done(null, report, NullCallback.get()); } endTime = System.currentTimeMillis(); - LOG.info(context.getTaskId() + " completed. " + - "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + - ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() - + ", killed: " + executionBlockContext.killedTasksNum.intValue() - + ", failed: " + executionBlockContext.failedTasksNum.intValue()); + LOG.info(String.format("%s is complete. %d ms elapsed, final state:%s", + context.getTaskId(), endTime - startTime, context.getState())); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 47a92c4..3aab2f0 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -392,10 +392,7 @@ public class TajoPullServerService extends AbstractService { public void decrementRemainFiles(FileRegion filePart, long fileStartTime) { long fileSendTime = System.currentTimeMillis() - fileStartTime; - if (fileSendTime > 20 * 1000) { - LOG.info("PullServer send too long time: filePos=" + filePart.position() + ", fileLen=" + filePart.count()); - SLOW_FILE_UPDATER.compareAndSet(this, numSlowFile, numSlowFile+ 1); - } + if (fileSendTime > maxTime) { maxTime = fileSendTime; } @@ -403,12 +400,20 @@ public class TajoPullServerService extends AbstractService { minTime = fileSendTime; } + if (fileSendTime > 20 * 1000) { + LOG.warn("Sending data takes too long. " + fileSendTime + "ms elapsed, " + + "length:" + (filePart.count() - filePart.position()) + ", URI:" + requestUri); + SLOW_FILE_UPDATER.compareAndSet(this, numSlowFile, numSlowFile + 1); + } + REMAIN_FILE_UPDATER.compareAndSet(this, remainFiles, remainFiles - 1); if (REMAIN_FILE_UPDATER.get(this) <= 0) { processingStatusMap.remove(requestUri); - LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " - + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " - + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile); + if(LOG.isDebugEnabled()) { + LOG.debug("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " + + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " + + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile); + } } } } @@ -431,7 +436,10 @@ public class TajoPullServerService extends AbstractService { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { accepted.add(ctx.channel()); - LOG.info(String.format("Current number of shuffle connections (%d)", accepted.size())); + + if(LOG.isDebugEnabled()) { + LOG.debug(String.format("Current number of shuffle connections (%d)", accepted.size())); + } super.channelRegistered(ctx); } http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java index c8f46a6..6d4b137 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java @@ -40,11 +40,7 @@ public class BaseTupleComparator extends TupleComparator implements ProtoObject< private final int[] sortKeyIds; private final boolean[] asc; @SuppressWarnings("unused") - private final boolean[] nullFirsts; - - private Datum left; - private Datum right; - private int compVal; + private final boolean[] nullFirsts; /** * @param schema The schema of input tuples @@ -110,29 +106,30 @@ public class BaseTupleComparator extends TupleComparator implements ProtoObject< @Override public int compare(Tuple tuple1, Tuple tuple2) { + for (int i = 0; i < sortKeyIds.length; i++) { - left = tuple1.asDatum(sortKeyIds[i]); - right = tuple2.asDatum(sortKeyIds[i]); - - if (left.isNull() || right.isNull()) { - if (!left.equals(right)) { - if (left.isNull()) { - compVal = nullFirsts[i] ? -1 : 1; - } else { - compVal = nullFirsts[i] ? 1 : -1; - } - } else { - compVal = 0; - } + final boolean n1 = tuple1.isBlankOrNull(sortKeyIds[i]); + final boolean n2 = tuple2.isBlankOrNull(sortKeyIds[i]); + + if (n1 && n2) { + continue; + } + + if (n1 ^ n2) { + return nullFirsts[i] ? (n1 ? -1 : 1) : (n1 ? 1 : -1); + } + + Datum left = tuple1.asDatum(sortKeyIds[i]); + Datum right = tuple2.asDatum(sortKeyIds[i]); + + int compVal; + if (asc[i]) { + compVal = left.compareTo(right); } else { - if (asc[i]) { - compVal = left.compareTo(right); - } else { - compVal = right.compareTo(left); - } + compVal = right.compareTo(left); } - if (compVal < 0 || compVal > 0) { + if (compVal != 0) { return compVal; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/a28db5a7/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java index a20adf7..7c91197 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java @@ -59,7 +59,6 @@ public class NullScanner implements Scanner { @Override public Tuple next() throws IOException { - progress = 1.0f; return null; }
