http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java index 69e9e6a..2de8e6c 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java @@ -26,8 +26,8 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; -import org.apache.hyracks.api.dataflow.value.INullWriter; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; @@ -51,7 +51,7 @@ class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperato private final int[] keys1; private final IBinaryHashFunctionFactory[] hashFunctionFactories; private final IBinaryComparatorFactory[] comparatorFactories; - private final INullWriterFactory[] nullWriterFactories; + private final IMissingWriterFactory[] nonMatchWriterFactories; private final RecordDescriptor rd0; private final RecordDescriptor rd1; private final int recordsPerFrame; @@ -62,7 +62,7 @@ class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperato GraceHashJoinOperatorNodePushable(IHyracksTaskContext ctx, Object state0Id, Object state1Id, int recordsPerFrame, double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories, - IBinaryComparatorFactory[] comparatorFactories, INullWriterFactory[] nullWriterFactories, + IBinaryComparatorFactory[] comparatorFactories, IMissingWriterFactory[] nullWriterFactories, RecordDescriptor rd1, RecordDescriptor rd0, RecordDescriptor outRecordDescriptor, int numPartitions, IPredicateEvaluator predEval, boolean isLeftOuter) { this.ctx = ctx; @@ -72,7 +72,7 @@ class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperato this.keys1 = keys1; this.hashFunctionFactories = hashFunctionFactories; this.comparatorFactories = comparatorFactories; - this.nullWriterFactories = nullWriterFactories; + this.nonMatchWriterFactories = nullWriterFactories; this.rd0 = rd0; this.rd1 = rd1; this.numPartitions = numPartitions; @@ -98,10 +98,10 @@ class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperato ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(numPartitions, new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)).createPartitioner(); - final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories.length] : null; + final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null; if (isLeftOuter) { - for (int i = 0; i < nullWriterFactories.length; i++) { - nullWriters1[i] = nullWriterFactories[i].createNullWriter(); + for (int i = 0; i < nonMatchWriterFactories.length; i++) { + nullWriters1[i] = nonMatchWriterFactories[i].createMissingWriter(); } } try {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java index d0a81ee..4354367 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java @@ -33,8 +33,8 @@ import org.apache.hyracks.api.dataflow.TaskId; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; -import org.apache.hyracks.api.dataflow.value.INullWriter; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; @@ -47,7 +47,6 @@ import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory; import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFactory; @@ -60,6 +59,7 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePu import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; import org.apache.hyracks.dataflow.std.structures.ISerializableTable; import org.apache.hyracks.dataflow.std.structures.SerializableHashTable; +import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator; public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor { private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0; @@ -76,7 +76,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor private final IBinaryComparatorFactory[] comparatorFactories; private final IPredicateEvaluatorFactory predEvaluatorFactory; private final boolean isLeftOuter; - private final INullWriterFactory[] nullWriterFactories1; + private final IMissingWriterFactory[] nonMatchWriterFactories1; /** * @param spec @@ -97,7 +97,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor int recordsPerFrame, double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter, - INullWriterFactory[] nullWriterFactories1) throws HyracksDataException { + IMissingWriterFactory[] nullWriterFactories1) throws HyracksDataException { super(spec, 2, 1); this.memsize = memsize; this.inputsize0 = inputsize0; @@ -109,7 +109,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor this.comparatorFactories = comparatorFactories; this.predEvaluatorFactory = predEvalFactory; this.isLeftOuter = isLeftOuter; - this.nullWriterFactories1 = nullWriterFactories1; + this.nonMatchWriterFactories1 = nullWriterFactories1; recordDescriptors[0] = recordDescriptor; } @@ -169,17 +169,18 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor @Override public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) - throws HyracksDataException { + throws HyracksDataException { final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0); final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; for (int i = 0; i < comparatorFactories.length; ++i) { comparators[i] = comparatorFactories[i].createBinaryComparator(); } - final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null; + final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories1.length] + : null; if (isLeftOuter) { - for (int i = 0; i < nullWriterFactories1.length; i++) { - nullWriters1[i] = nullWriterFactories1[i].createNullWriter(); + for (int i = 0; i < nonMatchWriterFactories1.length; i++) { + nullWriters1[i] = nonMatchWriterFactories1[i].createMissingWriter(); } } final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null @@ -198,8 +199,9 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor @Override public void close() throws HyracksDataException { - if (state.memoryForHashtable != 0) + if (state.memoryForHashtable != 0) { build(inBuffer.getBuffer()); + } for (int i = 0; i < state.nPartitions; i++) { ByteBuffer buf = bufferForPartitions[i].getBuffer(); @@ -359,17 +361,18 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor @Override public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) - throws HyracksDataException { + throws HyracksDataException { final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(buildAid, 0); final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; for (int i = 0; i < comparatorFactories.length; ++i) { comparators[i] = comparatorFactories[i].createBinaryComparator(); } - final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null; + final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories1.length] + : null; if (isLeftOuter) { - for (int i = 0; i < nullWriterFactories1.length; i++) { - nullWriters1[i] = nullWriterFactories1[i].createNullWriter(); + for (int i = 0; i < nonMatchWriterFactories1.length; i++) { + nullWriters1[i] = nonMatchWriterFactories1[i].createMissingWriter(); } } final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null @@ -436,8 +439,9 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor if (!ftap.append(accessorProbe, i)) { state.joiner.join(inBuffer.getBuffer(), writer); ftap.reset(inBuffer, true); - } else + } else { break; + } } } else { http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java index fee7dd8..486821b 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java @@ -28,7 +28,7 @@ import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.INullWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -51,7 +51,7 @@ public class InMemoryHashJoin { private final FrameTupleAppender appender; private final FrameTuplePairComparator tpComparator; private final boolean isLeftOuter; - private final ArrayTupleBuilder nullTupleBuild; + private final ArrayTupleBuilder missingTupleBuild; private final ISerializableTable table; private final int tableSize; private final TuplePointer storedTuplePointer; @@ -62,16 +62,16 @@ public class InMemoryHashJoin { public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessorProbe, ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, ITuplePartitionComputer tpcBuild, - FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWritersBuild, + FrameTuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild, ISerializableTable table, IPredicateEvaluator predEval) throws HyracksDataException { this(ctx, tableSize, accessorProbe, tpcProbe, accessorBuild, tpcBuild, comparator, isLeftOuter, - nullWritersBuild, table, predEval, + missingWritersBuild, table, predEval, false); } public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessorProbe, ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, ITuplePartitionComputer tpcBuild, - FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWritersBuild, + FrameTuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild, ISerializableTable table, IPredicateEvaluator predEval, boolean reverse) throws HyracksDataException { this.ctx = ctx; this.tableSize = tableSize; @@ -88,14 +88,14 @@ public class InMemoryHashJoin { this.isLeftOuter = isLeftOuter; if (isLeftOuter) { int fieldCountOuter = accessorBuild.getFieldCount(); - nullTupleBuild = new ArrayTupleBuilder(fieldCountOuter); - DataOutput out = nullTupleBuild.getDataOutput(); + missingTupleBuild = new ArrayTupleBuilder(fieldCountOuter); + DataOutput out = missingTupleBuild.getDataOutput(); for (int i = 0; i < fieldCountOuter; i++) { - nullWritersBuild[i].writeNull(out); - nullTupleBuild.addFieldEndOffset(); + missingWritersBuild[i].writeMissing(out); + missingTupleBuild.addFieldEndOffset(); } } else { - nullTupleBuild = null; + missingTupleBuild = null; } reverseOutputOrder = reverse; LOGGER.fine("InMemoryHashJoin has been created for a table size of " + tableSize + " for Thread ID " @@ -140,8 +140,8 @@ public class InMemoryHashJoin { } if (!matchFound && isLeftOuter) { FrameUtils.appendConcatToWriter(writer, appender, accessorProbe, tid, - nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, - nullTupleBuild.getSize()); + missingTupleBuild.getFieldEndOffsets(), missingTupleBuild.getByteArray(), 0, + missingTupleBuild.getSize()); } } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java index be8d319..0d6d163 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java @@ -31,8 +31,8 @@ import org.apache.hyracks.api.dataflow.TaskId; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; -import org.apache.hyracks.api.dataflow.value.INullWriter; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; @@ -42,7 +42,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory; import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; @@ -52,6 +51,7 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePu import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; import org.apache.hyracks.dataflow.std.structures.ISerializableTable; import org.apache.hyracks.dataflow.std.structures.SerializableHashTable; +import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator; public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescriptor { private static final long serialVersionUID = 1L; @@ -61,7 +61,7 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript private final IBinaryComparatorFactory[] comparatorFactories; private final IPredicateEvaluatorFactory predEvaluatorFactory; private final boolean isLeftOuter; - private final INullWriterFactory[] nullWriterFactories1; + private final IMissingWriterFactory[] nonMatchWriterFactories; private final int tableSize; public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1, @@ -75,14 +75,14 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript this.predEvaluatorFactory = predEvalFactory; recordDescriptors[0] = recordDescriptor; this.isLeftOuter = false; - this.nullWriterFactories1 = null; + this.nonMatchWriterFactories = null; this.tableSize = tableSize; } public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories, IPredicateEvaluatorFactory predEvalFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter, - INullWriterFactory[] nullWriterFactories1, int tableSize) { + IMissingWriterFactory[] missingWriterFactories1, int tableSize) { super(spec, 2, 1); this.keys0 = keys0; this.keys1 = keys1; @@ -91,7 +91,7 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript this.predEvaluatorFactory = predEvalFactory; recordDescriptors[0] = recordDescriptor; this.isLeftOuter = isLeftOuter; - this.nullWriterFactories1 = nullWriterFactories1; + this.nonMatchWriterFactories = missingWriterFactories1; this.tableSize = tableSize; } @@ -103,7 +103,7 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories, - RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1, + RecordDescriptor recordDescriptor, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1, int tableSize) { this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, null, recordDescriptor, isLeftOuter, nullWriterFactories1, tableSize); @@ -167,10 +167,11 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript for (int i = 0; i < comparatorFactories.length; ++i) { comparators[i] = comparatorFactories[i].createBinaryComparator(); } - final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null; + final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] + : null; if (isLeftOuter) { - for (int i = 0; i < nullWriterFactories1.length; i++) { - nullWriters1[i] = nullWriterFactories1[i].createNullWriter(); + for (int i = 0; i < nonMatchWriterFactories.length; i++) { + nullWriters1[i] = nonMatchWriterFactories[i].createMissingWriter(); } } final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java index 2ad89cf..202aac6 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java @@ -25,7 +25,7 @@ import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.INullWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.ITuplePairComparator; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -53,14 +53,14 @@ public class NestedLoopJoin { private RunFileReader runFileReader; private final RunFileWriter runFileWriter; private final boolean isLeftOuter; - private final ArrayTupleBuilder nullTupleBuilder; + private final ArrayTupleBuilder missingTupleBuilder; private final IPredicateEvaluator predEvaluator; private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal private BufferInfo tempInfo = new BufferInfo(null, -1, -1); public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessorOuter, FrameTupleAccessor accessorInner, ITuplePairComparator comparatorsOuter2Inner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter, - INullWriter[] nullWriters1) throws HyracksDataException { + IMissingWriter[] missingWriters) throws HyracksDataException { this.accessorInner = accessorInner; this.accessorOuter = accessorOuter; this.appender = new FrameTupleAppender(); @@ -81,14 +81,14 @@ public class NestedLoopJoin { this.isLeftOuter = isLeftOuter; if (isLeftOuter) { int innerFieldCount = this.accessorInner.getFieldCount(); - nullTupleBuilder = new ArrayTupleBuilder(innerFieldCount); - DataOutput out = nullTupleBuilder.getDataOutput(); + missingTupleBuilder = new ArrayTupleBuilder(innerFieldCount); + DataOutput out = missingTupleBuilder.getDataOutput(); for (int i = 0; i < innerFieldCount; i++) { - nullWriters1[i].writeNull(out); - nullTupleBuilder.addFieldEndOffset(); + missingWriters[i].writeMissing(out); + missingTupleBuilder.addFieldEndOffset(); } } else { - nullTupleBuilder = null; + missingTupleBuilder = null; } FileReference file = ctx.getJobletContext() @@ -138,9 +138,9 @@ public class NestedLoopJoin { } if (!matchFound && isLeftOuter) { - final int[] ntFieldEndOffsets = nullTupleBuilder.getFieldEndOffsets(); - final byte[] ntByteArray = nullTupleBuilder.getByteArray(); - final int ntSize = nullTupleBuilder.getSize(); + final int[] ntFieldEndOffsets = missingTupleBuilder.getFieldEndOffsets(); + final byte[] ntByteArray = missingTupleBuilder.getByteArray(); + final int ntSize = missingTupleBuilder.getSize(); FrameUtils.appendConcatToWriter(writer, appender, accessorOuter, i, ntFieldEndOffsets, ntByteArray, 0, ntSize); } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java index dc7d256..09207b9 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java @@ -29,8 +29,8 @@ import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.TaskId; -import org.apache.hyracks.api.dataflow.value.INullWriter; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; @@ -57,18 +57,18 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor private final int memSize; private final IPredicateEvaluatorFactory predEvaluatorFactory; private final boolean isLeftOuter; - private final INullWriterFactory[] nullWriterFactories1; + private final IMissingWriterFactory[] nullWriterFactories1; public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize, - boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) { + boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) { this(spec, comparatorFactory, recordDescriptor, memSize, null, isLeftOuter, nullWriterFactories1); } public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter, - INullWriterFactory[] nullWriterFactories1) { + IMissingWriterFactory[] nullWriterFactories1) { super(spec, 2, 1); this.comparatorFactory = comparatorFactory; this.recordDescriptors[0] = recordDescriptor; @@ -135,10 +135,10 @@ public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor final IPredicateEvaluator predEvaluator = ((predEvaluatorFactory != null) ? predEvaluatorFactory.createPredicateEvaluator() : null); - final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null; + final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null; if (isLeftOuter) { for (int i = 0; i < nullWriterFactories1.length; i++) { - nullWriters1[i] = nullWriterFactories1[i].createNullWriter(); + nullWriters1[i] = nullWriterFactories1[i].createMissingWriter(); } } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java index 810039b..b80059b 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java @@ -28,8 +28,8 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; -import org.apache.hyracks.api.dataflow.value.INullWriter; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; @@ -82,7 +82,7 @@ public class OptimizedHybridHashJoin { private final IPredicateEvaluator predEvaluator; private final boolean isLeftOuter; - private final INullWriter[] nullWriters; + private final IMissingWriter[] nonMatchWriters; private final BitSet spilledStatus; //0=resident, 1=spilled private final int numOfPartitions; @@ -108,7 +108,7 @@ public class OptimizedHybridHashJoin { String buildRelName, int[] probeKeys, int[] buildKeys, IBinaryComparator[] comparators, RecordDescriptor probeRd, RecordDescriptor buildRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval, boolean isLeftOuter, - INullWriterFactory[] nullWriterFactories1) { + IMissingWriterFactory[] nullWriterFactories1) { this.ctx = ctx; this.memForJoin = memForJoin; this.buildRd = buildRd; @@ -134,10 +134,10 @@ public class OptimizedHybridHashJoin { this.spilledStatus = new BitSet(numOfPartitions); - this.nullWriters = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null; + this.nonMatchWriters = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null; if (isLeftOuter) { for (int i = 0; i < nullWriterFactories1.length; i++) { - nullWriters[i] = nullWriterFactories1[i].createNullWriter(); + nonMatchWriters[i] = nullWriterFactories1[i].createMissingWriter(); } } } @@ -313,7 +313,7 @@ public class OptimizedHybridHashJoin { ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx); this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount, new FrameTupleAccessor(probeRd), probeHpc, new FrameTupleAccessor(buildRd), buildHpc, - new FrameTuplePairComparator(probeKeys, buildKeys, comparators), isLeftOuter, nullWriters, table, + new FrameTuplePairComparator(probeKeys, buildKeys, comparators), isLeftOuter, nonMatchWriters, table, predEvaluator, isReversed); } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java index 22ad91f..183d7f6 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java @@ -36,8 +36,8 @@ import org.apache.hyracks.api.dataflow.TaskId; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily; -import org.apache.hyracks.api.dataflow.value.INullWriter; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; @@ -124,7 +124,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD private final IPredicateEvaluatorFactory predEvaluatorFactory; private final boolean isLeftOuter; - private final INullWriterFactory[] nullWriterFactories1; + private final IMissingWriterFactory[] nonMatchWriterFactories; //Flags added for test purpose private static boolean skipInMemoryHJ = false; @@ -138,7 +138,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, ITuplePairComparatorFactory tupPaircomparatorFactory01, ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory, - boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) throws HyracksDataException { + boolean isLeftOuter, IMissingWriterFactory[] nonMatchWriterFactories) throws HyracksDataException { super(spec, 2, 1); this.frameLimit = frameLimit; @@ -153,7 +153,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD recordDescriptors[0] = recordDescriptor; this.predEvaluatorFactory = predEvaluatorFactory; this.isLeftOuter = isLeftOuter; - this.nullWriterFactories1 = nullWriterFactories1; + this.nonMatchWriterFactories = nonMatchWriterFactories; } public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int frameLimit, int inputsize0, @@ -162,9 +162,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD ITuplePairComparatorFactory tupPaircomparatorFactory01, ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory) throws HyracksDataException { - this(spec, frameLimit, inputsize0, factor, keys0, keys1, hashFunctionGeneratorFactories, - comparatorFactories, recordDescriptor, tupPaircomparatorFactory01, tupPaircomparatorFactory10, - predEvaluatorFactory, false, null); + this(spec, frameLimit, inputsize0, factor, keys0, keys1, hashFunctionGeneratorFactories, comparatorFactories, + recordDescriptor, tupPaircomparatorFactory01, tupPaircomparatorFactory10, predEvaluatorFactory, false, + null); } @Override @@ -284,7 +284,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD nPartitions); state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions, PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, - buildHpc, predEvaluator, isLeftOuter, nullWriterFactories1); + buildHpc, predEvaluator, isLeftOuter, nonMatchWriterFactories); state.hybridHJ.initBuild(); if (LOGGER.isLoggable(Level.FINE)) { @@ -301,7 +301,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD @Override public void close() throws HyracksDataException { state.hybridHJ.closeBuild(); - if (isFailed){ + if (isFailed) { state.hybridHJ.clearBuildTempFiles(); } else { ctx.setStateObject(state); @@ -343,7 +343,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD @Override public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) - throws HyracksDataException { + throws HyracksDataException { final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0); final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); @@ -352,22 +352,22 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD .createTuplePairComparator(ctx); final ITuplePairComparator nljComparatorBuild2Probe = tuplePairComparatorFactoryBuild2Probe .createTuplePairComparator(ctx); - final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory - .createPredicateEvaluator()); + final IPredicateEvaluator predEvaluator = predEvaluatorFactory == null ? null + : predEvaluatorFactory.createPredicateEvaluator(); for (int i = 0; i < comparatorFactories.length; i++) { comparators[i] = comparatorFactories[i].createBinaryComparator(); } - final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null; - final ArrayTupleBuilder nullTupleBuild = isLeftOuter ? - new ArrayTupleBuilder(buildRd.getFieldCount()) : - null; + final IMissingWriter[] nonMatchWriter = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] + : null; + final ArrayTupleBuilder nullTupleBuild = isLeftOuter ? new ArrayTupleBuilder(buildRd.getFieldCount()) + : null; if (isLeftOuter) { DataOutput out = nullTupleBuild.getDataOutput(); - for (int i = 0; i < nullWriterFactories1.length; i++) { - nullWriters1[i] = nullWriterFactories1[i].createNullWriter(); - nullWriters1[i].writeNull(out); + for (int i = 0; i < nonMatchWriterFactories.length; i++) { + nonMatchWriter[i] = nonMatchWriterFactories[i].createMissingWriter(); + nonMatchWriter[i].writeMissing(out); nullTupleBuild.addFieldEndOffset(); } } @@ -381,8 +381,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD @Override public void open() throws HyracksDataException { - state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(), - BUILD_AND_PARTITION_ACTIVITY_ID), partition)); + state = (BuildAndPartitionTaskState) ctx.getStateObject( + new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition)); writer.open(); state.hybridHJ.initProbe(); @@ -410,7 +410,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD BitSet partitionStatus = state.hybridHJ.getPartitionStatus(); rPartbuff.reset(); - for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) { + for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus + .nextSetBit(pid + 1)) { RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid); RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid); @@ -435,8 +436,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD //The buildSideReader should be always the original buildSideReader, so should the probeSideReader private void joinPartitionPair(RunFileReader buildSideReader, RunFileReader probeSideReader, - int buildSizeInTuple, int probeSizeInTuple, int level) - throws HyracksDataException { + int buildSizeInTuple, int probeSizeInTuple, int level) throws HyracksDataException { ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys, hashFunctionGeneratorFactories).createPartitioner(level); ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys, @@ -447,21 +447,18 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple); if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine( - "\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId() + ") (pid " - + ") - (level " + level + ")" - + " - BuildSize:\t" + buildPartSize + "\tProbeSize:\t" + probePartSize - + " - MemForJoin " - + (state.memForJoin) - + " - LeftOuter is " + isLeftOuter); + LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId() + + ") (pid " + ") - (level " + level + ")" + " - BuildSize:\t" + buildPartSize + + "\tProbeSize:\t" + probePartSize + " - MemForJoin " + (state.memForJoin) + + " - LeftOuter is " + isLeftOuter); } //Apply in-Mem HJ if possible if (!skipInMemoryHJ && ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin && !isLeftOuter))) { int tabSize = -1; - if (!forceRR && (isLeftOuter || (buildPartSize - < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal) + if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize))) { + //Case 1.1 - InMemHJ (wout Role-Reversal) if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level " + level + "]"); @@ -472,8 +469,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD "Trying to join an empty partition. Invalid table size for inMemoryHashJoin."); } //Build Side is smaller - applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, buildHpc, - probeHpc, buildSideReader, probeSideReader); // checked-confirmed + applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, buildHpc, probeHpc, + buildSideReader, probeSideReader); // checked-confirmed } else { //Case 1.2 - InMemHJ with Role Reversal if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine( @@ -486,8 +483,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD "Trying to join an empty partition. Invalid table size for inMemoryHashJoin."); } //Probe Side is smaller - applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, probeHpc, - buildHpc, probeSideReader, buildSideReader); // checked-confirmed + applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, probeHpc, buildHpc, + probeSideReader, buildSideReader); // checked-confirmed } } //Apply (Recursive) HHJ @@ -495,8 +492,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]"); } - if (!forceRR && (isLeftOuter - || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal) + if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { + //Case 2.1 - Recursive HHJ (wout Role-Reversal) if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level " + level + "]"); @@ -519,13 +516,11 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD } } - private void applyHybridHashJoin(int tableSize, - final String PROBE_REL, final String BUILD_REL, - final int[] probeKeys, final int[] buildKeys, - final RecordDescriptor probeRd, final RecordDescriptor buildRd, - final ITuplePartitionComputer probeHpc, final ITuplePartitionComputer buildHpc, - RunFileReader probeSideReader, RunFileReader buildSideReader, - final int level, final long beforeMax) + private void applyHybridHashJoin(int tableSize, final String PROBE_REL, final String BUILD_REL, + final int[] probeKeys, final int[] buildKeys, final RecordDescriptor probeRd, + final RecordDescriptor buildRd, final ITuplePartitionComputer probeHpc, + final ITuplePartitionComputer buildHpc, RunFileReader probeSideReader, + RunFileReader buildSideReader, final int level, final long beforeMax) throws HyracksDataException { boolean isReversed = probeKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys @@ -534,11 +529,10 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles"; OptimizedHybridHashJoin rHHj; - int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, - nPartitions); - rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, - probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, - predEvaluator, isLeftOuter, nullWriterFactories1); //checked-confirmed + int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions); + rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeKeys, + buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator, isLeftOuter, + nonMatchWriterFactories); //checked-confirmed rHHj.setIsReversed(isReversed); buildSideReader.open(); @@ -564,8 +558,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize); BitSet rPStatus = rHHj.getPartitionStatus(); - if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD - * beforeMax))) { //Case 2.1.1 - Keep applying HHJ + if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine( "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level " @@ -652,7 +645,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD ISerializableTable table = new SerializableHashTable(tabSize, ctx); InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(probeRDesc), hpcRepProbe, new FrameTupleAccessor(buildRDesc), hpcRepBuild, - new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nullWriters1, table, + new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nonMatchWriter, table, predEvaluator, isReversed); bReader.open(); @@ -677,18 +670,15 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD } private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize, - RunFileReader outerReader, RunFileReader innerReader) - throws HyracksDataException { + RunFileReader outerReader, RunFileReader innerReader) throws HyracksDataException { // The nested loop join result is outer + inner. All the other operator is probe + build. Hence the reverse relation is different boolean isReversed = outerRd == buildRd && innerRd == probeRd; assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles"; - ITuplePairComparator nljComptorOuterInner = isReversed ? - nljComparatorBuild2Probe : - nljComparatorProbe2Build; - NestedLoopJoin nlj = new NestedLoopJoin(ctx, - new FrameTupleAccessor(outerRd), - new FrameTupleAccessor(innerRd), nljComptorOuterInner, memorySize, - predEvaluator, isLeftOuter, nullWriters1); + ITuplePairComparator nljComptorOuterInner = isReversed ? nljComparatorBuild2Probe + : nljComparatorProbe2Build; + NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd), + new FrameTupleAccessor(innerRd), nljComptorOuterInner, memorySize, predEvaluator, + isLeftOuter, nonMatchWriter); nlj.setIsReversed(isReversed); IFrame cacheBuff = new VSizeFrame(ctx); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java index 8232a62..c8e6f59 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java @@ -27,7 +27,7 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.dataset.ResultSetId; @@ -53,7 +53,7 @@ import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor; import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor; import org.apache.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor; import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; -import org.apache.hyracks.tests.util.NoopNullWriterFactory; +import org.apache.hyracks.tests.util.NoopMissingWriterFactory; import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider; public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { @@ -365,9 +365,9 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { UTF8StringParserFactory.INSTANCE }, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); - INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()]; - for (int j = 0; j < nullWriterFactories.length; j++) { - nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE; + IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()]; + for (int j = 0; j < nonMatchWriterFactories.length; j++) { + nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE; } InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor( @@ -376,7 +376,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { new int[] { 1 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - null, custOrderJoinDesc, true, nullWriterFactories, 128); + null, custOrderJoinDesc, true, nonMatchWriterFactories, 128); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); ResultSetId rsId = new ResultSetId(1); @@ -449,9 +449,9 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { UTF8StringParserFactory.INSTANCE }, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); - INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()]; - for (int j = 0; j < nullWriterFactories.length; j++) { - nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE; + IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()]; + for (int j = 0; j < nonMatchWriterFactories.length; j++) { + nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE; } GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor( @@ -464,7 +464,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { new int[] { 1 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - custOrderJoinDesc, true, nullWriterFactories, null); + custOrderJoinDesc, true, nonMatchWriterFactories, null); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); ResultSetId rsId = new ResultSetId(1); @@ -537,9 +537,9 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { UTF8StringParserFactory.INSTANCE }, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); - INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()]; - for (int j = 0; j < nullWriterFactories.length; j++) { - nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE; + IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()]; + for (int j = 0; j < nonMatchWriterFactories.length; j++) { + nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE; } HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor( @@ -552,7 +552,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { new int[] { 1 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - custOrderJoinDesc, null, true, nullWriterFactories); + custOrderJoinDesc, null, true, nonMatchWriterFactories); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); ResultSetId rsId = new ResultSetId(1); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java index c28e496..c568306 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java @@ -29,7 +29,7 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.ITuplePairComparator; import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory; @@ -52,7 +52,7 @@ import org.apache.hyracks.dataflow.std.file.FileSplit; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor; import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; -import org.apache.hyracks.tests.util.NoopNullWriterFactory; +import org.apache.hyracks.tests.util.NoopMissingWriterFactory; import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider; public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest { @@ -405,14 +405,14 @@ public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest UTF8StringParserFactory.INSTANCE }, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID); - INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()]; - for (int j = 0; j < nullWriterFactories.length; j++) { - nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE; + IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()]; + for (int j = 0; j < nonMatchWriterFactories.length; j++) { + nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE; } NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory( PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 5, true, - nullWriterFactories); + nonMatchWriterFactories); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID); ResultSetId rsId = new ResultSetId(1); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/NoopMissingWriterFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/NoopMissingWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/NoopMissingWriterFactory.java new file mode 100644 index 0000000..a71be20 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/NoopMissingWriterFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.tests.util; + +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hyracks.api.dataflow.value.IMissingWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class NoopMissingWriterFactory implements IMissingWriterFactory { + + private static final long serialVersionUID = 1L; + public static final NoopMissingWriterFactory INSTANCE = new NoopMissingWriterFactory(); + + private NoopMissingWriterFactory() { + } + + @Override + public IMissingWriter createMissingWriter() { + return new IMissingWriter() { + @Override + public void writeMissing(DataOutput out) throws HyracksDataException { + try { + out.writeShort(0); + } catch (IOException e) { + throw new HyracksDataException(e); + } + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/NoopNullWriterFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/NoopNullWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/NoopNullWriterFactory.java deleted file mode 100644 index e86d3ff..0000000 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/NoopNullWriterFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.tests.util; - -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hyracks.api.dataflow.value.INullWriter; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public class NoopNullWriterFactory implements INullWriterFactory { - - private static final long serialVersionUID = 1L; - public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory(); - - private NoopNullWriterFactory() { - } - - @Override - public INullWriter createNullWriter() { - return new INullWriter() { - @Override - public void writeNull(DataOutput out) throws HyracksDataException { - try { - out.writeShort(0); - } catch (IOException e) { - throw new HyracksDataException(e); - } - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java index 5a079b1..21a2441 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java @@ -22,7 +22,7 @@ package org.apache.hyracks.storage.am.btree.dataflow; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; @@ -52,12 +52,12 @@ public class BTreeSearchOperatorDescriptor extends AbstractTreeIndexOperatorDesc IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, - IIndexDataflowHelperFactory dataflowHelperFactory, boolean retainInput, boolean retainNull, - INullWriterFactory nullWriterFactory, ISearchOperationCallbackFactory searchOpCallbackProvider, + IIndexDataflowHelperFactory dataflowHelperFactory, boolean retainInput, boolean retainMissing, + IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchOpCallbackProvider, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) { super(spec, 1, 1, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits, - comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, retainInput, retainNull, - nullWriterFactory, NoOpLocalResourceFactoryProvider.INSTANCE, searchOpCallbackProvider, + comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, retainInput, retainMissing, + missingWriterFactory, NoOpLocalResourceFactoryProvider.INSTANCE, searchOpCallbackProvider, NoOpOperationCallbackFactory.INSTANCE); this.lowKeyFields = lowKeyFields; this.highKeyFields = highKeyFields; http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java index 378ea65..d713a92 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java @@ -436,7 +436,7 @@ public class FramewriterTest { AbstractTreeIndexOperatorDescriptor opDesc = Mockito.mock(AbstractTreeIndexOperatorDescriptor.class); Mockito.when(opDesc.getIndexDataflowHelperFactory()).thenReturn(indexDataflowHelperFactories[i]); Mockito.when(opDesc.getRetainInput()).thenReturn(false); - Mockito.when(opDesc.getRetainNull()).thenReturn(false); + Mockito.when(opDesc.getRetainMissing()).thenReturn(false); Mockito.when(opDesc.getSearchOpCallbackFactory()).thenReturn(searchOpCallbackFactories[j]); opDescs[k] = opDesc; k++; http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/AbstractIndexOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/AbstractIndexOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/AbstractIndexOperatorDescriptor.java index 8590014..ebe6383 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/AbstractIndexOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/AbstractIndexOperatorDescriptor.java @@ -19,7 +19,7 @@ package org.apache.hyracks.storage.am.common.dataflow; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; @@ -43,7 +43,7 @@ public abstract class AbstractIndexOperatorDescriptor extends AbstractSingleActi protected final ITupleFilterFactory tupleFilterFactory; protected final boolean retainInput; protected final boolean retainNull; - protected final INullWriterFactory nullWriterFactory; + protected final IMissingWriterFactory nullWriterFactory; protected final ISearchOperationCallbackFactory searchOpCallbackFactory; protected final IModificationOperationCallbackFactory modificationOpCallbackFactory; protected final ILocalResourceFactoryProvider localResourceFactoryProvider; @@ -52,7 +52,7 @@ public abstract class AbstractIndexOperatorDescriptor extends AbstractSingleActi RecordDescriptor recDesc, IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider, IFileSplitProvider fileSplitProvider, IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory, - boolean retainInput, boolean retainNull, INullWriterFactory nullWriterFactory, + boolean retainInput, boolean retainNull, IMissingWriterFactory nullWriterFactory, ILocalResourceFactoryProvider localResourceFactoryProvider, ISearchOperationCallbackFactory searchOpCallbackFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory) { @@ -104,12 +104,12 @@ public abstract class AbstractIndexOperatorDescriptor extends AbstractSingleActi } @Override - public boolean getRetainNull() { + public boolean getRetainMissing() { return retainNull; } @Override - public INullWriterFactory getNullWriterFactory() { + public IMissingWriterFactory getMissingWriterFactory() { return nullWriterFactory; } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java index 868725e..2020a15 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/AbstractTreeIndexOperatorDescriptor.java @@ -20,7 +20,7 @@ package org.apache.hyracks.storage.am.common.dataflow; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; @@ -45,7 +45,7 @@ public abstract class AbstractTreeIndexOperatorDescriptor extends AbstractIndexO IIndexLifecycleManagerProvider lifecycleManagerProvider, IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory, - boolean retainInput, boolean retainNull, INullWriterFactory nullWriterFactory, + boolean retainInput, boolean retainNull, IMissingWriterFactory nullWriterFactory, ILocalResourceFactoryProvider localResourceFactoryProvider, ISearchOperationCallbackFactory searchOpCallbackFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory) { http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexOperatorDescriptor.java index c90c466..f59c5fd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexOperatorDescriptor.java @@ -20,7 +20,7 @@ package org.apache.hyracks.storage.am.common.dataflow; import org.apache.hyracks.api.dataflow.IActivity; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; +import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; @@ -43,9 +43,9 @@ public interface IIndexOperatorDescriptor extends IActivity { public boolean getRetainInput(); - public boolean getRetainNull(); + public boolean getRetainMissing(); - public INullWriterFactory getNullWriterFactory(); + public IMissingWriterFactory getMissingWriterFactory(); public ISearchOperationCallbackFactory getSearchOpCallbackFactory(); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/535d86b5/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java index 650dcfa..4f9e6c4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java @@ -24,7 +24,7 @@ import java.nio.ByteBuffer; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.INullWriter; +import org.apache.hyracks.api.dataflow.value.IMissingWriter; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -63,9 +63,9 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput protected final boolean retainInput; protected FrameTupleReference frameTuple; - protected final boolean retainNull; - protected ArrayTupleBuilder nullTupleBuild; - protected INullWriter nullWriter; + protected final boolean retainMissing; + protected ArrayTupleBuilder nonMatchTupleBuild; + protected IMissingWriter nonMatchWriter; protected final int[] minFilterFieldIndexes; protected final int[] maxFilterFieldIndexes; @@ -78,9 +78,9 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput this.ctx = ctx; this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition); this.retainInput = opDesc.getRetainInput(); - this.retainNull = opDesc.getRetainNull(); - if (this.retainNull) { - this.nullWriter = opDesc.getNullWriterFactory().createNullWriter(); + this.retainMissing = opDesc.getRetainMissing(); + if (this.retainMissing) { + this.nonMatchWriter = opDesc.getMissingWriterFactory().createMissingWriter(); } this.inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0); this.minFilterFieldIndexes = minFilterFieldIndexes; @@ -111,20 +111,20 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput indexHelper.open(); index = indexHelper.getIndexInstance(); accessor = new FrameTupleAccessor(inputRecDesc); - if (retainNull) { + if (retainMissing) { int fieldCount = getFieldCount(); - nullTupleBuild = new ArrayTupleBuilder(fieldCount); - DataOutput out = nullTupleBuild.getDataOutput(); + nonMatchTupleBuild = new ArrayTupleBuilder(fieldCount); + DataOutput out = nonMatchTupleBuild.getDataOutput(); for (int i = 0; i < fieldCount; i++) { try { - nullWriter.writeNull(out); + nonMatchWriter.writeMissing(out); } catch (IOException e) { e.printStackTrace(); } - nullTupleBuild.addFieldEndOffset(); + nonMatchTupleBuild.addFieldEndOffset(); } } else { - nullTupleBuild = null; + nonMatchTupleBuild = null; } try { @@ -165,9 +165,10 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()); } - if (!matched && retainInput && retainNull) { - FrameUtils.appendConcatToWriter(writer, appender, accessor, tupleIndex, nullTupleBuild.getFieldEndOffsets(), - nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize()); + if (!matched && retainInput && retainMissing) { + FrameUtils.appendConcatToWriter(writer, appender, accessor, tupleIndex, + nonMatchTupleBuild.getFieldEndOffsets(), nonMatchTupleBuild.getByteArray(), 0, + nonMatchTupleBuild.getSize()); } }
