Repository: asterixdb Updated Branches: refs/heads/master db1c115ec -> 76a4f9e36
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java index 6501aeb..74739da 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java @@ -19,24 +19,19 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; -import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; -import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; import 
org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor; +import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor; -public class ReplicatePOperator extends AbstractPhysicalOperator { +public class ReplicatePOperator extends AbstractReplicatePOperator { @Override public PhysicalOperatorTag getOperatorTag() { @@ -44,55 +39,25 @@ public class ReplicatePOperator extends AbstractPhysicalOperator { } @Override - public boolean isMicroOperator() { - return false; - } - - @Override - public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { - return emptyUnaryRequirements(); - } - - @Override - public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { - AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); - deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone(); - } - - @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { IOperatorDescriptorRegistry spec = builder.getJobSpec(); - RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context); + RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), + propagatedSchema, context); ReplicateOperator rop = (ReplicateOperator) op; int outputArity = rop.getOutputArity(); boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags(); - 
SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity, outputMaterializationFlags); + ReplicateOperatorDescriptor splitOpDesc = new ReplicateOperatorDescriptor(spec, recDescriptor, outputArity, + outputMaterializationFlags); contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc); ILogicalOperator src = op.getInputs().get(0).getValue(); builder.contributeGraphEdge(src, 0, op, 0); } @Override - public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) { - int[] inputDependencyLabels = new int[] { 0 }; - ReplicateOperator rop = (ReplicateOperator) op; - int[] outputDependencyLabels = new int[rop.getOutputArity()]; - // change the labels of outputs that requires materialization to 1 - boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags(); - for (int i = 0; i < rop.getOutputArity(); i++) { - if (outputMaterializationFlags[i]) { - outputDependencyLabels[i] = 1; - } - } - return new Pair<>(inputDependencyLabels, outputDependencyLabels); - } - - @Override public boolean expensiveThanMaterialization() { return false; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java new file mode 100644 index 0000000..3b8aaab --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.std.SplitOperatorDescriptor; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; + +public class 
SplitPOperator extends AbstractReplicatePOperator { + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.SPLIT; + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + SplitOperator sop = (SplitOperator) op; + int outputArity = sop.getOutputArity(); + + IOperatorDescriptorRegistry spec = builder.getJobSpec(); + RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), + propagatedSchema, context); + + IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider(); + IScalarEvaluatorFactory brachingExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory( + sop.getBranchingExpression().getValue(), context.getTypeEnvironment(op), inputSchemas, context); + + IBinaryIntegerInspectorFactory intInsepctorFactory = context.getBinaryIntegerInspectorFactory(); + + SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity, + brachingExprEvalFactory, intInsepctorFactory); + + contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc); + ILogicalOperator src = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src, 0, op, 0); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java index 7e83880..d3dd166 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java @@ -41,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; @@ -49,13 +50,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperato import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -63,7 +64,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOpe import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; @@ -236,15 +236,6 @@ public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisito } @Override - public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Integer indent) - throws AlgebricksException { - addIndent(indent).append("partitioning-split ("); - pprintExprList(op.getExpressions(), indent); - buffer.append(")"); - return null; - } - - @Override public Void visitSubplanOperator(SubplanOperator op, Integer indent) throws AlgebricksException { addIndent(indent).append("subplan {"); printNestedPlans(op, indent); @@ -363,6 +354,13 @@ public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisito } @Override + public Void visitSplitOperator(SplitOperator op, Integer indent) throws AlgebricksException { + 
Mutable<ILogicalExpression> branchingExpression = op.getBranchingExpression(); + addIndent(indent).append("split " + branchingExpression.getValue().accept(exprVisitor, indent)); + return null; + } + + @Override public Void visitMaterializeOperator(MaterializeOperator op, Integer indent) throws AlgebricksException { addIndent(indent).append("materialize"); return null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java index c0f9718..f5ff8b4 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java @@ -34,18 +34,18 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDelete import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; 
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -82,10 +82,10 @@ public interface ILogicalOperatorVisitor<R, T> { public R visitProjectOperator(ProjectOperator op, T arg) throws AlgebricksException; - public R visitPartitioningSplitOperator(PartitioningSplitOperator op, T arg) throws AlgebricksException; - public R visitReplicateOperator(ReplicateOperator op, T arg) throws AlgebricksException; + public R visitSplitOperator(SplitOperator op, T arg) throws AlgebricksException; + public R visitMaterializeOperator(MaterializeOperator op, T arg) throws AlgebricksException; public R visitScriptOperator(ScriptOperator op, T arg) throws AlgebricksException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java index 39cac06..c5d7291 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java @@ -48,12 +48,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperato import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; @@ -174,13 +174,12 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor } @Override - public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) - throws AlgebricksException { + public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { return visit(op); } @Override - public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { + public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { return visit(op); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java deleted file mode 100644 index 2d5c929..0000000 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.algebricks.runtime.operators.std; - -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; -import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.primitive.VoidPointable; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; -import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; -import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; -import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable; - -public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { - private static final long serialVersionUID = 1L; - public static int NO_DEFAULT_BRANCH = -1; - - private final IScalarEvaluatorFactory[] evalFactories; - private final IBinaryBooleanInspector boolInspector; - private final int defaultBranchIndex; - - public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, - IScalarEvaluatorFactory[] evalFactories, IBinaryBooleanInspector boolInspector, int defaultBranchIndex, - RecordDescriptor rDesc) { - super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length); - for (int i = 0; i < evalFactories.length; i++) { - recordDescriptors[i] = rDesc; - } - this.evalFactories = evalFactories; - this.boolInspector = boolInspector; - this.defaultBranchIndex = defaultBranchIndex; - } - - @Override - public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, - final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) - throws HyracksDataException { - return new AbstractUnaryInputOperatorNodePushable() { - private final IFrameWriter[] writers = new IFrameWriter[outputArity]; - private final boolean[] isOpen = new boolean[outputArity]; - private final IFrame[] writeBuffers = new IFrame[outputArity]; - private final IScalarEvaluator[] evals = new IScalarEvaluator[outputArity]; - private final IPointable evalPointable = new VoidPointable(); - private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), - 0); - private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc); - private final FrameTupleReference frameTuple = new FrameTupleReference(); - - private final FrameTupleAppender tupleAppender = new FrameTupleAppender(); - private final ArrayTupleBuilder tupleBuilder = new 
ArrayTupleBuilder(inOutRecDesc.getFieldCount()); - private final DataOutput tupleDos = tupleBuilder.getDataOutput(); - - @Override - public void close() throws HyracksDataException { - HyracksDataException hde = null; - for (int i = 0; i < outputArity; i++) { - if (isOpen[i]) { - try { - tupleAppender.reset(writeBuffers[i], false); - tupleAppender.write(writers[i], false); - } catch (Throwable th) { - if (hde == null) { - hde = new HyracksDataException(); - } - hde.addSuppressed(th); - } finally { - try { - writers[i].close(); - } catch (Throwable th) { - if (hde == null) { - hde = new HyracksDataException(); - } - hde.addSuppressed(th); - } - } - } - } - if (hde != null) { - throw hde; - } - } - - @Override - public void flush() throws HyracksDataException { - for (int i = 0; i < outputArity; i++) { - tupleAppender.reset(writeBuffers[i], false); - tupleAppender.flush(writers[i]); - } - } - - @Override - public void fail() throws HyracksDataException { - HyracksDataException hde = null; - for (int i = 0; i < outputArity; i++) { - if (isOpen[i]) { - try { - writers[i].fail(); - } catch (Throwable th) { - if (hde == null) { - hde = new HyracksDataException(); - } - hde.addSuppressed(th); - } - } - } - if (hde != null) { - throw hde; - } - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - accessor.reset(buffer); - int tupleCount = accessor.getTupleCount(); - for (int i = 0; i < tupleCount; i++) { - frameTuple.reset(accessor, i); - boolean found = false; - for (int j = 0; j < evals.length; j++) { - try { - evals[j].evaluate(frameTuple, evalPointable); - } catch (AlgebricksException e) { - throw new HyracksDataException(e); - } - found = boolInspector.getBooleanValue(evalPointable.getByteArray(), - evalPointable.getStartOffset(), evalPointable.getLength()); - if (found) { - copyAndAppendTuple(j); - break; - } - } - // Optionally write to default output branch. 
- if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) { - copyAndAppendTuple(defaultBranchIndex); - } - } - } - - private void copyAndAppendTuple(int outputIndex) throws HyracksDataException { - // Copy tuple into tuple builder. - try { - tupleBuilder.reset(); - for (int i = 0; i < frameTuple.getFieldCount(); i++) { - tupleDos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), - frameTuple.getFieldLength(i)); - tupleBuilder.addFieldEndOffset(); - } - } catch (IOException e) { - throw new HyracksDataException(e); - } - tupleAppender.reset(writeBuffers[outputIndex], false); - FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, tupleBuilder.getFieldEndOffsets(), - tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()); - } - - @Override - public void open() throws HyracksDataException { - for (int i = 0; i < writers.length; i++) { - isOpen[i] = true; - writers[i].open(); - } - // Create write buffers. - for (int i = 0; i < outputArity; i++) { - writeBuffers[i] = new VSizeFrame(ctx); - // Make sure to clear all buffers, since we are reusing the tupleAppender. - tupleAppender.reset(writeBuffers[i], true); - } - // Create evaluators for partitioning. 
- try {
- for (int i = 0; i < evalFactories.length; i++) {
- evals[i] = evalFactories[i].createScalarEvaluator(ctx);
- }
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- writers[index] = writer;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
new file mode 100644
index 0000000..2215a96
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
+/**
+ * Split operator propagates each tuple in a frame to exactly one output branch, unlike the Replicate operator,
+ * which propagates every tuple to all output branches.
+ * <p>
+ * The target branch of a tuple is the integer result of a branching expression evaluated on that tuple; it must
+ * lie in {@code [0, outputArity)}.
+ */
+public class SplitOperatorDescriptor extends AbstractReplicateOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    // Produces the evaluator that computes, for each tuple, the index of its output branch.
+    private final IScalarEvaluatorFactory branchingExprEvalFactory;
+    // Produces the inspector that decodes the branching expression's result as an int.
+    private final IBinaryIntegerInspectorFactory intInspectorFactory;
+
+    /**
+     * @param spec registry the descriptor is created in (passed to the superclass constructor)
+     * @param rDesc record descriptor used for every output and for reading input tuples
+     * @param outputArity number of output branches
+     * @param branchingExprEvalFactory evaluates, per tuple, the index of the output branch receiving the tuple
+     * @param intInspectorFactory decodes the branching expression's result as an int
+     */
+    public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
+            IScalarEvaluatorFactory branchingExprEvalFactory, IBinaryIntegerInspectorFactory intInspectorFactory) {
+        super(spec, rDesc, outputArity);
+        this.branchingExprEvalFactory = branchingExprEvalFactory;
+        this.intInspectorFactory = intInspectorFactory;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        // Split never materializes: one activity consumes the input and feeds every output directly.
+        SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(
+                new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        builder.addActivity(this, sma);
+        builder.addSourceEdge(0, sma, 0);
+        for (int i = 0; i < outputArity; i++) {
+            builder.addTargetEdge(i, sma, i);
+        }
+    }
+
+    /**
+     * Records a failure: the first one is wrapped in a new {@link HyracksDataException}, later ones are attached
+     * to it as suppressed exceptions.
+     */
+    private static HyracksDataException addFailure(HyracksDataException hde, Throwable th) {
+        if (hde == null) {
+            return new HyracksDataException(th);
+        }
+        hde.addSuppressed(th);
+        return hde;
+    }
+
+    // The difference between SplitterMaterializerActivityNode and ReplicatorMaterializerActivityNode is that
+    // SplitterMaterializerActivityNode propagates each tuple to one output branch only.
+    private final class SplitterMaterializerActivityNode extends ReplicatorMaterializerActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public SplitterMaterializerActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+                throws HyracksDataException {
+            final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
+            final boolean[] isOpen = new boolean[numberOfNonMaterializedOutputs];
+            final IPointable p = VoidPointable.FACTORY.createPointable();
+            // To deal with each tuple in a frame
+            final FrameTupleAccessor accessor = new FrameTupleAccessor(recordDescriptors[0]);
+            // One buffered output frame per branch; each tuple is copied into the frame of its target branch.
+            final FrameTupleAppender[] appenders = new FrameTupleAppender[numberOfNonMaterializedOutputs];
+            final FrameTupleReference tRef = new FrameTupleReference();
+            final IBinaryIntegerInspector intInspector = intInspectorFactory.createBinaryIntegerInspector(ctx);
+            final IScalarEvaluator eval;
+            try {
+                eval = branchingExprEvalFactory.createScalarEvaluator(ctx);
+            } catch (AlgebricksException ae) {
+                throw new HyracksDataException(ae);
+            }
+            for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                appenders[i] = new FrameTupleAppender(new VSizeFrame(ctx), true);
+            }
+
+            return new AbstractUnaryInputOperatorNodePushable() {
+                @Override
+                public void open() throws HyracksDataException {
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        // Mark before opening so that close()/fail() also reach a writer whose open() threw.
+                        isOpen[i] = true;
+                        writers[i].open();
+                    }
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+                    // Tuple based access
+                    accessor.reset(bufferAccessor);
+                    int tupleCount = accessor.getTupleCount();
+                    for (int i = 0; i < tupleCount; i++) {
+                        // Get the (0-based) output branch number by evaluating the branching expression on the tuple.
+                        tRef.reset(accessor, i);
+                        try {
+                            eval.evaluate(tRef, p);
+                        } catch (AlgebricksException ae) {
+                            throw new HyracksDataException(ae);
+                        }
+                        int outputBranch = intInspector.getIntegerValue(p.getByteArray(), p.getStartOffset(),
+                                p.getLength());
+                        // Reject out-of-range branches with a descriptive error instead of an
+                        // ArrayIndexOutOfBoundsException on the writers/appenders arrays.
+                        if (outputBranch < 0 || outputBranch >= numberOfNonMaterializedOutputs) {
+                            throw new HyracksDataException("Invalid output branch " + outputBranch
+                                    + "; expected a value in [0, " + numberOfNonMaterializedOutputs + ")");
+                        }
+                        // Add this tuple to the correct output frame.
+                        FrameUtils.appendToWriter(writers[outputBranch], appenders[outputBranch], accessor, i);
+                    }
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    // Push each branch's partially filled frame downstream, then forward the flush.
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        if (isOpen[i]) {
+                            if (appenders[i].getTupleCount() > 0) {
+                                appenders[i].write(writers[i], true);
+                            }
+                            writers[i].flush();
+                        }
+                    }
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    HyracksDataException hde = null;
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        if (!isOpen[i]) {
+                            continue;
+                        }
+                        // Write out whatever is still buffered for this branch; the writer is closed even if that fails.
+                        try {
+                            appenders[i].write(writers[i], true);
+                        } catch (Throwable th) {
+                            hde = addFailure(hde, th);
+                        }
+                        try {
+                            writers[i].close();
+                        } catch (Throwable th) {
+                            hde = addFailure(hde, th);
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
+                    }
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    HyracksDataException hde = null;
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        if (isOpen[i]) {
+                            try {
+                                writers[i].fail();
+                            } catch (Throwable th) {
+                                hde = addFailure(hde, th);
+                            }
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
+                    }
+                }
+
+                @Override
+                public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                    writers[index] = writer;
+                }
+            };
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl
new file mode 100644
index 0000000..0ea8a88
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl
@@ -0,0 +1,4 @@
+0,first branch1
+0,first branch2
+0,first branch3
+0,first branch4 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl new file mode 100644 index 0000000..53588ef --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl @@ -0,0 +1,3 @@ +1,second branch1 +1,second branch2 +1,second branch3 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl new file mode 100644 index 0000000..ceb859a --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl @@ -0,0 +1,7 @@ +0|first branch1 +1|second branch1 +0|first branch2 +1|second branch2 +0|first branch3 +1|second branch3 +0|first branch4 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java index 020cffe..1276518 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java +++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java @@ -50,6 +50,7 @@ import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRunt import org.apache.hyracks.algebricks.runtime.operators.std.PrinterRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.std.SplitOperatorDescriptor; import org.apache.hyracks.algebricks.runtime.operators.std.StreamLimitRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory; @@ -83,7 +84,7 @@ import org.apache.hyracks.dataflow.std.file.FileSplit; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import org.apache.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor; import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor; -import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor; +import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor; import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -570,7 +571,7 @@ public class PushRuntimeTest { } @Test - public void scanSplitWrite() throws Exception { + public void scanReplicateWrite() throws Exception { final int outputArity = 2; JobSpecification spec = new JobSpecification(FRAME_SIZE); @@ -596,7 +597,69 @@ public class PushRuntimeTest { PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID }); - 
SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity); + ReplicateOperatorDescriptor replicateOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity); + + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, replicateOp, + new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID }); + + IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length]; + for (int i = 0; i < outputArity; i++) { + outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] { + new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(outputFile[i])) }); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], + new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID }); + } + + spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, replicateOp, 0); + for (int i = 0; i < outputArity; i++) { + spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, i, outputOp[i], 0); + } + + for (int i = 0; i < outputArity; i++) { + spec.addRoot(outputOp[i]); + } + AlgebricksHyracksIntegrationUtil.runJob(spec); + + for (int i = 0; i < outputArity; i++) { + compareFiles(inputFileName, outputFile[i].getAbsolutePath()); + } + } + + @Test + public void scanSplitWrite() throws Exception { + final int outputArity = 2; + + JobSpecification spec = new JobSpecification(FRAME_SIZE); + + String inputFileName[] = { "data/simple/int-string-part1.tbl", "data/simple/int-string-part1-split-0.tbl", + "data/simple/int-string-part1-split-1.tbl" }; + File[] inputFiles = new File[inputFileName.length]; + for (int i=0; i<inputFileName.length; i++) { + inputFiles[i] = new File(inputFileName[i]); + } + File[] outputFile = new File[outputArity]; + for (int i = 0; i < outputArity; i++) { + outputFile[i] = File.createTempFile("splitop", null); + } + + FileSplit[] inputSplits = new FileSplit[] { + new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(inputFiles[0])) }; + 
IFileSplitProvider intSplitProvider = new ConstantFileSplitProvider(inputSplits); + + RecordDescriptor scannerDesc = new RecordDescriptor( + new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE, + new UTF8StringSerializerDeserializer() }); + + IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE }; + + FileScanOperatorDescriptor intScanner = new FileScanOperatorDescriptor(spec, intSplitProvider, + new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc); + + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, intScanner, DEFAULT_NODES); + + SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, scannerDesc, outputArity, + new TupleFieldEvaluatorFactory(0), BinaryIntegerInspectorImpl.FACTORY); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp, new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID }); @@ -609,7 +672,7 @@ public class PushRuntimeTest { new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID }); } - spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), intScanner, 0, splitOp, 0); for (int i = 0; i < outputArity; i++) { spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0); } @@ -620,7 +683,7 @@ public class PushRuntimeTest { AlgebricksHyracksIntegrationUtil.runJob(spec); for (int i = 0; i < outputArity; i++) { - compareFiles(inputFileName, outputFile[i].getAbsolutePath()); + compareFiles(inputFileName[i + 1], outputFile[i].getAbsolutePath()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
new file mode 100644
index 0000000..5c642ba
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.misc.MaterializerTaskState;
+
+/**
+ * Base class for the two replication-related operator descriptors: replicate and split.
+ * <ul>
+ * <li>The replicate operator propagates all frames to all output branches; that is, each tuple is propagated to
+ * all output branches.</li>
+ * <li>The split operator propagates each tuple in a frame to one output branch only.</li>
+ * </ul>
+ * An output whose entry in {@code outputMaterializationFlags} is {@code true} is not fed directly: the input is
+ * materialized and later replayed to that output by a separate reader activity.
+ */
+public abstract class AbstractReplicateOperatorDescriptor extends AbstractOperatorDescriptor {
+    protected static final long serialVersionUID = 1L;
+
+    // Activity that consumes the input and feeds the directly connected (non-materialized) outputs.
+    protected final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
+    // Id of the first materialized-output reader activity; further readers take consecutive ids.
+    protected final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
+
+    // outputMaterializationFlags[i] is true when output i is served from materialized data.
+    protected final boolean[] outputMaterializationFlags;
+    // True when at least one output is materialized.
+    protected final boolean requiresMaterialization;
+    protected final int numberOfNonMaterializedOutputs;
+    protected final int numberOfMaterializedOutputs;
+
+    /**
+     * Creates a descriptor whose outputs are all fed directly (no output is materialized).
+     */
+    public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
+            int outputArity) {
+        this(spec, rDesc, outputArity, new boolean[outputArity]);
+    }
+
+    /**
+     * Creates a descriptor with one input and {@code outputArity} outputs.
+     *
+     * @param spec registry passed to the superclass constructor
+     * @param rDesc record descriptor assigned to every output
+     * @param outputArity number of outputs
+     * @param outputMaterializationFlags per-output flags; {@code true} means the output is served from
+     *            materialized data. Its length is expected to equal {@code outputArity} (not checked here).
+     */
+    public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
+            int outputArity, boolean[] outputMaterializationFlags) {
+        super(spec, 1, outputArity);
+        for (int i = 0; i < outputArity; i++) {
+            recordDescriptors[i] = rDesc;
+        }
+        this.outputMaterializationFlags = outputMaterializationFlags;
+
+        // Summarize the flags: how many outputs are materialized and whether any is.
+        boolean reqMaterialization = false;
+        int matOutputs = 0;
+        int nonMatOutputs = 0;
+        for (boolean flag : outputMaterializationFlags) {
+            if (flag) {
+                reqMaterialization = true;
+                matOutputs++;
+            } else {
+                nonMatOutputs++;
+            }
+        }
+
+        this.requiresMaterialization = reqMaterialization;
+        this.numberOfMaterializedOutputs = matOutputs;
+        this.numberOfNonMaterializedOutputs = nonMatOutputs;
+
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        // A single activity consumes the input, feeds the non-materialized outputs directly and, if required,
+        // materializes the input for the others.
+        ReplicatorMaterializerActivityNode sma = new ReplicatorMaterializerActivityNode(
+                new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        builder.addActivity(this, sma);
+        builder.addSourceEdge(0, sma, 0);
+        // Output ports of sma are numbered consecutively over the non-materialized outputs only.
+        int pipelineOutputIndex = 0;
+        int activityId = MATERIALIZE_READER_ACTIVITY_ID;
+        for (int i = 0; i < outputArity; i++) {
+            if (outputMaterializationFlags[i]) {
+                // Each materialized output gets its own reader activity, connected to sma through a
+                // blocking edge.
+                MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
+                        new ActivityId(odId, activityId++));
+                builder.addActivity(this, mra);
+                builder.addBlockingEdge(sma, mra);
+                builder.addTargetEdge(i, mra, 0);
+            } else {
+                builder.addTargetEdge(i, sma, pipelineOutputIndex++);
+            }
+        }
+    }
+
+    /**
+     * Activity that consumes the input and forwards every frame to all non-materialized outputs; when
+     * materialization is required it also appends every frame to a {@link MaterializerTaskState}.
+     */
+    protected class ReplicatorMaterializerActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public ReplicatorMaterializerActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+                throws HyracksDataException {
+            return new AbstractUnaryInputOperatorNodePushable() {
+                private MaterializerTaskState state;
+                private final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
+                private final boolean[] isOpen = new boolean[numberOfNonMaterializedOutputs];
+
+                @Override
+                public void open() throws HyracksDataException {
+                    if (requiresMaterialization) {
+                        state = new MaterializerTaskState(ctx.getJobletContext().getJobId(),
+                                new TaskId(getActivityId(), partition), numberOfMaterializedOutputs);
+                        state.open(ctx);
+                    }
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        // Mark before opening so that close()/fail() also reach a writer whose open() threw.
+                        isOpen[i] = true;
+                        writers[i].open();
+                    }
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+                    if (requiresMaterialization) {
+                        state.appendFrame(bufferAccessor);
+                        // NOTE(review): clear() resets position/limit, presumably because appendFrame() may
+                        // have moved them before the same buffer is forwarded below — confirm.
+                        bufferAccessor.clear();
+                    }
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        FrameUtils.flushFrame(bufferAccessor, writers[i]);
+                    }
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    // Only the directly connected outputs are flushed; materialized ones are replayed later.
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        writers[i].flush();
+                    }
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    HyracksDataException hde = null;
+                    // Publish the materialized state (if any) for the reader activities, then close every
+                    // opened writer, collecting failures so that all writers get closed.
+                    try {
+                        if (requiresMaterialization) {
+                            state.close();
+                            ctx.setStateObject(state);
+                        }
+                    } finally {
+                        for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                            if (isOpen[i]) {
+                                try {
+                                    writers[i].close();
+                                } catch (Throwable th) {
+                                    if (hde == null) {
+                                        hde = new HyracksDataException(th);
+                                    } else {
+                                        hde.addSuppressed(th);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
+                    }
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    // Propagate the failure to every opened writer; later failures are attached as suppressed.
+                    // NOTE(review): the materializer state (if any) is not closed here — confirm it is released
+                    // elsewhere.
+                    HyracksDataException hde = null;
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        if (isOpen[i]) {
+                            try {
+                                writers[i].fail();
+                            } catch (Throwable th) {
+                                if (hde == null) {
+                                    hde = new HyracksDataException(th);
+                                } else {
+                                    hde.addSuppressed(th);
+                                }
+                            }
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
+                    }
+                }
+
+                @Override
+                public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                    writers[index] = writer;
+                }
+            };
+        }
+    }
+
+    /**
+     * Activity that replays the materialized input to one materialized output, using the state published by
+     * the materializer activity of the same partition.
+     */
+    protected class MaterializeReaderActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public MaterializeReaderActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+                throws HyracksDataException {
+            return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+                @Override
+                public void initialize() throws HyracksDataException {
+                    // Look up the state stored by the materializer activity (SPLITTER_MATERIALIZER_ACTIVITY_ID)
+                    // of this partition and write its content to this activity's output.
+                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+                    state.writeOut(writer, new VSizeFrame(ctx));
+                }
+
+            };
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
new file mode 100644
index 0000000..0782647
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.misc;
+
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor;
+
+/**
+ * Replicate operator: propagates every input frame to all output branches, so each tuple reaches every output.
+ * Individual outputs can be served from materialized data instead (see {@link AbstractReplicateOperatorDescriptor}).
+ */
+public class ReplicateOperatorDescriptor extends AbstractReplicateOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a replicate operator whose outputs are all fed directly (no output is materialized).
+     */
+    public ReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) {
+        this(spec, rDesc, outputArity, new boolean[outputArity]);
+    }
+
+    /**
+     * Creates a replicate operator; {@code outputMaterializationFlags[i]} selects whether output {@code i} is
+     * served from materialized data.
+     */
+    public ReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
+            boolean[] outputMaterializationFlags) {
+        super(spec, rDesc, outputArity, outputMaterializationFlags);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
deleted file mode 100644
index 67af861..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.dataflow.std.misc; - -import java.nio.ByteBuffer; - -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.ActivityId; -import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; -import org.apache.hyracks.api.dataflow.IOperatorNodePushable; -import org.apache.hyracks.api.dataflow.TaskId; -import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; -import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; -import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable; -import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; - -public class SplitOperatorDescriptor extends AbstractOperatorDescriptor { - private static final long serialVersionUID = 1L; - - private final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0; - private final static int MATERIALIZE_READER_ACTIVITY_ID = 1; - - private final boolean[] outputMaterializationFlags; - private final boolean requiresMaterialization; - private final int numberOfNonMaterializedOutputs; - private final int 
numberOfMaterializedOutputs; - - public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) { - this(spec, rDesc, outputArity, new boolean[outputArity]); - } - - public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity, - boolean[] outputMaterializationFlags) { - super(spec, 1, outputArity); - for (int i = 0; i < outputArity; i++) { - recordDescriptors[i] = rDesc; - } - this.outputMaterializationFlags = outputMaterializationFlags; - - boolean reqMaterialization = false; - int matOutputs = 0; - int nonMatOutputs = 0; - for (boolean flag : outputMaterializationFlags) { - if (flag) { - reqMaterialization = true; - matOutputs++; - } else { - nonMatOutputs++; - } - } - - this.requiresMaterialization = reqMaterialization; - this.numberOfMaterializedOutputs = matOutputs; - this.numberOfNonMaterializedOutputs = nonMatOutputs; - - } - - @Override - public void contributeActivities(IActivityGraphBuilder builder) { - SplitterMaterializerActivityNode sma = - new SplitterMaterializerActivityNode(new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID)); - builder.addActivity(this, sma); - builder.addSourceEdge(0, sma, 0); - int pipelineOutputIndex = 0; - int activityId = MATERIALIZE_READER_ACTIVITY_ID; - for (int i = 0; i < outputArity; i++) { - if (outputMaterializationFlags[i]) { - MaterializeReaderActivityNode mra = - new MaterializeReaderActivityNode(new ActivityId(odId, activityId++)); - builder.addActivity(this, mra); - builder.addBlockingEdge(sma, mra); - builder.addTargetEdge(i, mra, 0); - } else { - builder.addTargetEdge(i, sma, pipelineOutputIndex++); - } - } - } - - private final class SplitterMaterializerActivityNode extends AbstractActivityNode { - private static final long serialVersionUID = 1L; - - public SplitterMaterializerActivityNode(ActivityId id) { - super(id); - } - - @Override - public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, - 
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) { - return new AbstractUnaryInputOperatorNodePushable() { - private MaterializerTaskState state; - private final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs]; - private final boolean[] isOpen = new boolean[numberOfNonMaterializedOutputs]; - - @Override - public void open() throws HyracksDataException { - if (requiresMaterialization) { - state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), - new TaskId(getActivityId(), partition), numberOfMaterializedOutputs); - state.open(ctx); - } - for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { - isOpen[i] = true; - writers[i].open(); - } - } - - @Override - public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException { - if (requiresMaterialization) { - state.appendFrame(bufferAccessor); - bufferAccessor.clear(); - } - for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { - FrameUtils.flushFrame(bufferAccessor, writers[i]); - } - } - - @Override - public void flush() throws HyracksDataException { - if (!requiresMaterialization) { - for (IFrameWriter writer : writers) { - writer.flush(); - } - } - } - - @Override - public void close() throws HyracksDataException { - HyracksDataException hde = null; - try { - if (requiresMaterialization) { - state.close(); - ctx.setStateObject(state); - } - } finally { - for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { - if (isOpen[i]) { - try { - writers[i].close(); - } catch (Throwable th) { - if (hde == null) { - hde = new HyracksDataException(th); - } else { - hde.addSuppressed(th); - } - } - } - } - } - if (hde != null) { - throw hde; - } - } - - @Override - public void fail() throws HyracksDataException { - HyracksDataException hde = null; - for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { - if (isOpen[i]) { - try { - writers[i].fail(); - } catch (Throwable th) { - if (hde == null) { - hde = new 
HyracksDataException(th); - } else { - hde.addSuppressed(th); - } - } - } - } - if (hde != null) { - throw hde; - } - } - - @Override - public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { - writers[index] = writer; - } - }; - } - } - - private final class MaterializeReaderActivityNode extends AbstractActivityNode { - private static final long serialVersionUID = 1L; - - public MaterializeReaderActivityNode(ActivityId id) { - super(id); - } - - @Override - public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, - final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) - throws HyracksDataException { - return new AbstractUnaryOutputSourceOperatorNodePushable() { - - @Override - public void initialize() throws HyracksDataException { - MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject( - new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition)); - state.writeOut(writer, new VSizeFrame(ctx)); - } - - }; - } - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java new file mode 100644 index 0000000..9a56cd3 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.tests.integration; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; + +import org.apache.hyracks.api.constraints.PartitionConstraintHelper; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory; +import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; +import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; +import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider; +import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory; +import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor; +import org.apache.hyracks.dataflow.std.file.FileSplit; +import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor; +import 
org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; +import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider; +import org.junit.Assert; +import org.junit.Test; + +public class ReplicateOperatorTest extends AbstractIntegrationTest { + + public void compareFiles(String fileNameA, String fileNameB) throws IOException { + BufferedReader fileA = new BufferedReader(new FileReader(fileNameA)); + BufferedReader fileB = new BufferedReader(new FileReader(fileNameB)); + + String lineA, lineB; + while ((lineA = fileA.readLine()) != null) { + lineB = fileB.readLine(); + Assert.assertEquals(lineA, lineB); + } + Assert.assertNull(fileB.readLine()); + fileA.close(); + fileB.close(); + } + + @Test + public void test() throws Exception { + final int outputArity = 2; + + JobSpecification spec = new JobSpecification(); + + String inputFileName = "data/words.txt"; + File[] outputFile = new File[outputArity]; + for (int i = 0; i < outputArity; i++) { + outputFile[i] = File.createTempFile("replicateop", null); + outputFile[i].deleteOnExit(); + } + + FileSplit[] inputSplits = new FileSplit[] { new FileSplit(NC1_ID, inputFileName) }; + + String[] locations = new String[] { NC1_ID }; + + DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory( + new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000'); + RecordDescriptor stringRec = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), }); + + FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, new ConstantFileSplitProvider( + inputSplits), stringParser, stringRec); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations); + + ReplicateOperatorDescriptor replicateOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, replicateOp, locations); + + IOperatorDescriptor outputOp[] = new 
IOperatorDescriptor[outputFile.length]; + for (int i = 0; i < outputArity; i++) { + ResultSetId rsId = new ResultSetId(i); + spec.addResultSetId(rsId); + + outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true, false, + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations); + } + + spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, replicateOp, 0); + for (int i = 0; i < outputArity; i++) { + spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, i, outputOp[i], 0); + } + + for (int i = 0; i < outputArity; i++) { + spec.addRoot(outputOp[i]); + } + String[] expectedResultsFileNames = new String[outputArity]; + for (int i = 0; i < outputArity; i++) { + expectedResultsFileNames[i] = inputFileName; + } + runTestAndCompareResults(spec, expectedResultsFileNames); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java deleted file mode 100644 index 40b4251..0000000 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.tests.integration; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.hyracks.api.constraints.PartitionConstraintHelper; -import org.apache.hyracks.api.dataflow.IOperatorDescriptor; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; -import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory; -import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; -import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider; -import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory; -import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor; -import org.apache.hyracks.dataflow.std.file.FileSplit; -import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor; -import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; -import 
org.apache.hyracks.tests.util.ResultSerializerFactoryProvider; - -public class SplitOperatorTest extends AbstractIntegrationTest { - - public void compareFiles(String fileNameA, String fileNameB) throws IOException { - BufferedReader fileA = new BufferedReader(new FileReader(fileNameA)); - BufferedReader fileB = new BufferedReader(new FileReader(fileNameB)); - - String lineA, lineB; - while ((lineA = fileA.readLine()) != null) { - lineB = fileB.readLine(); - Assert.assertEquals(lineA, lineB); - } - Assert.assertNull(fileB.readLine()); - fileA.close(); - fileB.close(); - } - - @Test - public void test() throws Exception { - final int outputArity = 2; - - JobSpecification spec = new JobSpecification(); - - String inputFileName = "data/words.txt"; - File[] outputFile = new File[outputArity]; - for (int i = 0; i < outputArity; i++) { - outputFile[i] = File.createTempFile("splitop", null); - outputFile[i].deleteOnExit(); - } - - FileSplit[] inputSplits = new FileSplit[] { new FileSplit(NC1_ID, inputFileName) }; - - String[] locations = new String[] { NC1_ID }; - - DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory( - new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000'); - RecordDescriptor stringRec = new RecordDescriptor( - new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), }); - - FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, new ConstantFileSplitProvider( - inputSplits), stringParser, stringRec); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations); - - SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp, locations); - - IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length]; - for (int i = 0; i < outputArity; i++) { - ResultSetId rsId = new ResultSetId(i); - spec.addResultSetId(rsId); - - outputOp[i] = new 
ResultWriterOperatorDescriptor(spec, rsId, true, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations); - } - - spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0); - for (int i = 0; i < outputArity; i++) { - spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0); - } - - for (int i = 0; i < outputArity; i++) { - spec.addRoot(outputOp[i]); - } - String[] expectedResultsFileNames = new String[outputArity]; - for (int i = 0; i < outputArity; i++) { - expectedResultsFileNames[i] = inputFileName; - } - runTestAndCompareResults(spec, expectedResultsFileNames); - } -}
