This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit b893514525f94e9a7688efc16ad35bf2bd6d5f95 Author: Ali Alsuliman <[email protected]> AuthorDate: Thu Jan 9 10:54:25 2020 -0800 [ASTERIXDB-2688][HYR] Fix use of a Hyracks task across join stages - user model changes: no - storage format changes: no - interface changes: yes Details: In hash join, a task from the build stage is being used in the probe stage. This is a problem since such tasks have already finished and notified the CC they are done. One observed issue is related to issuing a warning in the probe phase where some warnings are not reported because they are issued to tasks that have finished (the way this happened is that a comparator was created in the build phase using the build-phase task. Then, this comparator was used in the probe phase and issued a warning). - make IHyracksJobletContext extend IHyracksCommonContext so that it is also a frame manager context - make activities of join operators use the joblet context instead of the task context for acquiring buffers - create the probe-to-build comparator in the probe phase so that the right task is used in the comparator Change-Id: I38a4a779b9620494f15606162f0f1e9487fd0984 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4563 Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Contrib: Michael Blow <[email protected]> --- .../hyracks/api/context/IHyracksJobletContext.java | 2 +- .../java/org/apache/hyracks/control/nc/Joblet.java | 18 +++++--- .../java/org/apache/hyracks/control/nc/Task.java | 6 +-- .../dataflow/std/join/InMemoryHashJoin.java | 43 ++++++++++--------- .../join/InMemoryHashJoinOperatorDescriptor.java | 49 +++++++++------------- .../hyracks/dataflow/std/join/NestedLoopJoin.java | 45 ++++++++++---------- .../std/join/NestedLoopJoinOperatorDescriptor.java | 15 ++++--- .../dataflow/std/join/OptimizedHybridHashJoin.java | 43 ++++++++++--------- 
.../OptimizedHybridHashJoinOperatorDescriptor.java | 44 +++++++++---------- .../integration/OptimizedHybridHashJoinTest.java | 10 ++--- .../hyracks/test/support/TestJobletContext.java | 29 +++++++------ .../hyracks/test/support/TestTaskContext.java | 4 +- 12 files changed, 155 insertions(+), 153 deletions(-) diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java index 7a59926..41fed25 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java @@ -26,7 +26,7 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.resources.IDeallocatableRegistry; -public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry { +public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry, IHyracksCommonContext { INCServiceContext getServiceContext(); JobId getJobId(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java index 35cf57f..02cd184 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java @@ -240,11 +240,13 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { nodeController.getExecutor().execute(() -> deallocatableRegistry.close()); } - ByteBuffer allocateFrame() 
throws HyracksDataException { + @Override + public ByteBuffer allocateFrame() throws HyracksDataException { return frameManager.allocateFrame(); } - ByteBuffer allocateFrame(int bytes) throws HyracksDataException { + @Override + public ByteBuffer allocateFrame(int bytes) throws HyracksDataException { if (serviceCtx.getMemoryManager().allocate(bytes)) { memoryAllocation.addAndGet(bytes); return frameManager.allocateFrame(bytes); @@ -252,18 +254,21 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { throw new HyracksDataException("Unable to allocate frame: Not enough memory"); } - ByteBuffer reallocateFrame(ByteBuffer usedBuffer, int newFrameSizeInBytes, boolean copyOldData) + @Override + public ByteBuffer reallocateFrame(ByteBuffer usedBuffer, int newFrameSizeInBytes, boolean copyOldData) throws HyracksDataException { return frameManager.reallocateFrame(usedBuffer, newFrameSizeInBytes, copyOldData); } - void deallocateFrames(int bytes) { + @Override + public void deallocateFrames(int bytes) { memoryAllocation.addAndGet(bytes); serviceCtx.getMemoryManager().deallocate(bytes); frameManager.deallocateFrames(bytes); } - public final int getFrameSize() { + @Override + public final int getInitialFrameSize() { return frameManager.getInitialFrameSize(); } @@ -271,7 +276,8 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { return maxWarnings; } - public IIOManager getIOManager() { + @Override + public IIOManager getIoManager() { return serviceCtx.getIoManager(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index e58e4a4..158e24e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -139,7 +139,7 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { this.taskAttemptId = taskId; this.displayName = displayName; this.executorService = executor; - fileFactory = new WorkspaceFileFactory(this, joblet.getIOManager()); + fileFactory = new WorkspaceFileFactory(this, joblet.getIoManager()); deallocatableRegistry = new DefaultDeallocatableRegistry(); counterMap = new HashMap<>(); opEnv = joblet.getEnvironment(); @@ -181,12 +181,12 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { @Override public int getInitialFrameSize() { - return joblet.getFrameSize(); + return joblet.getInitialFrameSize(); } @Override public IIOManager getIoManager() { - return joblet.getIOManager(); + return joblet.getIoManager(); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java index eadbcf7..fb2d4e9 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java @@ -26,7 +26,7 @@ import java.util.List; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.context.IHyracksFrameMgrContext; import org.apache.hyracks.api.dataflow.value.IMissingWriter; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.ITuplePairComparator; @@ -49,35 +49,35 @@ public class 
InMemoryHashJoin { private final List<ByteBuffer> buffers; private final FrameTupleAccessor accessorBuild; private final ITuplePartitionComputer tpcBuild; - private IFrameTupleAccessor accessorProbe; + private final IFrameTupleAccessor accessorProbe; private final ITuplePartitionComputer tpcProbe; private final FrameTupleAppender appender; - private final ITuplePairComparator tpComparator; + private ITuplePairComparator tpComparator; private final boolean isLeftOuter; private final ArrayTupleBuilder missingTupleBuild; private final ISerializableTable table; private final TuplePointer storedTuplePointer; private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output private final IPredicateEvaluator predEvaluator; - private TupleInFrameListAccessor tupleAccessor; + private final TupleInFrameListAccessor tupleAccessor; // To release frames - ISimpleFrameBufferManager bufferManager; + private final ISimpleFrameBufferManager bufferManager; private final boolean isTableCapacityNotZero; private static final Logger LOGGER = LogManager.getLogger(); - public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorProbe, ITuplePartitionComputer tpcProbe, - FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild, - ITuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild, + public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe, + ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, + ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild, ISerializableTable table, IPredicateEvaluator predEval, ISimpleFrameBufferManager bufferManager) throws HyracksDataException { - this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, comparator, isLeftOuter, - missingWritersBuild, table, predEval, false, bufferManager); + this(ctx, 
accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, isLeftOuter, missingWritersBuild, table, + predEval, false, bufferManager); } - public InMemoryHashJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorProbe, ITuplePartitionComputer tpcProbe, - FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild, - ITuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild, + public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe, + ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild, + ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild, ISerializableTable table, IPredicateEvaluator predEval, boolean reverse, ISimpleFrameBufferManager bufferManager) throws HyracksDataException { this.table = table; @@ -88,7 +88,6 @@ public class InMemoryHashJoin { this.accessorProbe = accessorProbe; this.tpcProbe = tpcProbe; appender = new FrameTupleAppender(new VSizeFrame(ctx)); - tpComparator = comparator; predEvaluator = predEval; this.isLeftOuter = isLeftOuter; if (isLeftOuter) { @@ -105,11 +104,7 @@ public class InMemoryHashJoin { reverseOutputOrder = reverse; this.tupleAccessor = new TupleInFrameListAccessor(rDBuild, buffers); this.bufferManager = bufferManager; - if (table.getTableSize() != 0) { - isTableCapacityNotZero = true; - } else { - isTableCapacityNotZero = false; - } + this.isTableCapacityNotZero = table.getTableSize() != 0; if (LOGGER.isTraceEnabled()) { LOGGER.trace("InMemoryHashJoin has been created for a table size of " + table.getTableSize() + " for Thread ID " + Thread.currentThread().getId() + "."); @@ -126,6 +121,7 @@ public class InMemoryHashJoin { storedTuplePointer.reset(bIndex, i); // If an insertion fails, then tries to insert the same tuple pointer again after compacting the table. 
if (!table.insert(entry, storedTuplePointer)) { + // TODO(ali): should check if insertion failed even after compaction and take action compactTableAndInsertAgain(entry, storedTuplePointer); } } @@ -152,6 +148,15 @@ public class InMemoryHashJoin { } /** + * Must be called before starting to join to set the right comparator with the right context. + * + * @param comparator the comparator to use for comparing the probe tuples against the build tuples + */ + void setComparator(ITuplePairComparator comparator) { + tpComparator = comparator; + } + + /** * Reads the given tuple from the probe side and joins it with tuples from the build side. * This method assumes that the accessorProbe is already set to the current probe frame. */ diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java index ccca62d..33976a8 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java @@ -23,6 +23,7 @@ import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; @@ -34,7 +35,6 @@ import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import 
org.apache.hyracks.api.dataflow.value.ITuplePairComparator; import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; @@ -74,18 +74,8 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript IBinaryHashFunctionFactory[] hashFunctionFactories0, IBinaryHashFunctionFactory[] hashFunctionFactories1, ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int tableSize, IPredicateEvaluatorFactory predEvalFactory, int memSizeInFrames) { - super(spec, 2, 1); - this.keys0 = keys0; - this.keys1 = keys1; - this.hashFunctionFactories0 = hashFunctionFactories0; - this.hashFunctionFactories1 = hashFunctionFactories1; - this.comparatorFactory = comparatorFactory; - this.predEvaluatorFactory = predEvalFactory; - outRecDescs[0] = recordDescriptor; - this.isLeftOuter = false; - this.nonMatchWriterFactories = null; - this.tableSize = tableSize; - this.memSizeInFrames = memSizeInFrames; + this(spec, keys0, keys1, hashFunctionFactories0, hashFunctionFactories1, comparatorFactory, predEvalFactory, + recordDescriptor, false, null, tableSize, memSizeInFrames); } public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1, @@ -100,7 +90,7 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript this.hashFunctionFactories1 = hashFunctionFactories1; this.comparatorFactory = comparatorFactory; this.predEvaluatorFactory = predEvalFactory; - outRecDescs[0] = recordDescriptor; + this.outRecDescs[0] = recordDescriptor; this.isLeftOuter = isLeftOuter; this.nonMatchWriterFactories = missingWriterFactories1; this.tableSize = tableSize; @@ -125,11 +115,8 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript builder.addBlockingEdge(hba, hpa); } - public static class HashBuildTaskState extends 
AbstractStateObject { - private InMemoryHashJoin joiner; - - public HashBuildTaskState() { - } + static class HashBuildTaskState extends AbstractStateObject { + InMemoryHashJoin joiner; private HashBuildTaskState(JobId jobId, TaskId taskId) { super(jobId, taskId); @@ -160,21 +147,23 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) throws HyracksDataException { + final IHyracksJobletContext jobletCtx = ctx.getJobletContext(); final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(hpaId, 0); final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); - final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx); - final IMissingWriter[] nullWriters1 = - isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null; + final IMissingWriter[] nullWriters1; if (isLeftOuter) { + nullWriters1 = new IMissingWriter[nonMatchWriterFactories.length]; for (int i = 0; i < nonMatchWriterFactories.length; i++) { nullWriters1[i] = nonMatchWriterFactories[i].createMissingWriter(); } + } else { + nullWriters1 = null; } final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? 
null : predEvaluatorFactory.createPredicateEvaluator()); - final int memSizeInBytes = memSizeInFrames * ctx.getInitialFrameSize(); - final IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx, memSizeInBytes); + final int memSizeInBytes = memSizeInFrames * jobletCtx.getInitialFrameSize(); + final IDeallocatableFramePool framePool = new DeallocatableFramePool(jobletCtx, memSizeInBytes); final ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool); IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() { @@ -186,12 +175,11 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories0).createPartitioner(ctx); ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories1).createPartitioner(ctx); - state = new HashBuildTaskState(ctx.getJobletContext().getJobId(), - new TaskId(getActivityId(), partition)); - ISerializableTable table = new SerializableHashTable(tableSize, ctx, bufferManager); - state.joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(rd0), hpc0, - new FrameTupleAccessor(rd1), rd1, hpc1, comparator, isLeftOuter, nullWriters1, table, - predEvaluator, bufferManager); + state = new HashBuildTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition)); + ISerializableTable table = new SerializableHashTable(tableSize, jobletCtx, bufferManager); + state.joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(rd0), hpc0, + new FrameTupleAccessor(rd1), rd1, hpc1, isLeftOuter, nullWriters1, table, predEvaluator, + bufferManager); } @Override @@ -250,6 +238,7 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript writer.open(); state = (HashBuildTaskState) ctx .getStateObject(new TaskId(new ActivityId(getOperatorId(), 0), partition)); + state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx)); } 
@Override diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java index d0f5a73..361d1ee 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java @@ -24,7 +24,7 @@ import java.nio.ByteBuffer; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.dataflow.value.IMissingWriter; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.ITuplePairComparator; @@ -46,7 +46,7 @@ public class NestedLoopJoin { private final FrameTupleAccessor accessorInner; private final FrameTupleAccessor accessorOuter; private final FrameTupleAppender appender; - private final ITuplePairComparator tpComparator; + private ITuplePairComparator tpComparator; private final IFrame outBuffer; private final IFrame innerBuffer; private final VariableFrameMemoryManager outerBufferMngr; @@ -55,24 +55,23 @@ public class NestedLoopJoin { private final ArrayTupleBuilder missingTupleBuilder; private final IPredicateEvaluator predEvaluator; private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal - private BufferInfo tempInfo = new BufferInfo(null, -1, -1); + private final BufferInfo tempInfo = new BufferInfo(null, -1, -1); - public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorOuter, FrameTupleAccessor 
accessorInner, - ITuplePairComparator comparatorsOuter2Inner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter, + public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter, + FrameTupleAccessor accessorInner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter, IMissingWriter[] missingWriters) throws HyracksDataException { this.accessorInner = accessorInner; this.accessorOuter = accessorOuter; this.appender = new FrameTupleAppender(); - this.tpComparator = comparatorsOuter2Inner; - this.outBuffer = new VSizeFrame(ctx); - this.innerBuffer = new VSizeFrame(ctx); + this.outBuffer = new VSizeFrame(jobletContext); + this.innerBuffer = new VSizeFrame(jobletContext); this.appender.reset(outBuffer, true); if (memSize < 3) { throw new HyracksDataException("Not enough memory is available for Nested Loop Join"); } - this.outerBufferMngr = - new VariableFrameMemoryManager(new VariableFramePool(ctx, ctx.getInitialFrameSize() * (memSize - 2)), - FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2)); + this.outerBufferMngr = new VariableFrameMemoryManager( + new VariableFramePool(jobletContext, jobletContext.getInitialFrameSize() * (memSize - 2)), + FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2)); this.predEvaluator = predEval; this.isReversed = false; @@ -91,8 +90,8 @@ public class NestedLoopJoin { } FileReference file = - ctx.getJobletContext().createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString()); - runFileWriter = new RunFileWriter(file, ctx.getIoManager()); + jobletContext.createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString()); + runFileWriter = new RunFileWriter(file, jobletContext.getIoManager()); runFileWriter.open(); } @@ -100,6 +99,15 @@ public class NestedLoopJoin { runFileWriter.nextFrame(buffer); } + /** + * Must be called before starting to join to set the right comparator with the 
right context. + * + * @param comparator the comparator to use for comparing the probe tuples against the build tuples + */ + void setComparator(ITuplePairComparator comparator) { + tpComparator = comparator; + } + public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException { if (outerBufferMngr.insertFrame(outerBuffer) < 0) { RunFileReader runFileReader = runFileWriter.createReader(); @@ -131,7 +139,7 @@ public class NestedLoopJoin { for (int i = 0; i < tupleCount0; ++i) { boolean matchFound = false; for (int j = 0; j < tupleCount1; ++j) { - int c = compare(accessorOuter, i, accessorInner, j); + int c = tpComparator.compare(accessorOuter, i, accessorInner, j); boolean prdEval = evaluatePredicate(i, j); if (c == 0 && prdEval) { matchFound = true; @@ -195,15 +203,6 @@ public class NestedLoopJoin { outerBufferMngr.reset(); } - private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1) - throws HyracksDataException { - int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1); - if (c != 0) { - return c; - } - return 0; - } - public void setIsReversed(boolean b) { this.isReversed = b; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java index 2236056..1de8094 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java @@ -21,6 +21,7 @@ package org.apache.hyracks.dataflow.std.join; import java.nio.ByteBuffer; +import org.apache.hyracks.api.context.IHyracksJobletContext; import 
org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; @@ -31,7 +32,6 @@ import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.ITuplePairComparator; import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -114,9 +114,9 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) throws HyracksDataException { + final IHyracksJobletContext jobletCtx = ctx.getJobletContext(); final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0); final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); - final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx); final IPredicateEvaluator predEvaluator = (predEvaluatorFactory != null) ? 
predEvaluatorFactory.createPredicateEvaluator() : null; @@ -132,17 +132,15 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor @Override public void open() throws HyracksDataException { - state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(), - new TaskId(getActivityId(), partition)); - - state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(rd0), new FrameTupleAccessor(rd1), - comparator, memSize, predEvaluator, isLeftOuter, nullWriters1); + state = new JoinCacheTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition)); + state.joiner = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(rd0), + new FrameTupleAccessor(rd1), memSize, predEvaluator, isLeftOuter, nullWriters1); } @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - ByteBuffer copyBuffer = ctx.allocateFrame(buffer.capacity()); + ByteBuffer copyBuffer = jobletCtx.allocateFrame(buffer.capacity()); FrameUtils.copyAndFlip(buffer, copyBuffer); state.joiner.cache(copyBuffer); } @@ -180,6 +178,7 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor writer.open(); state = (JoinCacheTaskState) ctx.getStateObject( new TaskId(new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID), partition)); + state.joiner.setComparator(comparatorFactory.createTuplePairComparator(ctx)); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java index 7b6dcdb..9c28c61 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java @@ -24,7 +24,7 @@ import java.util.BitSet; import 
org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.dataflow.value.IMissingWriter; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; @@ -62,11 +62,10 @@ public class OptimizedHybridHashJoin { PROBE } - private final IHyracksTaskContext ctx; + private final IHyracksJobletContext jobletCtx; private final String buildRelName; private final String probeRelName; - private final ITuplePairComparator comparator; private final ITuplePartitionComputer buildHpc; private final ITuplePartitionComputer probeHpc; private final RecordDescriptor buildRd; @@ -95,17 +94,16 @@ public class OptimizedHybridHashJoin { private final TuplePointer tempPtr = new TuplePointer(); private int[] probePSizeInTups; - public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions, - String probeRelName, String buildRelName, ITuplePairComparator comparator, RecordDescriptor probeRd, - RecordDescriptor buildRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, - IPredicateEvaluator predEval, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) { - this.ctx = ctx; + public OptimizedHybridHashJoin(IHyracksJobletContext jobletCtx, int memSizeInFrames, int numOfPartitions, + String probeRelName, String buildRelName, RecordDescriptor probeRd, RecordDescriptor buildRd, + ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval, + boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) { + this.jobletCtx = jobletCtx; this.memSizeInFrames = memSizeInFrames; this.buildRd = buildRd; this.probeRd = probeRd; this.buildHpc = buildHpc; this.probeHpc = probeHpc; - this.comparator = 
comparator; this.buildRelName = buildRelName; this.probeRelName = probeRelName; this.numOfPartitions = numOfPartitions; @@ -127,7 +125,7 @@ public class OptimizedHybridHashJoin { public void initBuild() throws HyracksDataException { IDeallocatableFramePool framePool = - new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize()); + new DeallocatableFramePool(jobletCtx, memSizeInFrames * jobletCtx.getInitialFrameSize()); bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool); bufferManager = new VPartitionTupleBufferManager( PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus), @@ -177,8 +175,8 @@ public class OptimizedHybridHashJoin { int pid) throws HyracksDataException { RunFileWriter writer = runFileWriters[pid]; if (writer == null) { - FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName); - writer = new RunFileWriter(file, ctx.getIoManager()); + FileReference file = jobletCtx.createManagedWorkspaceFile(refName); + writer = new RunFileWriter(file, jobletCtx.getIoManager()); writer.open(); runFileWriters[pid] = writer; } @@ -194,10 +192,10 @@ public class OptimizedHybridHashJoin { // and tries to bring back as many spilled partitions as possible if there is free space. 
int inMemTupCount = makeSpaceForHashTableAndBringBackSpilledPartitions(); - ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable); - this.inMemJoiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRd), probeHpc, - new FrameTupleAccessor(buildRd), buildRd, buildHpc, comparator, isLeftOuter, nonMatchWriters, table, - predEvaluator, isReversed, bufferManagerForHashTable); + ISerializableTable table = new SerializableHashTable(inMemTupCount, jobletCtx, bufferManagerForHashTable); + this.inMemJoiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRd), probeHpc, + new FrameTupleAccessor(buildRd), buildRd, buildHpc, isLeftOuter, nonMatchWriters, table, predEvaluator, + isReversed, bufferManagerForHashTable); buildHashTable(); } @@ -250,7 +248,7 @@ public class OptimizedHybridHashJoin { * @throws HyracksDataException */ private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException { - int frameSize = ctx.getInitialFrameSize(); + int frameSize = jobletCtx.getInitialFrameSize(); long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize; int inMemTupCount = 0; @@ -356,7 +354,7 @@ public class OptimizedHybridHashJoin { * @return partition id of selected partition to reload */ private int selectAPartitionToReload(long freeSpace, int pid, int inMemTupCount) { - int frameSize = ctx.getInitialFrameSize(); + int frameSize = jobletCtx.getInitialFrameSize(); // Add one frame to freeSpace to consider the one frame reserved for the spilled partition long totalFreeSpace = freeSpace + frameSize; if (totalFreeSpace > 0) { @@ -379,7 +377,7 @@ public class OptimizedHybridHashJoin { try { r.open(); if (reloadBuffer == null) { - reloadBuffer = new VSizeFrame(ctx); + reloadBuffer = new VSizeFrame(jobletCtx); } while (r.nextFrame(reloadBuffer)) { accessorBuild.reset(reloadBuffer.getBuffer()); @@ -430,8 +428,9 @@ public class OptimizedHybridHashJoin { } } - public void 
initProbe() { + public void initProbe(ITuplePairComparator comparator) { probePSizeInTups = new int[numOfPartitions]; + inMemJoiner.setComparator(comparator); } public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException { @@ -464,7 +463,7 @@ public class OptimizedHybridHashJoin { VPartitionTupleBufferManager.calculateActualSize(null, accessorProbe.getTupleLength(tupleId)); // If the partition is at least half-full and insertion fails, that partition is preferred to get // spilled, otherwise the biggest partition gets chosen as the victim. - boolean modestCase = recordSize <= (ctx.getInitialFrameSize() / 2); + boolean modestCase = recordSize <= (jobletCtx.getInitialFrameSize() / 2); int victim = (modestCase && bufferManager.getNumTuples(pid) > 0) ? pid : spillPolicy.findSpilledPartitionWithMaxMemoryUsage(); // This method is called for the spilled partitions, so we know that this tuple is going to get written to @@ -492,7 +491,7 @@ public class OptimizedHybridHashJoin { private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i) throws HyracksDataException { if (bigProbeFrameAppender == null) { - bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx)); + bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(jobletCtx)); } RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(probeRFWriters, probeRelName, pid); if (!bigProbeFrameAppender.append(accessorProbe, i)) { diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java index 45cccec..1819b8d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java +++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java @@ -26,6 +26,7 @@ import java.util.BitSet; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; @@ -190,7 +191,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD } //memorySize is the memory for join (we have already excluded the 2 buffers for in/out) - private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions) + private static int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions) throws HyracksDataException { int numberOfPartitions = 0; if (memorySize <= 2) { @@ -260,8 +261,6 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0); - final ITuplePairComparator probComparator = - tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx); final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? 
null : predEvaluatorFactory.createPredicateEvaluator()); @@ -284,9 +283,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD state.memForJoin = memSizeInFrames - 2; state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions); - state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions, - PROBE_REL, BUILD_REL, probComparator, probeRd, buildRd, probeHpc, buildHpc, predEvaluator, - isLeftOuter, nonMatchWriterFactories); + state.hybridHJ = new OptimizedHybridHashJoin(ctx.getJobletContext(), state.memForJoin, + state.numOfPartitions, PROBE_REL, BUILD_REL, probeRd, buildRd, probeHpc, buildHpc, + predEvaluator, isLeftOuter, nonMatchWriterFactories); state.hybridHJ.initBuild(); if (LOGGER.isTraceEnabled()) { @@ -373,8 +372,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD } IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() { + private final IHyracksJobletContext jobletCtx = ctx.getJobletContext(); private BuildAndPartitionTaskState state; - private IFrame rPartbuff = new VSizeFrame(ctx); + private IFrame rPartbuff = new VSizeFrame(jobletCtx); private FrameTupleAppender nullResultAppender = null; private FrameTupleAccessor probeTupleAccessor; @@ -386,7 +386,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition)); writer.open(); - state.hybridHJ.initProbe(); + state.hybridHJ.initProbe(probComp); if (LOGGER.isDebugEnabled()) { LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase."); @@ -480,7 +480,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD new FieldHashPartitionComputerFamily(buildKeys, buildHashFunctionFactories) .createPartitioner(level); - int frameSize = ctx.getInitialFrameSize(); + int frameSize = 
jobletCtx.getInitialFrameSize(); long buildPartSize = (long) Math.ceil((double) buildSideReader.getFileSize() / (double) frameSize); long probePartSize = (long) Math.ceil((double) probeSideReader.getFileSize() / (double) frameSize); int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple); @@ -575,7 +575,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles"; OptimizedHybridHashJoin rHHj; int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions); - rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, comp, probeRd, + rHHj = new OptimizedHybridHashJoin(jobletCtx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeRd, buildRd, probeHpc, buildHpc, predEvaluator, isLeftOuter, nonMatchWriterFactories); //checked-confirmed rHHj.setIsReversed(isReversed); @@ -598,7 +598,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD probeSideReader.open(); rPartbuff.reset(); try { - rHHj.initProbe(); + rHHj.initProbe(comp); while (probeSideReader.nextFrame(rPartbuff)) { rHHj.probe(rPartbuff.getBuffer(), writer); } @@ -696,7 +696,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD private void appendNullToProbeTuples(RunFileReader probReader) throws HyracksDataException { if (nullResultAppender == null) { - nullResultAppender = new FrameTupleAppender(new VSizeFrame(ctx)); + nullResultAppender = new FrameTupleAppender(new VSizeFrame(jobletCtx)); } if (probeTupleAccessor == null) { probeTupleAccessor = new FrameTupleAccessor(probeRd); @@ -725,14 +725,14 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys; assert isLeftOuter ? 
!isReversed : true : "LeftOut Join can not reverse roles"; IDeallocatableFramePool framePool = - new DeallocatableFramePool(ctx, state.memForJoin * ctx.getInitialFrameSize()); + new DeallocatableFramePool(jobletCtx, state.memForJoin * jobletCtx.getInitialFrameSize()); ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool); - ISerializableTable table = new SerializableHashTable(tabSize, ctx, bufferManager); - InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRDesc), hpcRepProbe, - new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, comp, isLeftOuter, + ISerializableTable table = new SerializableHashTable(tabSize, jobletCtx, bufferManager); + InMemoryHashJoin joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRDesc), + hpcRepProbe, new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, isLeftOuter, nonMatchWriter, table, predEvaluator, isReversed, bufferManager); - + joiner.setComparator(comp); try { bReader.open(); rPartbuff.reset(); @@ -788,12 +788,12 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD boolean isReversed = outerRd == buildRd && innerRd == probeRd; assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles"; ITuplePairComparator nljComptorOuterInner = isReversed ? 
buildComp : probComp; - NestedLoopJoin nlj = - new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd), new FrameTupleAccessor(innerRd), - nljComptorOuterInner, memorySize, predEvaluator, isLeftOuter, nonMatchWriter); + NestedLoopJoin nlj = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(outerRd), + new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter, nonMatchWriter); nlj.setIsReversed(isReversed); + nlj.setComparator(nljComptorOuterInner); - IFrame cacheBuff = new VSizeFrame(ctx); + IFrame cacheBuff = new VSizeFrame(jobletCtx); try { innerReader.open(); while (innerReader.nextFrame(cacheBuff)) { @@ -808,7 +808,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD } } try { - IFrame joinBuff = new VSizeFrame(ctx); + IFrame joinBuff = new VSizeFrame(jobletCtx); outerReader.open(); try { while (outerReader.nextFrame(joinBuff)) { diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java index 4c6b70f..93739df 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java @@ -24,7 +24,7 @@ import java.util.Random; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import 
org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; @@ -50,7 +50,7 @@ import org.mockito.Mockito; public class OptimizedHybridHashJoinTest { int frameSize = 32768; int totalNumberOfFrames = 10; - IHyracksTaskContext ctx = TestUtils.create(frameSize); + IHyracksJobletContext ctx = TestUtils.create(frameSize).getJobletContext(); OptimizedHybridHashJoin hhj; static IBinaryHashFunctionFamily[] propHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE }; static IBinaryHashFunctionFamily[] buildHashFunctionFactories = { MurmurHash3BinaryHashFunctionFamily.INSTANCE }; @@ -150,8 +150,8 @@ public class OptimizedHybridHashJoinTest { private void testJoin(int memSizeInFrames, int numOfPartitions, VSizeFrame frame) throws HyracksDataException { - hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, comparator, - probeRd, buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null); + hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, probeRd, + buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null); hhj.initBuild(); @@ -184,7 +184,7 @@ public class OptimizedHybridHashJoinTest { //to the in memory joiner. As such, only next frame is important. 
} }; - hhj.initProbe(); + hhj.initProbe(comparator); for (int i = 0; i < totalNumberOfFrames; i++) { hhj.probe(frame.getBuffer(), writer); checkOneFrameReservedPerSpilledPartitions(); diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java index 2809343..c485b32 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java @@ -34,31 +34,33 @@ import org.apache.hyracks.control.nc.io.WorkspaceFileFactory; import org.apache.hyracks.control.nc.resources.memory.FrameManager; public class TestJobletContext implements IHyracksJobletContext { - private final int frameSize; + private final INCServiceContext serviceContext; private final FrameManager frameManger; - private JobId jobId; - private WorkspaceFileFactory fileFactory; + private final JobId jobId; + private final WorkspaceFileFactory fileFactory; private final long jobStartTime; - public TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException { - this.frameSize = frameSize; + TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException { this.serviceContext = serviceContext; this.jobId = jobId; - fileFactory = new WorkspaceFileFactory(this, getIOManager()); + fileFactory = new WorkspaceFileFactory(this, getIoManager()); this.frameManger = new FrameManager(frameSize); this.jobStartTime = System.currentTimeMillis(); } - ByteBuffer allocateFrame() throws HyracksDataException { + @Override + public ByteBuffer allocateFrame() throws HyracksDataException { return frameManger.allocateFrame(); } + @Override public ByteBuffer allocateFrame(int bytes) throws 
HyracksDataException { return frameManger.allocateFrame(bytes); } - ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData) + @Override + public ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newFrameSizeInBytes, boolean copyOldData) throws HyracksDataException { return frameManger.reallocateFrame(tobeDeallocate, newFrameSizeInBytes, copyOldData); } @@ -67,15 +69,18 @@ public class TestJobletContext implements IHyracksJobletContext { return null; } - void deallocateFrames(int bytes) { + @Override + public void deallocateFrames(int bytes) { frameManger.deallocateFrames(bytes); } - public int getFrameSize() { - return frameSize; + @Override + public final int getInitialFrameSize() { + return frameManger.getInitialFrameSize(); } - public IIOManager getIOManager() { + @Override + public IIOManager getIoManager() { return serviceContext.getIoManager(); } diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java index 7c602f5..dcb85f6 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java @@ -85,12 +85,12 @@ public class TestTaskContext implements IHyracksTaskContext { @Override public int getInitialFrameSize() { - return jobletContext.getFrameSize(); + return jobletContext.getInitialFrameSize(); } @Override public IIOManager getIoManager() { - return jobletContext.getIOManager(); + return jobletContext.getIoManager(); } @Override
