http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java index 3335d71..fa11f73 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java @@ -52,6 +52,10 @@ import org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; import org.apache.hyracks.api.exceptions.SourceLocation; +/** + * Pre-conditions: + * FixReplicateOperatorOutputsRule should be fired + */ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule { private final HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childrenToParents = @@ -62,6 +66,8 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule { private final HashMap<Mutable<ILogicalOperator>, MutableInt> clusterMap = new HashMap<>(); private final HashMap<Integer, BitSet> clusterWaitForMap = new HashMap<>(); private int lastUsedClusterId = 0; + private final Map<Mutable<ILogicalOperator>, BitSet> replicateToOutputs = new HashMap<>(); + private final List<Pair<Mutable<ILogicalOperator>, Boolean>> newOutputs = new ArrayList<>(); @Override public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) @@ -268,11 +274,71 @@ public class 
ExtractCommonOperatorsRule implements IAlgebraicRewriteRule { context.computeAndSetTypeEnvironmentForOperator(parentOp); } } + cleanupPlan(); rewritten = true; } return rewritten; } + /** + * Cleans up the plan after combining similar branches into one branch making sure parents & children point to + * each other correctly. + */ + private void cleanupPlan() { + for (Mutable<ILogicalOperator> root : roots) { + replicateToOutputs.clear(); + newOutputs.clear(); + findReplicateOp(root, replicateToOutputs); + cleanup(replicateToOutputs, newOutputs); + } + } + + /** + * Updates the outputs references of a replicate operator to points to the valid parents. + * @param replicateToOutputs where the replicate operators are stored with its valid parents. + * @param newOutputs the valid parents of replicate operator. + */ + private void cleanup(Map<Mutable<ILogicalOperator>, BitSet> replicateToOutputs, + List<Pair<Mutable<ILogicalOperator>, Boolean>> newOutputs) { + replicateToOutputs.forEach((repRef, allOutputs) -> { + newOutputs.clear(); + // get the indexes that are set in the BitSet + allOutputs.stream().forEach(outIndex -> { + newOutputs.add(new Pair<>(((AbstractReplicateOperator) repRef.getValue()).getOutputs().get(outIndex), + ((AbstractReplicateOperator) repRef.getValue()).getOutputMaterializationFlags()[outIndex])); + }); + ((AbstractReplicateOperator) repRef.getValue()).setOutputs(newOutputs); + }); + } + + /** + * Collects all replicate operator starting from {@param parent} and all its descendants and keeps track of the + * valid parents of a replicate operator. The indexes of valid parents will be set in the BitSet. + * @param parent the current operator in consideration for which we want to find replicate op children. + * @param replicateToOutputs where the replicate operators will be stored with all its parents (valid & invalid). 
+ */ + private void findReplicateOp(Mutable<ILogicalOperator> parent, + Map<Mutable<ILogicalOperator>, BitSet> replicateToOutputs) { + List<Mutable<ILogicalOperator>> children = parent.getValue().getInputs(); + for (Mutable<ILogicalOperator> childRef : children) { + AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue(); + if (child.getOperatorTag() == LogicalOperatorTag.REPLICATE + || child.getOperatorTag() == LogicalOperatorTag.SPLIT) { + AbstractReplicateOperator replicateChild = (AbstractReplicateOperator) child; + int parentIndex = replicateChild.getOutputs().indexOf(parent); + if (parentIndex >= 0) { + BitSet replicateValidOutputs = replicateToOutputs.get(childRef); + if (replicateValidOutputs == null) { + replicateValidOutputs = new BitSet(); + replicateToOutputs.put(childRef, replicateValidOutputs); + } + replicateValidOutputs.set(parentIndex); + } + } + findReplicateOp(childRef, replicateToOutputs); + } + } + private void genCandidates(IOptimizationContext context) throws AlgebricksException { List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses = new ArrayList<List<Mutable<ILogicalOperator>>>();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java index 6967271..5d6237a 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java @@ -67,15 +67,18 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; */ public class InlineVariablesRule implements IAlgebraicRewriteRule { - // Map of variables that could be replaced by their producing expression. - // Populated during the top-down sweep of the plan. - protected Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<>(); - // Visitor for replacing variable reference expressions with their originating expression. + // map of variables that could be replaced by their producing expression. + // populated during the top-down sweep of the plan. + private Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<>(); + // visitor for replacing variable reference expressions with their originating expression. protected InlineVariablesVisitor inlineVisitor = new InlineVariablesVisitor(varAssignRhs); - // Set of FunctionIdentifiers that we should not inline. + // set of FunctionIdentifiers that we should not inline. 
protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<>(); - // Indicates whether the rule has been run - protected boolean hasRun = false; + // indicates whether the rule has been run + private boolean hasRun = false; + // set to prevent re-visiting a subtree from the other sides. Operators with multiple outputs are the ones that + // could be re-visited twice or more (e.g. replicate and split operators) + private final Map<ILogicalOperator, Boolean> subTreesDone = new HashMap<>(); @Override public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) { @@ -103,6 +106,7 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule { protected void prepare(IOptimizationContext context) { varAssignRhs.clear(); inlineVisitor.setContext(context); + subTreesDone.clear(); } protected boolean performBottomUpAction(AbstractLogicalOperator op) throws AlgebricksException { @@ -118,10 +122,14 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule { return false; } - protected boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context) + private boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); + // check if you have already visited the subtree rooted at this operator + if (subTreesDone.containsKey(op)) { + return subTreesDone.get(op); + } // Update mapping from variables to expressions during top-down traversal. if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) { AssignOperator assignOp = (AssignOperator) op; @@ -183,6 +191,10 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule { // Re-enable rules that we may have already tried. They could be applicable now after inlining. 
context.removeFromAlreadyCompared(opRef.getValue()); } + // mark the subtree rooted at op as visited so that you don't visit it again + if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE || op.getOperatorTag() == LogicalOperatorTag.SPLIT) { + subTreesDone.put(op, modified); + } return modified; } @@ -209,7 +221,7 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule { this.context = context; } - public void setOperator(ILogicalOperator op) throws AlgebricksException { + public void setOperator(ILogicalOperator op) { this.op = op; liveVars.clear(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index 4869761..4273553 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -59,6 +59,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.DataSourceS import org.apache.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator; @@ -394,6 +395,9 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule op.setPhysicalOperator(new SinkPOperator()); break; } + case FORWARD: + op.setPhysicalOperator(new ForwardPOperator()); + break; } } if (op.hasNestedPlans()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java index 35aa984..6b09894 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java @@ -34,10 +34,11 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; 
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; @@ -246,6 +247,11 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor } @Override + public ILogicalOperator visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException { + return visit(op); + } + + @Override public ILogicalOperator visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException { return visit(op); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java index 0dcc83a..2068ef3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java @@ -22,5 +22,23 @@ import org.apache.hyracks.api.comm.IFrameTupleAccessor; import 
org.apache.hyracks.api.exceptions.HyracksDataException; public interface ITuplePartitionComputer { + /** + * For the tuple (located at tIndex in the frame), it determines which target partition (0,1,... nParts-1) the tuple + * should be sent/written to. + * @param accessor The accessor of the frame to access tuples + * @param tIndex The index of the tuple in consideration + * @param nParts The number of target partitions + * @return The chosen target partition number as dictated by the logic of the partition computer + * @throws HyracksDataException + */ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException; + + /** + * Gives the data partitioner a chance to set up its environment before it starts partitioning tuples. This method + * should be called in the open() of {@link org.apache.hyracks.api.comm.IFrameWriter}. The default implementation + * is "do nothing". + * @throws HyracksDataException + */ + public default void initialize() throws HyracksDataException { + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java index cde0057..81f9053 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java @@ -20,6 +20,8 @@ package org.apache.hyracks.api.dataflow.value; import java.io.Serializable; +import 
org.apache.hyracks.api.context.IHyracksTaskContext; + public interface ITuplePartitionComputerFactory extends Serializable { - public ITuplePartitionComputer createPartitioner(); + public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 09193d9..7d126ac 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -149,6 +149,9 @@ public class ErrorCode { public static final int UNDEFINED_INVERTED_LIST_MERGE_TYPE = 113; public static final int NODE_IS_NOT_ACTIVE = 114; public static final int LOCAL_NETWORK_ERROR = 115; + public static final int ONE_TUPLE_RANGEMAP_EXPECTED = 116; + public static final int NO_RANGEMAP_PRODUCED = 117; + public static final int RANGEMAP_NOT_FOUND = 118; // Compilation error codes. 
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index c704d7e..50e92b3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -132,6 +132,9 @@ 113 = Undefined inverted-list merge type: %1$s 114 = Node (%1$s) is not active 115 = Local network error +116 = One tuple rangemap is expected +117 = No range map produced for parallel sort +118 = Range map was not found for parallel sort 10000 = The given rule collection %1$s is not an instance of the List class. 10001 = Cannot compose partition constraint %1$s with %2$s http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleDataInputStream.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleDataInputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleDataInputStream.java new file mode 100644 index 0000000..d858a7f --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleDataInputStream.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.data.std.util; + +import java.io.DataInputStream; + +public class ByteArrayAccessibleDataInputStream extends DataInputStream { + + public ByteArrayAccessibleDataInputStream(ByteArrayAccessibleInputStream in) { + super(in); + } + + public ByteArrayAccessibleInputStream getInputStream() { + return (ByteArrayAccessibleInputStream) in; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleInputStream.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleInputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleInputStream.java new file mode 100644 index 0000000..2785751 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleInputStream.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.data.std.util; + +import java.io.ByteArrayInputStream; + +public class ByteArrayAccessibleInputStream extends ByteArrayInputStream { + + public ByteArrayAccessibleInputStream(byte[] buf, int offset, int length) { + super(buf, offset, length); + } + + public void setContent(byte[] buf, int offset, int length) { + this.buf = buf; + this.pos = offset; + this.count = Math.min(offset + length, buf.length); + this.mark = offset; + } + + public byte[] getArray() { + return buf; + } + + public int getPosition() { + return pos; + } + + public int getCount() { + return count; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java index dc66d19..ab5ab01 100644 
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java @@ -19,6 +19,7 @@ package org.apache.hyracks.dataflow.common.data.partition; import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; @@ -36,7 +37,7 @@ public class FieldHashPartitionComputerFactory implements ITuplePartitionCompute } @Override - public ITuplePartitionComputer createPartitioner() { + public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) { final IBinaryHashFunction[] hashFunctions = new IBinaryHashFunction[hashFunctionFactories.length]; for (int i = 0; i < hashFunctionFactories.length; ++i) { hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/OnePartitionComputerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/OnePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/OnePartitionComputerFactory.java new file mode 100644 index 0000000..e55841a --- /dev/null +++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/OnePartitionComputerFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.common.data.partition; + +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; + +public class OnePartitionComputerFactory implements ITuplePartitionComputerFactory { + private static final long serialVersionUID = 1L; + + @Override + public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) { + return new ITuplePartitionComputer() { + @Override + public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) { + return 0; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java index e034af0..63d01fc 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java @@ -21,6 +21,7 @@ package org.apache.hyracks.dataflow.common.data.partition; import java.util.Random; import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -30,7 +31,7 @@ public class RandomPartitionComputerFactory implements ITuplePartitionComputerFa private static final long serialVersionUID = 1L; @Override - public ITuplePartitionComputer createPartitioner() { + public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) { return new ITuplePartitionComputer() { private final Random random = new Random(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java index 9cb11fa..1821d78 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java @@ -19,6 +19,7 @@ package org.apache.hyracks.dataflow.common.data.partition; import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -35,9 +36,9 @@ public class RepartitionComputerFactory implements ITuplePartitionComputerFactor } @Override - public ITuplePartitionComputer createPartitioner() { + public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) { return new ITuplePartitionComputer() { - private ITuplePartitionComputer delegate = delegateFactory.createPartitioner(); + private ITuplePartitionComputer delegate = delegateFactory.createPartitioner(hyracksTaskContext); @Override public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java new file mode 100644 index 0000000..bc642a9 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.dataflow.common.data.partition.range; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.dataflow.common.utils.TaskUtil; + +public class DynamicFieldRangePartitionComputerFactory extends FieldRangePartitionComputerFactory { + private static final long serialVersionUID = 1L; + private final String rangeMapKeyInContext; + private final SourceLocation sourceLocation; + + public DynamicFieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories, + String rangeMapKeyInContext, SourceLocation sourceLocation) { + super(rangeFields, comparatorFactories); + this.rangeMapKeyInContext = rangeMapKeyInContext; + this.sourceLocation = sourceLocation; + } + + @Override + protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) throws HyracksDataException { + RangeMap rangeMap = TaskUtil.get(rangeMapKeyInContext, hyracksTaskContext); + if (rangeMap == null) { + throw HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND, sourceLocation); + } + return rangeMap; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java index d58a248..55d4420 
100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java @@ -19,36 +19,41 @@ package org.apache.hyracks.dataflow.common.data.partition.range; import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; -public class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory { +public abstract class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory { private static final long serialVersionUID = 1L; private final int[] rangeFields; - private IRangeMap rangeMap; private IBinaryComparatorFactory[] comparatorFactories; - public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories, - IRangeMap rangeMap) { + protected FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories) { this.rangeFields = rangeFields; this.comparatorFactories = comparatorFactories; - this.rangeMap = rangeMap; } + protected abstract RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) throws HyracksDataException; + @Override - public ITuplePartitionComputer createPartitioner() { + public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) { final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; for (int i = 0; i < 
comparatorFactories.length; ++i) { comparators[i] = comparatorFactories[i].createBinaryComparator(); } + return new ITuplePartitionComputer() { + private RangeMap rangeMap; + + @Override + public void initialize() throws HyracksDataException { + rangeMap = getRangeMap(hyracksTaskContext); + } + @Override - /** - * Determine the range partition. - */ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException { if (nParts == 1) { return 0; @@ -62,13 +67,10 @@ public class FieldRangePartitionComputerFactory implements ITuplePartitionComput return (int) Math.floor(slotIndex / rangesPerPart); } - /* - * Determine the range partition. - */ - public int getRangePartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException { + private int getRangePartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException { int slotIndex = 0; - for (int i = 0; i < rangeMap.getSplitCount(); ++i) { - int c = compareSlotAndFields(accessor, tIndex, i); + for (int slotNumber = 0; slotNumber < rangeMap.getSplitCount(); ++slotNumber) { + int c = compareSlotAndFields(accessor, tIndex, slotNumber); if (c < 0) { return slotIndex; } @@ -77,18 +79,18 @@ public class FieldRangePartitionComputerFactory implements ITuplePartitionComput return slotIndex; } - public int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int fieldIndex) + private int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int slotNumber) throws HyracksDataException { int c = 0; int startOffset = accessor.getTupleStartOffset(tIndex); int slotLength = accessor.getFieldSlotsLength(); - for (int f = 0; f < comparators.length; ++f) { - int fIdx = rangeFields[f]; + for (int fieldNum = 0; fieldNum < comparators.length; ++fieldNum) { + int fIdx = rangeFields[fieldNum]; int fStart = accessor.getFieldStartOffset(tIndex, fIdx); int fEnd = accessor.getFieldEndOffset(tIndex, fIdx); - c = 
comparators[f].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart, - fEnd - fStart, rangeMap.getByteArray(fieldIndex, f), rangeMap.getStartOffset(fieldIndex, f), - rangeMap.getLength(fieldIndex, f)); + c = comparators[fieldNum].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart, + fEnd - fStart, rangeMap.getByteArray(), rangeMap.getStartOffset(fieldNum, slotNumber), + rangeMap.getLength(fieldNum, slotNumber)); if (c != 0) { return c; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java deleted file mode 100644 index 5c5f34b..0000000 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
/**
 * <pre>
 * Holds the split values that carve a (possibly multi-field) data range into consecutive partitions.
 * All split values are packed into one byte array: the first split point's field values come first,
 * then the second split point's field values, and so on. For example with numFields = 3:
 *     split_point_idx0   split_point_idx1   split_point_idx2   split_point_idx3   split_point_idx4
 *     f0,f1,f2           f0,f1,f2           f0,f1,f2           f0,f1,f2           f0,f1,f2
 * Five split points yield six partitions:
 *     p1 | p2 | p3 | p4 | p5 | p6
 *       sp0  sp1  sp2  sp3  sp4
 * and endOffsets.length = 15 (one end offset per packed split value).
 * </pre>
 */
public class RangeMap implements Serializable {
    // number of partitioning fields per split point
    private final int numFields;
    // packed split values, laid out as described in the class javadoc
    private final byte[] bytes;
    // exclusive end offset (into bytes) of each packed split value
    private final int[] endOffsets;

    public RangeMap(int numFields, byte[] bytes, int[] endOffsets) {
        this.numFields = numFields;
        this.bytes = bytes;
        this.endOffsets = endOffsets;
    }

    /** @return how many split points this map holds */
    public int getSplitCount() {
        return endOffsets.length / numFields;
    }

    /** @return the backing byte array holding all packed split values (not a copy) */
    public byte[] getByteArray() {
        return bytes;
    }

    /** @return the type tag (first byte) of the given field within the given split point */
    public int getTag(int fieldIndex, int splitIndex) {
        return getSplitValueTag(getSplitValueIndex(fieldIndex, splitIndex));
    }

    /** @return the start offset, within {@code bytes}, of the given field of the given split point */
    public int getStartOffset(int fieldIndex, int splitIndex) {
        return getSplitValueStart(getSplitValueIndex(fieldIndex, splitIndex));
    }

    /** @return the byte length of the given field of the given split point */
    public int getLength(int fieldIndex, int splitIndex) {
        return getSplitValueLength(getSplitValueIndex(fieldIndex, splitIndex));
    }

    /**
     * Maps a (fieldIndex, splitIndex) pair to the flat index of that split value in
     * {@code endOffsets} / the packed layout of {@code bytes}.
     *
     * @param fieldIndex field within the split point, 0 &lt;= fieldIndex &lt; numFields
     * @param splitIndex zero-based split point number
     * @return the flat split-value index
     */
    private int getSplitValueIndex(int fieldIndex, int splitIndex) {
        return splitIndex * numFields + fieldIndex;
    }

    /** @return the type tag of the split value at the given flat index (its first byte) */
    private int getSplitValueTag(int splitValueIndex) {
        return bytes[getSplitValueStart(splitValueIndex)];
    }

    /** @return where the split value at the given flat index begins in {@code bytes} */
    private int getSplitValueStart(int splitValueIndex) {
        // a split value starts where its predecessor ended; the very first starts at 0
        return splitValueIndex == 0 ? 0 : endOffsets[splitValueIndex - 1];
    }

    /** @return the length in bytes of the split value at the given flat index */
    private int getSplitValueLength(int splitValueIndex) {
        // length = own end offset minus predecessor's end offset (0 for the first value)
        return splitValueIndex == 0 ? endOffsets[0] : endOffsets[splitValueIndex] - endOffsets[splitValueIndex - 1];
    }

    @Override
    public int hashCode() {
        return numFields + Arrays.hashCode(bytes) + Arrays.hashCode(endOffsets);
    }

    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (!(object instanceof RangeMap)) {
            return false;
        }
        RangeMap other = (RangeMap) object;
        return numFields == other.numFields && Arrays.equals(endOffsets, other.endOffsets)
                && Arrays.equals(bytes, other.bytes);
    }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.common.data.partition.range; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +public class StaticFieldRangePartitionComputerFactory extends FieldRangePartitionComputerFactory { + private static final long serialVersionUID = 1L; + private RangeMap rangeMap; + + public StaticFieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories, + RangeMap rangeMap) { + super(rangeFields, comparatorFactories); + this.rangeMap = rangeMap; + } + + @Override + protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) { + return rangeMap; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java index 7d97507..8b57b15 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java @@ -20,7 +20,14 @@ package org.apache.hyracks.dataflow.std.base; import java.util.BitSet; +import org.apache.hyracks.api.comm.IPartitionCollector; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; +import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader; +import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader; +import org.apache.hyracks.dataflow.std.collectors.PartitionCollector; public abstract class AbstractMToNConnectorDescriptor extends AbstractConnectorDescriptor { private static final long serialVersionUID = 1L; @@ -47,4 +54,15 @@ public abstract class AbstractMToNConnectorDescriptor extends AbstractConnectorD public boolean allProducersToAllConsumers() { return true; } + + @Override + public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index, + int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { + BitSet expectedPartitions = new BitSet(nProducerPartitions); + expectedPartitions.set(0, nProducerPartitions); + NonDeterministicChannelReader channelReader = + new NonDeterministicChannelReader(nProducerPartitions, expectedPartitions); + NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader); + return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader); + } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java index ab553f6..4c728ce 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java @@ -180,6 +180,7 @@ public abstract class AbstractReplicateOperatorDescriptor extends AbstractOperat @Override public void fail() throws HyracksDataException { + // TODO: shouldn't we fail the MaterializerTaskState state? 
HyracksDataException hde = null; for (int i = 0; i < numberOfNonMaterializedOutputs; i++) { if (isOpen[i]) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java new file mode 100644 index 0000000..c437619 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.dataflow.std.collectors; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hyracks.api.channels.IInputChannel; +import org.apache.hyracks.api.comm.IFrameReader; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.partitions.PartitionId; + +public class DeterministicPartitionBatchManager implements IPartitionBatchManager { + private final IFrameReader[] partitions; + private List<IFrameReader> partitionsList; + + public DeterministicPartitionBatchManager(int nSenders) { + this.partitions = new IFrameReader[nSenders]; + } + + @Override + public synchronized void addPartition(PartitionId partitionId, IInputChannel channel) { + InputChannelFrameReader channelReader = new InputChannelFrameReader(channel); + channel.registerMonitor(channelReader); + partitions[partitionId.getSenderIndex()] = channelReader; + if (allPartitionsAdded()) { + partitionsList = new ArrayList<>(Arrays.asList(partitions)); + notifyAll(); + } + } + + @Override + public synchronized void getNextBatch(List<IFrameReader> batch, int requestedSize) throws HyracksDataException { + while (!allPartitionsAdded()) { + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } + } + if (partitionsList.isEmpty()) { + return; + } + if (requestedSize >= partitionsList.size()) { + batch.addAll(partitionsList); + partitionsList.clear(); + } else { + List<IFrameReader> subBatch = partitionsList.subList(0, requestedSize); + batch.addAll(subBatch); + subBatch.clear(); + } + } + + private synchronized boolean allPartitionsAdded() { + for (int i = 0; i < partitions.length; i++) { + if (partitions[i] == null) { + return false; + } + } + return true; + } +} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java new file mode 100644 index 0000000..2646c94 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.dataflow.std.collectors; + +import java.util.LinkedList; + +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameReader; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class SequentialMergeFrameReader implements IFrameReader { + private final int numSenders; + private final IPartitionBatchManager partitionBatchManager; + private final LinkedList<IFrameReader> senders; + private boolean isOpen; + + public SequentialMergeFrameReader(int numSenders, IPartitionBatchManager partitionBatchManager) { + this.numSenders = numSenders; + this.partitionBatchManager = partitionBatchManager; + this.senders = new LinkedList<>(); + this.isOpen = false; + } + + @Override + public void open() throws HyracksDataException { + if (!isOpen) { + isOpen = true; + // get all the senders and open them one by one + partitionBatchManager.getNextBatch(senders, numSenders); + for (IFrameReader sender : senders) { + sender.open(); + } + } + } + + @Override + public boolean nextFrame(IFrame outFrame) throws HyracksDataException { + IFrameReader currentSender; + while (!senders.isEmpty()) { + // process the sender at the beginning of the sequence + currentSender = senders.getFirst(); + outFrame.reset(); + if (currentSender.nextFrame(outFrame)) { + return true; + } else { + // done with the current sender, close it, remove it from the Q and process the next one in sequence + currentSender.close(); + senders.removeFirst(); + } + } + // done with all senders + return false; + } + + @Override + public void close() throws HyracksDataException { + for (IFrameReader sender : senders) { + sender.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java index 920fdb8..0b6e40e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java @@ -61,7 +61,7 @@ public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMT public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { - return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(), + return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(ctx), nConsumerPartitions, localityMap, index); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java index 092b5f1..32618ee 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java 
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java @@ -19,19 +19,14 @@ package org.apache.hyracks.dataflow.std.connectors; import java.nio.ByteBuffer; -import java.util.BitSet; import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.IPartitionCollector; import org.apache.hyracks.api.comm.IPartitionWriterFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor; -import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader; -import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader; -import org.apache.hyracks.dataflow.std.collectors.PartitionCollector; public class MToNBroadcastConnectorDescriptor extends AbstractMToNConnectorDescriptor { @@ -123,15 +118,4 @@ public class MToNBroadcastConnectorDescriptor extends AbstractMToNConnectorDescr } }; } - - @Override - public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index, - int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { - BitSet expectedPartitions = new BitSet(nProducerPartitions); - expectedPartitions.set(0, nProducerPartitions); - NonDeterministicChannelReader channelReader = - new NonDeterministicChannelReader(nProducerPartitions, expectedPartitions); - NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader); - return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader); - } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java index 02fbedb..c11c08c 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java @@ -18,10 +18,7 @@ */ package org.apache.hyracks.dataflow.std.connectors; -import java.util.BitSet; - import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.IPartitionCollector; import org.apache.hyracks.api.comm.IPartitionWriterFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; @@ -29,9 +26,6 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor; -import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader; -import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader; -import org.apache.hyracks.dataflow.std.collectors.PartitionCollector; public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor { private static final long serialVersionUID = 1L; @@ -46,18 +40,7 @@ 
public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDe public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { - return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner()); - } - - @Override - public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index, - int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { - BitSet expectedPartitions = new BitSet(nProducerPartitions); - expectedPartitions.set(0, nProducerPartitions); - NonDeterministicChannelReader channelReader = - new NonDeterministicChannelReader(nProducerPartitions, expectedPartitions); - NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader); - return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader); + return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner(ctx)); } public ITuplePartitionComputerFactory getTuplePartitionComputerFactory() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java index 026ca5e..e0ec5d6 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java @@ -70,7 +70,7 @@ public class MToNPartitioningMergingConnectorDescriptor extends AbstractMToNConn IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { final PartitionDataWriter hashWriter = - new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner()); + new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner(ctx)); return hashWriter; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java index f6996f1..af3ce06 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java @@ -45,6 +45,6 @@ public class MToNPartitioningWithMessageConnectorDescriptor extends MToNPartitio IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) throws 
HyracksDataException { return new PartitionWithMessageDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, - tpcf.createPartitioner()); + tpcf.createPartitioner(ctx)); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToOneSequentialMergingConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToOneSequentialMergingConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToOneSequentialMergingConnectorDescriptor.java new file mode 100644 index 0000000..3decb69 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToOneSequentialMergingConnectorDescriptor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.dataflow.std.connectors; + +import java.util.BitSet; + +import org.apache.hyracks.api.comm.IFrameReader; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.IPartitionCollector; +import org.apache.hyracks.api.comm.IPartitionWriterFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; +import org.apache.hyracks.dataflow.common.data.partition.OnePartitionComputerFactory; +import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor; +import org.apache.hyracks.dataflow.std.collectors.DeterministicPartitionBatchManager; +import org.apache.hyracks.dataflow.std.collectors.IPartitionBatchManager; +import org.apache.hyracks.dataflow.std.collectors.PartitionCollector; +import org.apache.hyracks.dataflow.std.collectors.SequentialMergeFrameReader; + +public class MToOneSequentialMergingConnectorDescriptor extends AbstractMToNConnectorDescriptor { + private static final long serialVersionUID = 1L; + private final ITuplePartitionComputerFactory tpcf; + + public MToOneSequentialMergingConnectorDescriptor(IConnectorDescriptorRegistry spec) { + super(spec); + tpcf = new OnePartitionComputerFactory(); + } + + @Override + public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, + IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) + throws HyracksDataException { + // TODO(ali): create a single partition data writer instead + return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner(ctx)); + } + + @Override + public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, 
RecordDescriptor recordDesc, int index, + int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { + IPartitionBatchManager pbm = new DeterministicPartitionBatchManager(nProducerPartitions); + IFrameReader sequentialMergeReader = new SequentialMergeFrameReader(nProducerPartitions, pbm); + BitSet expectedPartitions = new BitSet(); + expectedPartitions.set(0, nProducerPartitions); + return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, sequentialMergeReader, pbm); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java index 5e33275..d06d5d3 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java @@ -113,6 +113,7 @@ public class PartitionDataWriter implements IFrameWriter { @Override public void open() throws HyracksDataException { + tpc.initialize(); for (int i = 0; i < pWriters.length; ++i) { isOpen[i] = true; pWriters[i].open(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java index dc250e6..034b054 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java @@ -190,7 +190,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition)); private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(rd1); private final ITuplePartitionComputer hpcBuild = - new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(); + new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx); private final FrameTupleAppender appender = new FrameTupleAppender(); private final FrameTupleAppender ftappender = new FrameTupleAppender(); private IFrame[] bufferForPartitions; @@ -303,9 +303,9 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor } ITuplePartitionComputer hpc0 = - new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner(); + new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner(ctx); ITuplePartitionComputer hpc1 = - new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(); + new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx); int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor); ISerializableTable table = new SimpleSerializableHashTable(tableSize, ctx); state.joiner = @@ -385,7 +385,7 @@ public class 
HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories); private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories); - private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner(); + private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner(ctx); private final FrameTupleAppender appender = new FrameTupleAppender(); private final FrameTupleAppender ftap = new FrameTupleAppender(); @@ -476,9 +476,9 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor state.joiner.releaseMemory(); } ITuplePartitionComputer hpcRep0 = - new RepartitionComputerFactory(state.nPartitions, hpcf0).createPartitioner(); + new RepartitionComputerFactory(state.nPartitions, hpcf0).createPartitioner(ctx); ITuplePartitionComputer hpcRep1 = - new RepartitionComputerFactory(state.nPartitions, hpcf1).createPartitioner(); + new RepartitionComputerFactory(state.nPartitions, hpcf1).createPartitioner(ctx); if (state.memoryForHashtable != memsize - 2) { for (int i = 0; i < state.nPartitions; i++) { ByteBuffer buf = bufferForPartitions[i].getBuffer(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java index 3873bae..a5c17f2 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java +++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java @@ -182,9 +182,9 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript @Override public void open() throws HyracksDataException { ITuplePartitionComputer hpc0 = - new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner(); + new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner(ctx); ITuplePartitionComputer hpc1 = - new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(); + new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx); state = new HashBuildTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition)); ISerializableTable table = new SerializableHashTable(tableSize, ctx, bufferManager);
