Repository: drill Updated Branches: refs/heads/master cee67de2e -> ee9e613d7
DRILL-6080: Sort incorrectly limits batch size to 65535 records closes #1090 * Sort incorrectly limits batch size to 65535 records rather than 65536. * This PR also includes a few code cleanup items. * Fix for overflow in offset vector in row set writer * Performance tool update * Replace "unsafe" methods with "set" methods * Also fixes an indexing issue with nullable writers * Removed debug & timing code * Increase strictness for batch size Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f0d00c62 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f0d00c62 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f0d00c62 Branch: refs/heads/master Commit: f0d00c62b594e424ea085ebd0a5be26f0f509fda Parents: 039530a Author: Paul Rogers <[email protected]> Authored: Wed Jan 10 16:04:53 2018 -0800 Committer: Boaz Ben-Zvi <[email protected]> Committed: Tue Jan 30 19:54:57 2018 -0800 ---------------------------------------------------------------------- .../impl/sort/SortRecordBatchBuilder.java | 6 +- .../exec/physical/impl/xsort/MSortTemplate.java | 9 +- .../exec/record/selection/SelectionVector4.java | 4 +- .../impl/xsort/managed/TestSortImpl.java | 72 ++++++++-------- .../drill/test/rowSet/test/PerformanceTool.java | 10 +++ .../rowSet/test/TestVariableWidthWriter.java | 2 +- .../src/main/java/io/netty/buffer/DrillBuf.java | 86 -------------------- .../main/codegen/templates/ColumnAccessors.java | 24 +++--- .../writer/AbstractFixedWidthWriter.java | 2 +- .../accessor/writer/NullableScalarWriter.java | 51 +++++++++++- .../accessor/writer/OffsetVectorWriter.java | 36 +++++--- 11 files changed, 145 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java index 6c66c01..d995902 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java @@ -105,7 +105,7 @@ public class SortRecordBatchBuilder implements AutoCloseable { return; } - if(runningBatches >= Character.MAX_VALUE) { + if (runningBatches >= Character.MAX_VALUE) { final String errMsg = String.format("Tried to add more than %d number of batches.", (int) Character.MAX_VALUE); logger.error(errMsg); throw new DrillRuntimeException(errMsg); @@ -152,7 +152,7 @@ public class SortRecordBatchBuilder implements AutoCloseable { if (svBuffer == null) { throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector in SortRecordBatchBuilder."); } - sv4 = new SelectionVector4(svBuffer, recordCount, Character.MAX_VALUE); + sv4 = new SelectionVector4(svBuffer, recordCount, ValueVector.MAX_ROW_COUNT); BatchSchema schema = batches.keySet().iterator().next(); List<RecordBatchData> data = batches.get(schema); @@ -174,7 +174,7 @@ public class SortRecordBatchBuilder implements AutoCloseable { int recordBatchId = 0; for (RecordBatchData d : data) { for (int i = 0; i < d.getRecordCount(); i++, index++) { - sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i)); + sv4.set(index, recordBatchId, d.getSv2().getIndex(i)); } // might as well drop the selection vector since we'll stop using it now. d.getSv2().clear(); http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index 9b69170..afbc58b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -32,10 +32,10 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.util.IndexedSortable; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import com.google.common.collect.Queues; public abstract class MSortTemplate implements MSorter, IndexedSortable { @@ -43,7 +43,6 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { private SelectionVector4 vector4; private SelectionVector4 aux; - private long compares; private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue(); private FragmentContext context; @@ -74,13 +73,14 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { throw new UnsupportedOperationException(String.format("Missing batch. batch: %d newBatch: %d", batch, newBatch)); } } + @SuppressWarnings("resource") final DrillBuf drillBuf = allocator.buffer(4 * totalCount); try { desiredRecordBatchCount = context.getConfig().getInt(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE); } catch(ConfigException.Missing e) { // value not found, use default value instead - desiredRecordBatchCount = Character.MAX_VALUE; + desiredRecordBatchCount = ValueVector.MAX_ROW_COUNT; } aux = new SelectionVector4(drillBuf, totalCount, desiredRecordBatchCount); } @@ -126,7 +126,6 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { @Override public void sort(final VectorContainer container) { - final Stopwatch watch = Stopwatch.createStarted(); while (runStarts.size() > 1) { // check if we're cancelled/failed frequently @@ -153,6 +152,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { if (outIndex < vector4.getTotalCount()) { copyRun(outIndex, vector4.getTotalCount()); } + @SuppressWarnings("resource") final SelectionVector4 tmp = aux.createNewWrapperCurrent(desiredRecordBatchCount); aux.clear(); aux = vector4.createNewWrapperCurrent(desiredRecordBatchCount); @@ -181,7 +181,6 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { public int compare(final int leftIndex, final int rightIndex) { final int sv1 = vector4.get(leftIndex); final int sv2 = vector4.get(rightIndex); - compares++; try { return doEval(sv1, sv2); } catch (SchemaChangeException e) { http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java index b51fdca..2c10d6d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java @@ -112,8 +112,8 @@ public class SelectionVector4 implements AutoCloseable { return false; } - start = start+length; - int newEnd = Math.min(start+length, recordCount); + start = start + length; + int newEnd = Math.min(start + length, recordCount); length = newEnd - start; // logger.debug("New start {}, new length {}", start, length); return true; http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java index cdca30e..a985478 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java @@ -23,13 +23,13 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import org.apache.drill.categories.OperatorTest; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.impl.spill.SpillSet; @@ -38,6 +38,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.test.DrillTest; import org.apache.drill.test.OperatorFixture; import org.apache.drill.test.rowSet.DirectRowSet; @@ -45,16 +46,18 @@ import org.apache.drill.test.rowSet.HyperRowSetImpl; import org.apache.drill.test.rowSet.IndirectRowSet; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet; -import org.apache.drill.test.rowSet.RowSetReader; -import org.apache.drill.test.rowSet.RowSetWriter; import org.apache.drill.test.rowSet.RowSetBuilder; import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.RowSetReader; +import org.apache.drill.test.rowSet.RowSetWriter; import org.apache.drill.test.rowSet.SchemaBuilder; import org.junit.Test; +import org.junit.experimental.categories.Category; -import com.google.common.base.Stopwatch; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import org.junit.experimental.categories.Category; + +import io.netty.buffer.DrillBuf; /** * Tests the external sort implementation: the "guts" of the sort stripped of the @@ -157,12 +160,6 @@ public class TestSortImpl extends DrillTest { for (RowSet expectedSet : expected) { assertTrue(results.next()); RowSet rowSet = toRowSet(results, dest); - // Uncomment these for debugging. Leave them commented otherwise - // to avoid polluting the Maven build output unnecessarily. -// System.out.println("Expected:"); -// expectedSet.print(); -// System.out.println("Actual:"); -// rowSet.print(); new RowSetComparison(expectedSet) .verify(rowSet); expectedSet.clear(); @@ -325,7 +322,8 @@ public class TestSortImpl extends DrillTest { public DataGenerator(OperatorFixture fixture, int targetCount, int batchSize, int seed, int step) { this.fixture = fixture; this.targetCount = targetCount; - this.batchSize = Math.min(batchSize, Character.MAX_VALUE); + Preconditions.checkArgument(batchSize > 0 && batchSize <= ValueVector.MAX_ROW_COUNT); + this.batchSize = batchSize; this.step = step; schema = SortTestUtilities.nonNullSchema(); currentValue = seed; @@ -380,7 +378,8 @@ public class TestSortImpl extends DrillTest { public DataValidator(int targetCount, int batchSize) { this.targetCount = targetCount; - this.batchSize = Math.min(batchSize, Character.MAX_VALUE); + Preconditions.checkArgument(batchSize > 0 && batchSize <= ValueVector.MAX_ROW_COUNT); + this.batchSize = batchSize; } public void validate(RowSet output) { @@ -400,8 +399,6 @@ public class TestSortImpl extends DrillTest { } } - Stopwatch timer = Stopwatch.createUnstarted(); - /** * Run a full-blown sort test with multiple input batches. Because we want to * generate multiple inputs, we don't create them statically. Instead, we generate @@ -428,33 +425,25 @@ public class TestSortImpl extends DrillTest { if (batchCount == 1) { // Simulates a NEW_SCHEMA event - timer.start(); sort.setSchema(input.container().getSchema()); - timer.stop(); } // Simulates an OK event - timer.start(); sort.addBatch(input.vectorAccessible()); - timer.stop(); } // Simulate returning results - timer.start(); SortResults results = sort.startMerge(); if (results.getContainer() != dest) { dest.clear(); dest = results.getContainer(); } while (results.next()) { - timer.stop(); RowSet output = toRowSet(results, dest); validator.validate(output); - timer.start(); } - timer.stop(); validator.validateDone(); results.close(); dest.clear(); @@ -469,11 +458,9 @@ public class TestSortImpl extends DrillTest { */ public void runJumboBatchTest(OperatorFixture fixture, int rowCount) { - timer.reset(); - DataGenerator dataGen = new DataGenerator(fixture, rowCount, Character.MAX_VALUE); - DataValidator validator = new DataValidator(rowCount, Character.MAX_VALUE); + DataGenerator dataGen = new DataGenerator(fixture, rowCount, ValueVector.MAX_ROW_COUNT); + DataValidator validator = new DataValidator(rowCount, ValueVector.MAX_ROW_COUNT); runLargeSortTest(fixture, dataGen, validator); - System.out.println(timer.elapsed(TimeUnit.MILLISECONDS)); } /** @@ -499,7 +486,30 @@ public class TestSortImpl extends DrillTest { @Test public void testLargeBatch() throws Exception { try (OperatorFixture fixture = OperatorFixture.standardFixture()) { - runJumboBatchTest(fixture, Character.MAX_VALUE); +// partyOnMemory(fixture.allocator()); + runJumboBatchTest(fixture, ValueVector.MAX_ROW_COUNT); + } + } + + /** + * Use this function to pre-load Netty's free list with a large + * number of "dirty" blocks. This will often catch error due to + * failure to initialize value vector memory. + * + * @param fixture the operator fixture that provides an allocator + */ + + @SuppressWarnings("unused") + private void partyOnMemory(BufferAllocator allocator) { + DrillBuf bufs[] = new DrillBuf[10]; + for (int i = 0; i < bufs.length; i++) { + bufs[i] = allocator.buffer(ValueVector.MAX_BUFFER_SIZE); + for (int j = 0; j < ValueVector.MAX_BUFFER_SIZE; j += 4) { + bufs[i].setInt(j, 0xDEADBEEF); + } + } + for (int i = 0; i < bufs.length; i++) { + bufs[i].release(); } } @@ -533,8 +543,6 @@ public class TestSortImpl extends DrillTest { VectorContainer dest = new VectorContainer(); SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED, dest); - timer.reset(); - timer.start(); sort.setSchema(rowSet.container().getSchema()); sort.addBatch(rowSet.vectorAccessible()); SortResults results = sort.startMerge(); @@ -543,13 +551,11 @@ public class TestSortImpl extends DrillTest { dest = results.getContainer(); } assertTrue(results.next()); - timer.stop(); assertFalse(results.next()); results.close(); dest.clear(); sort.close(); sort.opContext().close(); - System.out.println(timer.elapsed(TimeUnit.MILLISECONDS)); } /** @@ -561,7 +567,7 @@ public class TestSortImpl extends DrillTest { @Test public void testWideRows() throws Exception { try (OperatorFixture fixture = OperatorFixture.standardFixture()) { - runWideRowsTest(fixture, 1000, Character.MAX_VALUE); + runWideRowsTest(fixture, 1000, ValueVector.MAX_ROW_COUNT); } } http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java index e84f2d3..ca282a1 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java @@ -47,6 +47,16 @@ import com.google.common.base.Stopwatch; * <p> * Current results: * <ul> + * <li>Required and nullable writers are slightly faster than the + * corresponding vector mutator methods.</li> + * <li>Writer is 230% faster than a repeated mutator.</li> + * </ul> + * + * The key reason for the converged performance (now compared to earlier + * results below) is that both paths now use bounds-checking optimizations. + * <p> + * Prior results before the common bounds-check optimizations: + * <ul> * <li>Writer is 42% faster than a required mutator.</li> * <li>Writer is 73% faster than a nullable mutator.</li> * <li>Writer is 407% faster than a repeated mutator.</li> http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java index 103b212..912f362 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java @@ -373,7 +373,7 @@ public class TestVariableWidthWriter extends SubOperatorTest { @Override public boolean canExpand(ScalarWriter writer, int delta) { - System.out.println("Delta: " + delta); +// System.out.println("Delta: " + delta); totalAlloc += delta; return totalAlloc < 1024 * 1024; } http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java ---------------------------------------------------------------------- diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java index 115d31e..109500a 100644 --- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java +++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java @@ -712,25 +712,6 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { return this; } - // Clone of UDLE's setBytes(), but with bounds checking done as a boolean, - // not assertion. - - public boolean setBytesBounded(int index, byte[] src, int srcIndex, int length) { - // Must do here because Drill's UDLE is not ref counted. - // Done as an assert to avoid production overhead: if this is going - // to fail, it will do so spectacularly in tests, due to a programming error. - assert refCnt() > 0; - return udle.setBytesBounded(index, src, srcIndex, length); - } - - // As above, but for direct memory. - - public boolean setBytesBounded(int index, DrillBuf src, int srcIndex, int length) { - // See above. - assert refCnt() > 0; - return udle.setBytesBounded(index, src.udle, srcIndex, length); - } - @Override public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { udle.setBytes(index + offset, src, srcIndex, length); @@ -842,71 +823,4 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces); } } - - // The "unsafe" methods are for use ONLY by code that does its own - // bounds checking. They are called "unsafe" for a reason: they will crash - // the JVM if values are addressed out of bounds. - - /** - * Write an integer to the buffer at the given byte index, without - * bounds checks. - * - * @param offset byte (not int) offset of the location to write - * @param value the value to write - */ - - public void unsafePutInt(int offset, int value) { - PlatformDependent.putInt(addr + offset, value); - } - - /** - * Write a long to the buffer at the given byte index, without - * bounds checks. - * - * @param index byte (not long) offset of the location to write - * @param value the value to write - */ - - public void unsafePutLong(int index, long value) { - PlatformDependent.putLong(addr + index, value); - } - - /** - * Write a short to the buffer at the given byte index, without - * bounds checks. - * - * @param offset byte (not short) offset of the location to write - * @param value the value to write - */ - - public void unsafePutShort(int offset, short value) { - PlatformDependent.putShort(addr + offset, value); - } - - /** - * Write a byte to the buffer at the given byte index, without - * bounds checks. - * - * @param offset byte offset of the location to write - * @param value the value to write - */ - - public void unsafePutByte(int offset, byte value) { - PlatformDependent.putByte(addr + offset, value); - } - - /** - * Copy a buffer of heap data to the buffer memory. - * - * @param srce source byte buffer - * @param srcOffset offset within the byte buffer of the start of data - * @param destOffset byte offset into this buffer to which to write the - * data - * @param length length of the data, which must be within the - * bounds of this buffer - */ - - public void unsafeCopyMemory(byte[] srce, int srcOffset, int destOffset, int length) { - PlatformDependent.copyMemory(srce, srcOffset, addr + destOffset, length); - } } http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/vector/src/main/codegen/templates/ColumnAccessors.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/codegen/templates/ColumnAccessors.java b/exec/vector/src/main/codegen/templates/ColumnAccessors.java index 33b12be..14ec1e8 100644 --- a/exec/vector/src/main/codegen/templates/ColumnAccessors.java +++ b/exec/vector/src/main/codegen/templates/ColumnAccessors.java @@ -276,14 +276,14 @@ public class ColumnAccessors { <#assign putAddr = "writeIndex * VALUE_WIDTH"> </#if> <#if varWidth> - drillBuf.unsafeCopyMemory(value, 0, offset, len); + drillBuf.setBytes(offset, value, 0, len); offsetsWriter.setNextOffset(offset + len); <#elseif drillType == "Decimal9"> - drillBuf.unsafePutInt(${putAddr}, + drillBuf.setInt(${putAddr}, DecimalUtility.getDecimal9FromBigDecimal(value, type.getScale(), type.getPrecision())); <#elseif drillType == "Decimal18"> - drillBuf.unsafePutLong(${putAddr}, + drillBuf.setLong(${putAddr}, DecimalUtility.getDecimal18FromBigDecimal(value, type.getScale(), type.getPrecision())); <#elseif drillType == "Decimal38Sparse"> @@ -295,23 +295,23 @@ public class ColumnAccessors { DecimalUtility.getSparseFromBigDecimal(value, vector.getBuffer(), writeIndex * VALUE_WIDTH, type.getScale(), type.getPrecision(), 5); <#elseif drillType == "IntervalYear"> - drillBuf.unsafePutInt(${putAddr}, + drillBuf.setInt(${putAddr}, value.getYears() * 12 + value.getMonths()); <#elseif drillType == "IntervalDay"> final int offset = ${putAddr}; - drillBuf.unsafePutInt(offset, value.getDays()); - drillBuf.unsafePutInt(offset + 4, periodToMillis(value)); + drillBuf.setInt(offset, value.getDays()); + drillBuf.setInt(offset + 4, periodToMillis(value)); <#elseif drillType == "Interval"> final int offset = ${putAddr}; - drillBuf.unsafePutInt(offset, value.getYears() * 12 + value.getMonths()); - drillBuf.unsafePutInt(offset + 4, value.getDays()); - drillBuf.unsafePutInt(offset + 8, periodToMillis(value)); + drillBuf.setInt(offset, value.getYears() * 12 + value.getMonths()); + drillBuf.setInt(offset + 4, value.getDays()); + drillBuf.setInt(offset + 8, periodToMillis(value)); <#elseif drillType == "Float4"> - drillBuf.unsafePutInt(${putAddr}, Float.floatToRawIntBits((float) value)); + drillBuf.setInt(${putAddr}, Float.floatToRawIntBits((float) value)); <#elseif drillType == "Float8"> - drillBuf.unsafePutLong(${putAddr}, Double.doubleToRawLongBits(value)); + drillBuf.setLong(${putAddr}, Double.doubleToRawLongBits(value)); <#else> - drillBuf.unsafePut${putType?cap_first}(${putAddr}, <#if doCast>(${putType}) </#if>value); + drillBuf.set${putType?cap_first}(${putAddr}, <#if doCast>(${putType}) </#if>value); </#if> vectorIndex.nextElement(); } http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java index e49f92c..1107216 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java @@ -98,7 +98,7 @@ public abstract class AbstractFixedWidthWriter extends BaseScalarWriter { while (dest < writeIndex) { int length = writeIndex - dest; length = Math.min(length, stride); - drillBuf.unsafeCopyMemory(ZERO_BUF, 0, dest * width, length * width); + drillBuf.setBytes(dest * width, ZERO_BUF, 0, length * width); dest += length; } } http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java index 6da2b50..2068304 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java @@ -30,8 +30,45 @@ import org.joda.time.Period; public class NullableScalarWriter extends AbstractScalarWriter { + public static final class ChildIndex implements ColumnWriterIndex { + + private final ColumnWriterIndex parentIndex; + + public ChildIndex(ColumnWriterIndex parentIndex) { + this.parentIndex = parentIndex; + } + + @Override + public int rowStartIndex() { + return parentIndex.rowStartIndex(); + } + + @Override + public int vectorIndex() { + return parentIndex.vectorIndex(); + } + + @Override + public void nextElement() { + // Ignore next element requests from children. + // Nullable writers have two children, we don't want + // to increment the index twice. + } + + @Override + public void rollover() { + parentIndex.rollover(); + } + + @Override + public ColumnWriterIndex outerIndex() { + return parentIndex.outerIndex(); + } + } + private final UInt1ColumnWriter isSetWriter; private final BaseScalarWriter baseWriter; + private ColumnWriterIndex writerIndex; public NullableScalarWriter(NullableVector nullableVector, BaseScalarWriter baseWriter) { isSetWriter = new UInt1ColumnWriter(nullableVector.getBitsVector()); @@ -54,8 +91,10 @@ public class NullableScalarWriter extends AbstractScalarWriter { @Override public void bindIndex(ColumnWriterIndex index) { - isSetWriter.bindIndex(index); - baseWriter.bindIndex(index); + writerIndex = index; + ColumnWriterIndex childIndex = new ChildIndex(index); + isSetWriter.bindIndex(childIndex); + baseWriter.bindIndex(childIndex); } @Override @@ -76,24 +115,28 @@ public class NullableScalarWriter extends AbstractScalarWriter { public void setNull() { isSetWriter.setInt(0); baseWriter.skipNulls(); + writerIndex.nextElement(); } @Override public void setInt(int value) { baseWriter.setInt(value); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override public void setLong(long value) { baseWriter.setLong(value); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override public void setDouble(double value) { baseWriter.setDouble(value); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override @@ -105,24 +148,28 @@ public class NullableScalarWriter extends AbstractScalarWriter { baseWriter.setString(value); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override public void setBytes(byte[] value, int len) { baseWriter.setBytes(value, len); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override public void setDecimal(BigDecimal value) { baseWriter.setDecimal(value); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override public void setPeriod(Period value) { baseWriter.setPeriod(value); isSetWriter.setInt(1); + writerIndex.nextElement(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java index d5f9b30..49d16a3 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java @@ -171,7 +171,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter { if (capacity * VALUE_WIDTH < MIN_BUFFER_SIZE) { realloc(MIN_BUFFER_SIZE); } - vector.getBuffer().unsafePutInt(0, 0); + vector.getBuffer().setInt(0, 0); } public int nextOffset() { return nextOffset; } @@ -199,7 +199,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter { final int valueIndex = vectorIndex.vectorIndex(); int writeIndex = valueIndex + 1; - if (lastWriteIndex < valueIndex - 1 || writeIndex >= capacity) { + if (lastWriteIndex + 1 < valueIndex || writeIndex >= capacity) { writeIndex = prepareWrite(writeIndex); } @@ -207,7 +207,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter { // Recall, it is the value index, which is one less than the (end) // offset index. - lastWriteIndex = writeIndex - 1; + lastWriteIndex = valueIndex; return writeIndex; } @@ -220,24 +220,27 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter { // Call to resize may cause rollover, so reset write index // afterwards. - writeIndex = vectorIndex.vectorIndex() + 1; + final int valueIndex = vectorIndex.vectorIndex(); // Fill empties to the write position. + // Fill empties works off the row index, not the write + // index. The write index is one past the row index. + // (Yes, this is complex...) - fillEmpties(writeIndex); - return writeIndex; + fillEmpties(valueIndex); + return valueIndex + 1; } @Override - protected final void fillEmpties(final int writeIndex) { - while (lastWriteIndex < writeIndex - 1) { - drillBuf.unsafePutInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset); + protected final void fillEmpties(final int valueIndex) { + while (lastWriteIndex < valueIndex - 1) { + drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset); } } public final void setNextOffset(final int newOffset) { final int writeIndex = writeIndex(); - drillBuf.unsafePutInt(writeIndex * VALUE_WIDTH, newOffset); + drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset); nextOffset = newOffset; } @@ -267,8 +270,17 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter { } @Override - public final void endWrite() { - setValueCount(vectorIndex.vectorIndex() + 1); + public void setValueCount(int valueCount) { + + // Slightly different version of the fixed-width writer + // code. Offset counts are one greater than the value count. + // But for all other purposes, we track the value (row) + // position, not the offset position. + + mandatoryResize(valueCount); + fillEmpties(valueCount); + vector().getBuffer().writerIndex((valueCount + 1) * width()); + lastWriteIndex = Math.max(lastWriteIndex, valueCount - 1); } @Override
