Repository: asterixdb Updated Branches: refs/heads/master c5a0a1974 -> ed4693812
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java index c473819..980857a 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java @@ -55,18 +55,19 @@ public class TupleSorterHeapSort implements ITupleSorter { } class HeapEntry implements IResetableComparable<HeapEntry> { - int nmk; + int[] nmk; TuplePointer tuplePointer; public HeapEntry() { tuplePointer = new TuplePointer(); - nmk = 0; + nmk = new int[normalizedKeyTotalLength]; } @Override public int compareTo(HeapEntry o) { - if (nmk != o.nmk) { - return ((((long) nmk) & 0xffffffffL) < (((long) o.nmk) & 0xffffffffL)) ? 
-1 : 1; + int cmpNormalizedKey = AbstractFrameSorter.compareNormalizeKeys(nmk, 0, o.nmk, 0, normalizedKeyTotalLength); + if (cmpNormalizedKey != 0 || normalizedKeyDecisive) { + return cmpNormalizedKey; } bufferAccessor1.reset(tuplePointer); bufferAccessor2.reset(o.tuplePointer); @@ -93,13 +94,15 @@ public class TupleSorterHeapSort implements ITupleSorter { return 0; } - public void reset(int nmkey) { - nmk = nmkey; + public void reset(int[] nmkey) { + if (normalizedKeyTotalLength > 0) { + System.arraycopy(nmkey, 0, nmk, 0, normalizedKeyTotalLength); + } } @Override public void reset(HeapEntry other) { - nmk = other.nmk; + reset(other.nmk); tuplePointer.reset(other.tuplePointer); } } @@ -111,19 +114,23 @@ public class TupleSorterHeapSort implements ITupleSorter { private final FrameTupleAppender outputAppender; private final IFrame outputFrame; private final int[] sortFields; - private final INormalizedKeyComputer nkc; + private final INormalizedKeyComputer[] nkcs; + private final boolean normalizedKeyDecisive; + private final int[] normalizedKeyLength; + private final int normalizedKeyTotalLength; private final IBinaryComparator[] comparators; - private HeapEntry maxEntry; - private HeapEntry newEntry; + private final HeapEntry maxEntry; + private final HeapEntry newEntry; private MaxHeap heap; private boolean isSorted; + private final int[] nmk; + public TupleSorterHeapSort(IHyracksTaskContext ctx, IDeletableTupleBufferManager bufferManager, int topK, - int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories) - throws HyracksDataException { + int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, + IBinaryComparatorFactory[] comparatorFactories) throws HyracksDataException { this.bufferManager = bufferManager; this.bufferAccessor1 = bufferManager.createTuplePointerAccessor(); this.bufferAccessor2 = bufferManager.createTuplePointerAccessor(); @@ -131,7 +138,31 @@ public class 
TupleSorterHeapSort implements ITupleSorter { this.outputFrame = new VSizeFrame(ctx); this.outputAppender = new FrameTupleAppender(); this.sortFields = sortFields; - this.nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer(); + + int runningNormalizedKeyTotalLength = 0; + if (keyNormalizerFactories != null) { + int decisivePrefixLength = AbstractFrameSorter.getDecisivePrefixLength(keyNormalizerFactories); + + // we only take a prefix of the decisive normalized keys, plus at most one indecisive normalized key + // ideally, the caller should prepare normalizers in this way, but we just guard here to avoid + // computing unnecessary normalized keys + int normalizedKeys = decisivePrefixLength < keyNormalizerFactories.length ? decisivePrefixLength + 1 + : decisivePrefixLength; + this.nkcs = new INormalizedKeyComputer[normalizedKeys]; + this.normalizedKeyLength = new int[normalizedKeys]; + + for (int i = 0; i < normalizedKeys; i++) { + this.nkcs[i] = keyNormalizerFactories[i].createNormalizedKeyComputer(); + this.normalizedKeyLength[i] = keyNormalizerFactories[i].getNormalizedKeyLength(); + runningNormalizedKeyTotalLength += this.normalizedKeyLength[i]; + } + this.normalizedKeyDecisive = decisivePrefixLength == comparatorFactories.length; + } else { + this.nkcs = null; + this.normalizedKeyLength = null; + this.normalizedKeyDecisive = false; + } + this.normalizedKeyTotalLength = runningNormalizedKeyTotalLength; this.comparators = new IBinaryComparator[comparatorFactories.length]; for (int i = 0; i < comparatorFactories.length; ++i) { comparators[i] = comparatorFactories[i].createBinaryComparator(); @@ -141,6 +172,7 @@ public class TupleSorterHeapSort implements ITupleSorter { this.maxEntry = new HeapEntry(); this.newEntry = new HeapEntry(); this.isSorted = false; + this.nmk = new int[runningNormalizedKeyTotalLength]; } @Override @@ -154,7 +186,7 @@ public class TupleSorterHeapSort implements ITupleSorter { throw new 
HyracksDataException( "The Heap haven't be reset after sorting, the order of using this class is not correct."); } - int nmkey = getPNK(frameTupleAccessor, index); + int[] nmkey = getPNK(frameTupleAccessor, index); if (heap.getNumEntries() >= topK) { heap.peekMax(maxEntry); if (compareTuple(frameTupleAccessor, index, nmkey, maxEntry) >= 0) { @@ -175,20 +207,29 @@ public class TupleSorterHeapSort implements ITupleSorter { return true; } - private int getPNK(IFrameTupleAccessor fta, int tIx) { - if (nkc == null) { - return 0; + private int[] getPNK(IFrameTupleAccessor fta, int tIx) { + if (nkcs == null) { + return nmk; } - int sfIdx = sortFields[0]; - return nkc.normalize(fta.getBuffer().array(), fta.getAbsoluteFieldStartOffset(tIx, sfIdx), - fta.getFieldLength(tIx, sfIdx)); + int keyPos = 0; + byte[] buffer = fta.getBuffer().array(); + for (int i = 0; i < nkcs.length; i++) { + int sfIdx = sortFields[i]; + nkcs[i].normalize(buffer, fta.getAbsoluteFieldStartOffset(tIx, sfIdx), fta.getFieldLength(tIx, sfIdx), nmk, + keyPos); + keyPos += normalizedKeyLength[i]; + } + return nmk; } - private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int nmkey, HeapEntry maxEntry) + private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int[] nmkey, HeapEntry maxEntry) throws HyracksDataException { - if (nmkey != maxEntry.nmk) { - return ((((long) nmkey) & 0xffffffffL) < (((long) maxEntry.nmk) & 0xffffffffL)) ? 
-1 : 1; + int cmpNormalizedKey = + AbstractFrameSorter.compareNormalizeKeys(nmkey, 0, maxEntry.nmk, 0, normalizedKeyTotalLength); + if (cmpNormalizedKey != 0 || normalizedKeyDecisive) { + return cmpNormalizedKey; } + bufferAccessor2.reset(maxEntry.tuplePointer); byte[] b1 = frameTupleAccessor.getBuffer().array(); byte[] b2 = bufferAccessor2.getBuffer().array(); @@ -254,9 +295,8 @@ public class TupleSorterHeapSort implements ITupleSorter { for (int i = 0; i < numEntries; i++) { HeapEntry minEntry = (HeapEntry) entries[i]; bufferAccessor1.reset(minEntry.tuplePointer); - int flushed = FrameUtils - .appendToWriter(writer, outputAppender, bufferAccessor1.getBuffer().array(), - bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength()); + int flushed = FrameUtils.appendToWriter(writer, outputAppender, bufferAccessor1.getBuffer().array(), + bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength()); if (flushed > 0) { maxFrameSize = Math.max(maxFrameSize, flushed); io++; @@ -265,8 +305,7 @@ public class TupleSorterHeapSort implements ITupleSorter { maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize()); outputAppender.write(writer, true); if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info( - "Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames"); + LOGGER.info("Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames"); } return maxFrameSize; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java index 160336a..56bf853 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java @@ -24,6 +24,7 @@ import org.apache.hyracks.api.constraints.PartitionConstraintHelper; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.dataset.ResultSetId; @@ -58,31 +59,33 @@ public class HeapSortMergeTest extends AbstractIntegrationTest { JobSpecification spec = new JobSpecification(); FileSplit[] ordersSplits = new FileSplit[] { - new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator - + "orders-part1.tbl"), - new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator - + "orders-part2.tbl") }; + new ManagedFileSplit(NC1_ID, + "data" + File.separator + "tpch0.001" + File.separator + "orders-part1.tbl"), + new ManagedFileSplit(NC2_ID, + "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") }; IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new 
UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor ordersDesc = + new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider, new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), + ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID); int outputLimit = 5; // larger than the total record numbers. 
- TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4, - outputLimit, new int[] { 1, 0 }, null, new IBinaryComparatorFactory[] { - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc); + TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4, outputLimit, new int[] { 1, 0 }, + (INormalizedKeyComputerFactory) null, + new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), + PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, + ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID); @@ -90,23 +93,21 @@ public class HeapSortMergeTest extends AbstractIntegrationTest { spec.addResultSetId(rsId); FileSplit fs = createFile(nc1); - IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider( - new FileSplit[] { fs }); + IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(new FileSplit[] { fs }); IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|"); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0); - spec.connect( - new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(new int[] { - 1, 0 }, new IBinaryHashFunctionFactory[] { - PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY), - PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 }, - new IBinaryComparatorFactory[] { - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0); + spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new 
FieldHashPartitionComputerFactory( + new int[] { 1, 0 }, + new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY), + PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), + new int[] { 1, 0 }, + new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), + PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, + new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0); runTest(spec); System.out.println("Result write into :" + fs.getPath() + " in node: " + fs.getNodeName()); @@ -122,31 +123,33 @@ public class HeapSortMergeTest extends AbstractIntegrationTest { JobSpecification spec = new JobSpecification(); FileSplit[] ordersSplits = new FileSplit[] { - new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator - + "orders-part1.tbl"), - new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator - + "orders-part2.tbl") }; + new ManagedFileSplit(NC1_ID, + "data" + File.separator + "tpch0.001" + File.separator + "orders-part1.tbl"), + new ManagedFileSplit(NC2_ID, + "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") }; IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer() }); + RecordDescriptor ordersDesc = + new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new 
UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider, new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc); + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), + ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID); int outputLimit = 20; - TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4, - outputLimit, new int[] { 1, 0 }, null, new IBinaryComparatorFactory[] { - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc); + TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4, outputLimit, new int[] { 1, 0 }, + (INormalizedKeyComputerFactory) null, + new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), + PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, + ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID); LimitOperatorDescriptor filter = new LimitOperatorDescriptor(spec, ordersDesc, outputLimit); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java index cfd4f30..d3b6b5c 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java @@ -19,7 +19,7 @@ package org.apache.hyracks.tests.unit; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.io.DataInputStream; import java.util.ArrayList; @@ -57,11 +57,11 @@ public abstract class AbstractRunGeneratorTest { static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() }; static RecordDescriptor RecordDesc = new RecordDescriptor(SerDers); - static Random GRandom = new Random(System.currentTimeMillis()); + static Random GRandom = new Random(0); static int[] SortFields = new int[] { 0, 1 }; - static IBinaryComparatorFactory[] ComparatorFactories = new IBinaryComparatorFactory[] { - PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY), - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }; + static IBinaryComparatorFactory[] ComparatorFactories = + new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY), + PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }; static void assertMaxFrameSizesAreAllEqualsTo(List<GeneratedRunFileReader> maxSize, int pageSize) { for (int i = 0; i < maxSize.size(); i++) { @@ -69,25 +69,30 @@ public abstract class 
AbstractRunGeneratorTest { } } - abstract AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord) - throws HyracksDataException; + abstract AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, + int numOfInputRecord) throws HyracksDataException; - protected List<GeneratedRunFileReader> testSortRecords(int pageSize, int frameLimit, int numRuns, int minRecordSize, - int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException { + protected List<List<GeneratedRunFileReader>> testSortRecords(int pageSize, int frameLimit, int numRuns, + int minRecordSize, int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException { IHyracksTaskContext ctx = testUtils.create(pageSize); HashMap<Integer, String> keyValuePair = new HashMap<>(); List<IFrame> frameList = new ArrayList<>(); prepareData(ctx, frameList, pageSize * frameLimit * numRuns, minRecordSize, maxRecordSize, specialData, keyValuePair); - AbstractSortRunGenerator runGenerator = getSortRunGenerator(ctx, frameLimit, keyValuePair.size()); - runGenerator.open(); - for (IFrame frame : frameList) { - runGenerator.nextFrame(frame.getBuffer()); + + List<List<GeneratedRunFileReader>> results = new ArrayList<>(); + AbstractSortRunGenerator[] runGenerators = getSortRunGenerator(ctx, frameLimit, keyValuePair.size()); + for (AbstractSortRunGenerator runGenerator : runGenerators) { + runGenerator.open(); + for (IFrame frame : frameList) { + runGenerator.nextFrame(frame.getBuffer()); + } + runGenerator.close(); + matchResult(ctx, runGenerator.getRuns(), keyValuePair); + results.add(runGenerator.getRuns()); } - runGenerator.close(); - matchResult(ctx, runGenerator.getRuns(), keyValuePair); - return runGenerator.getRuns(); + return results; } static void matchResult(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs, @@ -114,7 +119,9 @@ public abstract class AbstractRunGeneratorTest { 
bbis.setByteBuffer(fta.getBuffer(), fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 1) + fta.getFieldSlotsLength()); String value = (String) RecordDesc.getFields()[1].deserialize(di); - + if (!keyValuePair.containsKey(key)) { + assertTrue(false); + } if (!keyValuePair.get(key).equals(value)) { assertTrue(false); } @@ -146,7 +153,7 @@ public abstract class AbstractRunGeneratorTest { static void prepareData(IHyracksTaskContext ctx, List<IFrame> frameList, int minDataSize, int minRecordSize, int maxRecordSize, Map<Integer, String> specialData, Map<Integer, String> keyValuePair) - throws HyracksDataException { + throws HyracksDataException { ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount()); FrameTupleAppender appender = new FrameTupleAppender(); @@ -158,8 +165,9 @@ public abstract class AbstractRunGeneratorTest { tb.addField(IntegerSerializerDeserializer.INSTANCE, entry.getKey()); tb.addField(new UTF8StringSerializerDeserializer(), entry.getValue()); - VSizeFrame frame = new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore( - tb.getFieldEndOffsets().length, tb.getSize(), ctx.getInitialFrameSize())); + VSizeFrame frame = + new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(tb.getFieldEndOffsets().length, + tb.getSize(), ctx.getInitialFrameSize())); appender.reset(frame, true); assertTrue(appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())); frameList.add(frame); @@ -226,9 +234,25 @@ public abstract class AbstractRunGeneratorTest { int numRuns = 2; int minRecordSize = pageSize / 8; int maxRecordSize = pageSize / 8; - List<GeneratedRunFileReader> maxSize = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, - maxRecordSize, null); - assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize); + List<List<GeneratedRunFileReader>> maxSizes = + testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, null); + for (List<GeneratedRunFileReader> maxSize : maxSizes) { + 
assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize); + } + } + + @Test + public void testAllManySmallRecords() throws HyracksDataException { + int pageSize = 10240; + int frameLimit = 4; + int numRuns = 2; + int minRecordSize = pageSize / 8; + int maxRecordSize = pageSize / 8; + List<List<GeneratedRunFileReader>> maxSizes = + testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, null); + for (List<GeneratedRunFileReader> maxSize : maxSizes) { + assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize); + } } @Test @@ -238,9 +262,11 @@ public abstract class AbstractRunGeneratorTest { int numRuns = 2; int minRecordSize = pageSize; int maxRecordSize = (int) (pageSize * 1.8); - List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, - null); - assertMaxFrameSizesAreAllEqualsTo(size, pageSize * 2); + List<List<GeneratedRunFileReader>> maxSizes = + testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, null); + for (List<GeneratedRunFileReader> maxSize : maxSizes) { + assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize * 2); + } } @Test @@ -250,15 +276,16 @@ public abstract class AbstractRunGeneratorTest { int numRuns = 4; int minRecordSize = 20; int maxRecordSize = pageSize / 2; - HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit - 1); - List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, - specialPair); - - int max = 0; - for (GeneratedRunFileReader run : size) { - max = Math.max(max, run.getMaxFrameSize()); + HashMap<Integer, String> specialPair = generateBigObject(pageSize / 2, frameLimit - 1); + List<List<GeneratedRunFileReader>> sizes = + testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, specialPair); + for (List<GeneratedRunFileReader> size : sizes) { + int max = 0; + for (GeneratedRunFileReader run : size) { + max = Math.max(max, run.getMaxFrameSize()); 
+ } + assertTrue(max <= pageSize * (frameLimit - 1) && max >= pageSize * 2); } - assertTrue(max == pageSize * (frameLimit - 1)); } @Test(expected = HyracksDataException.class) @@ -269,8 +296,6 @@ public abstract class AbstractRunGeneratorTest { HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit); int minRecordSize = 10; int maxRecordSize = pageSize / 2; - List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, - specialPair); - + testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, specialPair); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java index fd57f1e..6765d1e 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java @@ -20,7 +20,11 @@ package org.apache.hyracks.tests.unit; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory; +import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory; 
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy; import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator; import org.apache.hyracks.dataflow.std.sort.Algorithm; import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator; @@ -28,9 +32,20 @@ import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator; public class ExternalSortRunGeneratorTest extends AbstractRunGeneratorTest { @Override - AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord) + AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord) throws HyracksDataException { - return new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories, RecordDesc, - Algorithm.MERGE_SORT, frameLimit); + ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories, + RecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, frameLimit, Integer.MAX_VALUE); + ExternalSortRunGenerator runGeneratorWithOneNormalizeKey = new ExternalSortRunGenerator(ctx, SortFields, + new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() }, ComparatorFactories, + RecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, frameLimit, Integer.MAX_VALUE); + ExternalSortRunGenerator runGeneratorWithNormalizeKeys = new ExternalSortRunGenerator(ctx, SortFields, + new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory(), + new UTF8StringNormalizedKeyComputerFactory() }, + ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, frameLimit, + Integer.MAX_VALUE); + + return new AbstractSortRunGenerator[] { runGenerator, runGeneratorWithOneNormalizeKey, + runGeneratorWithNormalizeKeys }; } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java index d219a56..5d9e771 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java @@ -19,23 +19,37 @@ package org.apache.hyracks.tests.unit; -import org.junit.Test; - import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory; +import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory; import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator; import org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator; +import org.junit.Test; public class HeapSortRunGeneratorTest extends AbstractRunGeneratorTest { @Override - AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord) + AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord) throws HyracksDataException { - return new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields, null, ComparatorFactories, - 
RecordDesc); + HeapSortRunGenerator runGenerator = new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields, + null, ComparatorFactories, RecordDesc); + HeapSortRunGenerator runGeneratorWithOneNormalizedKey = + new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields, + new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() }, + ComparatorFactories, RecordDesc); + HeapSortRunGenerator runGeneratorWithNormalizedKeys = new HeapSortRunGenerator(ctx, frameLimit, + numOfInputRecord, SortFields, new INormalizedKeyComputerFactory[] { + new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() }, + ComparatorFactories, RecordDesc); + + return new AbstractSortRunGenerator[] { runGenerator, runGeneratorWithOneNormalizedKey, + runGeneratorWithNormalizedKeys }; + } @Test - public void testTopK(){ + public void testTopK() { } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java deleted file mode 100644 index d1080f8..0000000 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hyracks.tests.unit; - -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator; -import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator; - -public class HybridSortRunGenerator extends AbstractRunGeneratorTest { - @Override - AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord) - throws HyracksDataException { - return new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields, null, ComparatorFactories, - RecordDesc); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java new file mode 100644 index 0000000..d91f1e1 --- /dev/null +++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.tests.unit; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory; +import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory; +import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator; +import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator; + +public class HybridSortRunGeneratorTest extends AbstractRunGeneratorTest { + @Override + AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord) + throws HyracksDataException { + HybridTopKSortRunGenerator runGenerator = new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord, + SortFields, null, ComparatorFactories, RecordDesc); + HybridTopKSortRunGenerator 
runGeneratorWithOneNormalizedKey = + new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields, + new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() }, + ComparatorFactories, RecordDesc); + HybridTopKSortRunGenerator runGeneratorWithNormalizedKeys = new HybridTopKSortRunGenerator(ctx, frameLimit, + numOfInputRecord, SortFields, new INormalizedKeyComputerFactory[] { + new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() }, + ComparatorFactories, RecordDesc); + + return new AbstractSortRunGenerator[] { runGenerator, runGeneratorWithOneNormalizedKey, + runGeneratorWithNormalizedKeys }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java index c68d59d..a219518 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java @@ -19,16 +19,8 @@ package org.apache.hyracks.tests.unit; -import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories; -import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.GRandom; -import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc; -import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields; -import static 
org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.generateRandomRecord; -import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.matchResult; -import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.prepareData; -import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.testUtils; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.*; +import static org.junit.Assert.*; import java.io.DataInputStream; import java.util.ArrayList; @@ -71,7 +63,7 @@ public class RunMergingFrameReaderTest { private final int numFrames; private final int minRecordSize; private final int maxRecordSize; - private TreeMap<Integer, String> result = new TreeMap<>(); + private final TreeMap<Integer, String> result = new TreeMap<>(); int maxFrameSize; ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount()); @@ -186,8 +178,8 @@ public class RunMergingFrameReaderTest { prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList, frameList, keyValueMapList); - RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, - null, RecordDesc); + RunMergingFrameReader reader = + new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc); testMergeSucceed(ctx, reader, keyValueMapList); } @@ -207,8 +199,8 @@ public class RunMergingFrameReaderTest { prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList, frameList, keyValueMapList); - RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, - null, RecordDesc); + RunMergingFrameReader reader = + new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc); testMergeSucceed(ctx, reader, keyValueMapList); } @@ -291,8 
+283,8 @@ public class RunMergingFrameReaderTest { prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList, frameList, keyValueMap); - RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, - null, RecordDesc); + RunMergingFrameReader reader = + new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc); testMergeSucceed(ctx, reader, keyValueMap); } @@ -342,8 +334,8 @@ public class RunMergingFrameReaderTest { for (GeneratedRunFileReader run : runGenerator.getRuns()) { runs.add(run); } - RunMergingFrameReader reader = new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null, - RecordDesc); + RunMergingFrameReader reader = + new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null, RecordDesc); IFrame outFrame = new VSizeFrame(ctx); reader.open(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java index f621bf9..b2a8323 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java @@ -19,13 +19,8 @@ package org.apache.hyracks.tests.unit; -import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories; -import static 
org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc; -import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SerDers; -import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields; -import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.assertFTADataIsSorted; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.*; +import static org.junit.Assert.*; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -38,10 +33,13 @@ import org.apache.hyracks.api.comm.FixedSizeFrame; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory; +import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory; import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator; import org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator; import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator; @@ -126,12 +124,24 @@ public class TopKRunGeneratorTest { } @Test - public void testHybridTopKShouldSwitchToFrameSorterWhenFlushed() { - int topK = 1; + public void testHybridTopKWithOneNormalizedKey() throws HyracksDataException { + int topK = SORT_FRAME_LIMIT; IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE); - AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, 
SortFields, null, - ComparatorFactories, RecordDesc); + AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields, + new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() }, ComparatorFactories, + RecordDesc); + testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter); + } + @Test + public void testHybridTopKWithTwoNormalizedKeys() throws HyracksDataException { + int topK = SORT_FRAME_LIMIT; + IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE); + AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator( + ctx, SORT_FRAME_LIMIT, topK, SortFields, new INormalizedKeyComputerFactory[] { + new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() }, + ComparatorFactories, RecordDesc); + testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter); } private void testInMemoryOnly(IHyracksTaskContext ctx, int topK, ORDER order, AbstractSortRunGenerator sorter) @@ -148,7 +158,7 @@ public class TopKRunGeneratorTest { List<IFrame> frameList = new ArrayList<>(); int minDataSize = PAGE_SIZE * NUM_PAGES * 4 / 5; - int minRecordSize = 16; + int minRecordSize = 64; int maxRecordSize = 64; AbstractRunGeneratorTest.prepareData(ctx, frameList, minDataSize, minRecordSize, maxRecordSize, null, @@ -162,7 +172,6 @@ public class TopKRunGeneratorTest { doSort(sorter, buffer); - assertEquals(0, sorter.getRuns().size()); validateResult(sorter, topKAnswer); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java 
b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java index 650c60d..23a6be0 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java @@ -29,6 +29,7 @@ import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.io.FileSplit; @@ -139,11 +140,11 @@ public class WordCountMain { JobSpecification spec = new JobSpecification(frameSize); IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits); - RecordDescriptor wordDesc = new RecordDescriptor( - new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() }); + RecordDescriptor wordDesc = + new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() }); - FileScanOperatorDescriptor wordScanner = new FileScanOperatorDescriptor(spec, splitsProvider, - new WordTupleParserFactory(), wordDesc); + FileScanOperatorDescriptor wordScanner = + new FileScanOperatorDescriptor(spec, splitsProvider, new WordTupleParserFactory(), wordDesc); createPartitionConstraint(spec, wordScanner, inSplits); RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] { @@ -170,13 +171,16 @@ public class WordCountMain { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) })); 
spec.connect(scanGroupConn, wordScanner, 0, gBy, 0); } else { - IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] { - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }; - IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo) - ? new InMemorySortOperatorDescriptor(spec, keys, new UTF8StringNormalizedKeyComputerFactory(), cfs, - wordDesc) - : new ExternalSortOperatorDescriptor(spec, frameLimit, keys, - new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc); + IBinaryComparatorFactory[] cfs = + new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }; + IOperatorDescriptor sorter = + "memsort".equalsIgnoreCase(algo) + ? new InMemorySortOperatorDescriptor(spec, keys, + new INormalizedKeyComputerFactory[] { + new UTF8StringNormalizedKeyComputerFactory() }, + cfs, wordDesc) + : new ExternalSortOperatorDescriptor(spec, frameLimit, keys, + new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc); createPartitionConstraint(spec, sorter, outSplits); IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(spec, @@ -195,9 +199,9 @@ public class WordCountMain { } IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits); - IOperatorDescriptor writer = "text".equalsIgnoreCase(format) - ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",") - : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider); + IOperatorDescriptor writer = + "text".equalsIgnoreCase(format) ? 
new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",") + : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider); createPartitionConstraint(spec, writer, outSplits); IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java index 8ab0708..7e56004 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java @@ -19,10 +19,7 @@ package org.apache.hyracks.examples.tpch.client; -import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint; -import static org.apache.hyracks.examples.tpch.client.Common.orderParserFactories; -import static org.apache.hyracks.examples.tpch.client.Common.ordersDesc; -import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits; +import static org.apache.hyracks.examples.tpch.client.Common.*; import java.util.EnumSet; @@ -31,6 +28,7 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.io.FileSplit; 
import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; @@ -131,12 +129,12 @@ public class Sort { createPartitionConstraint(spec, ordScanner, ordersSplits); AbstractSorterOperatorDescriptor sorter; if (usingHeapSorter && limit < Integer.MAX_VALUE) { - sorter = new TopKSorterOperatorDescriptor(spec, frameLimit, limit, SortFields, null, - SortFieldsComparatorFactories, ordersDesc); + sorter = new TopKSorterOperatorDescriptor(spec, frameLimit, limit, SortFields, + (INormalizedKeyComputerFactory) null, SortFieldsComparatorFactories, ordersDesc); } else { if (memBufferAlg.equalsIgnoreCase("bestfit")) { - sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, - null, SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT, + sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null, + SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.SMALLEST_FIT, limit); } else if (memBufferAlg.equalsIgnoreCase("biggestfit")) { sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null,
