http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java new file mode 100644 index 0000000..1d8a55d --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/PushValueIntoDatascanRule.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.vxquery.compiler.rewriter.rules; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.vxquery.compiler.rewriter.rules.util.ExpressionToolbox; +import org.apache.vxquery.functions.BuiltinFunctions; +import org.apache.vxquery.functions.BuiltinOperators; +import org.apache.vxquery.metadata.IVXQueryDataSource; +import org.apache.vxquery.metadata.VXQueryCollectionDataSource; + +/** + * The rule searches for an assign operator immediately following a data scan + * operator. + * + * <pre> + * Before + * + * plan__parent + * ASSIGN( $v2 : value( $v1, constant) ) + * DATASCAN( $source : $v1 ) + * plan__child + * + * Where $v1 is not used in plan__parent. + * + * After + * + * plan__parent + * ASSIGN( $v2 : $v1 ) + * DATASCAN( $source : $v1 ) + * plan__child + * + * $source is encoded with the value parameters. + * </pre> + */ + +public class PushValueIntoDatascanRule extends AbstractPushExpressionIntoDatascanRule { + + @Override + boolean updateDataSource(IVXQueryDataSource datasource, Mutable<ILogicalExpression> expression) { + VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) datasource; + boolean added = false; + List<Mutable<ILogicalExpression>> finds = new ArrayList<Mutable<ILogicalExpression>>(); + ILogicalExpression le = expression.getValue(); + if (le.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) { + AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) le; + if (afce.getFunctionIdentifier().equals(BuiltinFunctions.FN_ZERO_OR_ONE_1.getFunctionIdentifier())) { + return false; + } + } + ExpressionToolbox.findAllFunctionExpressions(expression, BuiltinOperators.VALUE.getFunctionIdentifier(), finds); + + for (int i = finds.size(); i > 0; --i) { + Byte[] value = null; + List<ILogicalExpression> values = ExpressionToolbox.getFullArguments(finds.get(i - 1)); + if (values.size() > 1) { + value = ExpressionToolbox.getConstantArgument(finds.get(i - 1), 1); + ds.addValueSeq(value); + added = true; + } + } + + return added; + } + + @Override + LogicalOperatorTag getOperator() { + return LogicalOperatorTag.ASSIGN; + } + +}
http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java index 69940ad..d86de98 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/RemoveUnusedSortDistinctNodesRule.java @@ -41,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; @@ -94,12 +95,14 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule { @Override - public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { + public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) + throws AlgebricksException { return false; } @Override - public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) { + public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) + throws AlgebricksException { boolean operatorChanged = false; // Do not process empty or nested tuple source. AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); @@ -151,7 +154,9 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule } } } - + if (operatorChanged) { + context.computeAndSetTypeEnvironmentForOperator(op); + } // Now with the new operator, update the variable mappings. cardinalityVariable = CardinalityRuleToolbox.updateCardinalityVariable(op, cardinalityVariable, vxqueryContext); updateVariableMap(op, cardinalityVariable, documentOrderVariables, uniqueNodesVariables, vxqueryContext); @@ -178,8 +183,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule return 0; } AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression; - if (!functionCall.getFunctionIdentifier().equals( - BuiltinOperators.SORT_DISTINCT_NODES_ASC_OR_ATOMICS.getFunctionIdentifier())) { + if (!functionCall.getFunctionIdentifier() + .equals(BuiltinOperators.SORT_DISTINCT_NODES_ASC_OR_ATOMICS.getFunctionIdentifier())) { return 0; } @@ -314,7 +319,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule * @param uniqueNodesVariables * @param uniqueNodes */ - private void resetUniqueNodesVariables(HashMap<Integer, UniqueNodes> uniqueNodesVariables, UniqueNodes uniqueNodes) { + private void resetUniqueNodesVariables(HashMap<Integer, UniqueNodes> uniqueNodesVariables, + UniqueNodes uniqueNodes) { for (Entry<Integer, UniqueNodes> entry : uniqueNodesVariables.entrySet()) { uniqueNodesVariables.put(entry.getKey(), uniqueNodes); } @@ -349,8 +355,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule case ASSIGN: AssignOperator assign = (AssignOperator) op; for (int index = 0; index < assign.getExpressions().size(); index++) { - ILogicalExpression assignLogicalExpression = (ILogicalExpression) assign.getExpressions() - .get(index).getValue(); + ILogicalExpression assignLogicalExpression = (ILogicalExpression) assign.getExpressions().get(index) + .getValue(); variableId = assign.getVariables().get(index).getId(); documentOrder = propagateDocumentOrder(assignLogicalExpression, documentOrderVariablesForOperator); uniqueNodes = propagateUniqueNodes(assignLogicalExpression, uniqueNodesVariablesForOperator); @@ -384,8 +390,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule // Find the last operator to set a variable and call this function again. SubplanOperator subplan = (SubplanOperator) op; for (int index = 0; index < subplan.getNestedPlans().size(); index++) { - AbstractLogicalOperator lastOperator = (AbstractLogicalOperator) subplan.getNestedPlans() - .get(index).getRoots().get(0).getValue(); + AbstractLogicalOperator lastOperator = (AbstractLogicalOperator) subplan.getNestedPlans().get(index) + .getRoots().get(0).getValue(); updateVariableMap(lastOperator, cardinalityVariable, documentOrderVariables, uniqueNodesVariables, vxqueryContext); } @@ -395,8 +401,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule UnnestOperator unnest = (UnnestOperator) op; ILogicalExpression unnestLogicalExpression = (ILogicalExpression) unnest.getExpressionRef().getValue(); variableId = unnest.getVariables().get(0).getId(); - Cardinality inputCardinality = vxqueryContext.getCardinalityOperatorMap(op.getInputs().get(0) - .getValue()); + Cardinality inputCardinality = vxqueryContext + .getCardinalityOperatorMap(op.getInputs().get(0).getValue()); documentOrder = propagateDocumentOrder(unnestLogicalExpression, documentOrderVariablesForOperator); uniqueNodes = propagateUniqueNodes(unnestLogicalExpression, uniqueNodesVariablesForOperator); @@ -425,10 +431,12 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule break; // The following operators do not change or add to the variable map. + case DATASOURCESCAN: case DISTRIBUTE_RESULT: case EMPTYTUPLESOURCE: case EXCHANGE: + case GROUP: case NESTEDTUPLESOURCE: case PROJECT: case SELECT: @@ -438,8 +446,8 @@ public class RemoveUnusedSortDistinctNodesRule implements IAlgebraicRewriteRule // The following operators' analysis has not yet been implemented. default: - throw new RuntimeException("Operator (" + op.getOperatorTag() - + ") has not been implemented in rewrite rule."); + throw new RuntimeException( + "Operator (" + op.getOperatorTag() + ") has not been implemented in rewrite rule."); } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java index 82be94c..3fb2696 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/SetVariableIdContextRule.java @@ -28,11 +28,13 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; /** * Set the default context for the variable id in the optimization context. + * * @author prestonc */ public class SetVariableIdContextRule implements IAlgebraicRewriteRule { @Override - public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { + public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) + throws AlgebricksException { if (context.checkIfInDontApplySet(this, opRef.getValue())) { return false; } @@ -44,7 +46,9 @@ public class SetVariableIdContextRule implements IAlgebraicRewriteRule { case ASSIGN: case AGGREGATE: AbstractAssignOperator assign = (AbstractAssignOperator) op; - variableId = assign.getVariables().get(0).getId(); + if (assign.getVariables().size() > 0) { + variableId = assign.getVariables().get(0).getId(); + } break; case UNNEST: UnnestOperator unnest = (UnnestOperator) op; @@ -62,7 +66,8 @@ public class SetVariableIdContextRule implements IAlgebraicRewriteRule { } @Override - public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { + public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) + throws AlgebricksException { return false; } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java index 0dd3b31..50bc07e 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/ExpressionToolbox.java @@ -16,8 +16,10 @@ */ package org.apache.vxquery.compiler.rewriter.rules.util; +import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.mutable.Mutable; import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue; import org.apache.vxquery.context.StaticContext; @@ -211,6 +213,27 @@ public class ExpressionToolbox { return pTypeCode.getInteger(); } + public static Byte[] getConstantArgument(Mutable<ILogicalExpression> searchM, int arg) { + AbstractFunctionCallExpression searchFunction = (AbstractFunctionCallExpression) searchM.getValue(); + ILogicalExpression argType = searchFunction.getArguments().get(arg).getValue(); + searchFunction.getArguments().size(); + if (argType.getExpressionTag() != LogicalExpressionTag.CONSTANT) { + return null; + } + TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); + ExpressionToolbox.getConstantAsPointable((ConstantExpression) argType, tvp); + return ArrayUtils.toObject(tvp.getByteArray()); + } + + public static List<ILogicalExpression> getFullArguments(Mutable<ILogicalExpression> searchM) { + AbstractFunctionCallExpression searchFunction = (AbstractFunctionCallExpression) searchM.getValue(); + ArrayList<ILogicalExpression> args = new ArrayList<ILogicalExpression>(); + for (int i = 0; i < searchFunction.getArguments().size(); i++) { + args.add(searchFunction.getArguments().get(i).getValue()); + } + return args; + } + public static SequenceType getTypeExpressionTypeArgument(Mutable<ILogicalExpression> searchM, StaticContext dCtx) { int typeId = getTypeExpressionTypeArgument(searchM); if (typeId > 0) { http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java index 2c57c32..94040d2 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/util/OperatorToolbox.java @@ -20,7 +20,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.commons.lang3.mutable.Mutable; - +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java b/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java index a3f7bf8..886229d 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/exceptions/SystemException.java @@ -51,6 +51,11 @@ public class SystemException extends HyracksDataException { super(message(code, loc)); this.code = code; } + + public SystemException(ErrorCode code, SourceLocation loc, Throwable cause) { + super(message(code, loc), cause); + this.code = code; + } public ErrorCode getCode() { return code; http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml index 0b03c34..ec27864 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml +++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml @@ -660,6 +660,7 @@ <param name="expr" type="json-item()"/> <return type="xs:string*"/> <runtime type="scalar" class="org.apache.vxquery.runtime.functions.json.KeysOrMembersScalarEvaluatorFactory"/> + <runtime type="unnesting" class="org.apache.vxquery.runtime.functions.json.KeysOrMembersUnnestingEvaluatorFactory"/> </operator> <!-- opext:subtract($arg1 as xs:anyAtomicType?, $arg2 as xs:anyAtomicType?) as xs:anyAtomicType? --> http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java index ad8db4e..edf8dbc 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/jsonparser/JSONParser.java @@ -1,43 +1,54 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* http://www.apache.org/licenses/LICENSE-2.0 +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ package org.apache.vxquery.jsonparser; +import java.io.ByteArrayOutputStream; import java.io.DataOutput; import java.io.IOException; import java.io.Reader; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.htrace.fasterxml.jackson.core.JsonFactory; +import org.apache.htrace.fasterxml.jackson.core.JsonParser; +import org.apache.htrace.fasterxml.jackson.core.JsonToken; +import org.apache.hyracks.api.comm.IFrameFieldAppender; +import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.primitive.BooleanPointable; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; import org.apache.vxquery.datamodel.builders.atomic.StringValueBuilder; import org.apache.vxquery.datamodel.builders.jsonitem.ArrayBuilder; import org.apache.vxquery.datamodel.builders.jsonitem.ObjectBuilder; import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder; import org.apache.vxquery.datamodel.values.ValueTag; +import org.apache.vxquery.datamodel.values.XDMConstants; import org.apache.vxquery.xmlparser.IParser; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; - public class JSONParser implements IParser { - final JsonFactory factory; + final List<Byte[]> valueSeq; protected final ArrayBackedValueStorage atomic; + private TaggedValuePointable tvp; + private BooleanPointable bp; protected final List<ArrayBuilder> abStack; protected final List<ObjectBuilder> obStack; protected final List<ArrayBackedValueStorage> abvsStack; @@ -48,6 +59,16 @@ public class JSONParser implements IParser { protected final DataOutput out; protected itemType checkItem; protected int levelArray, levelObject; + protected final List<Byte[]> allKeys; + protected ByteArrayOutputStream outputStream, prefixStream, pathStream; + protected int objectMatchLevel; + protected int arrayMatchLevel; + protected boolean matched, literal; + protected ArrayBackedValueStorage tempABVS; + protected List<Integer> arrayCounters; + protected List<Boolean> keysOrMembers; + protected IFrameWriter writer; + protected IFrameFieldAppender appender; enum itemType { ARRAY, @@ -57,8 +78,14 @@ public class JSONParser implements IParser { protected final List<itemType> itemStack; public JSONParser() { + this(null); + } + + public JSONParser(List<Byte[]> valueSeq) { factory = new JsonFactory(); + this.valueSeq = valueSeq; atomic = new ArrayBackedValueStorage(); + tvp = new TaggedValuePointable(); abStack = new ArrayList<ArrayBuilder>(); obStack = new ArrayList<ObjectBuilder>(); abvsStack = new ArrayList<ArrayBackedValueStorage>(); @@ -67,9 +94,59 @@ public class JSONParser implements IParser { itemStack = new ArrayList<itemType>(); svb = new StringValueBuilder(); sb = new SequenceBuilder(); + bp = new BooleanPointable(); + allKeys = new ArrayList<Byte[]>(); abvsStack.add(atomic); out = abvsStack.get(abvsStack.size() - 1).getDataOutput(); + tempABVS = new ArrayBackedValueStorage(); + this.objectMatchLevel = 1; + this.arrayMatchLevel = 0; + matched = false; + literal = false; + arrayCounters = new ArrayList<Integer>(); + outputStream = new ByteArrayOutputStream(); + prefixStream = new ByteArrayOutputStream(); + pathStream = new ByteArrayOutputStream(); + this.keysOrMembers = new ArrayList<Boolean>(); + outputStream.reset(); + pathStream.reset(); + if (valueSeq != null) { + for (int i = 0; i < this.valueSeq.size(); i++) { + tvp.set(ArrayUtils.toPrimitive(valueSeq.get(i)), 0, ArrayUtils.toPrimitive(valueSeq.get(i)).length); + //access an item of an array + if (tvp.getTag() == ValueTag.XS_INTEGER_TAG) { + pathStream.write(tvp.getByteArray(), 0, tvp.getLength()); + this.arrayMatchLevel++; + this.keysOrMembers.add(Boolean.valueOf(true)); + //access all the items of an array or + //all the keys of an object + } else if (tvp.getTag() == ValueTag.XS_BOOLEAN_TAG) { + pathStream.write(tvp.getByteArray(), 0, tvp.getLength()); + this.arrayMatchLevel++; + this.keysOrMembers.add(Boolean.valueOf(false)); + //access an object + } else { + pathStream.write(tvp.getByteArray(), 1, tvp.getLength() - 1); + } + } + } + } + Byte[] toBytes(Integer v) { + Byte[] barr = ArrayUtils.toObject(ByteBuffer.allocate(9).putLong(1, v).array()); + barr[0] = ValueTag.XS_INTEGER_TAG; + return barr; + } + + public int parse(Reader input, ArrayBackedValueStorage result, IFrameWriter writer, IFrameFieldAppender appender) + throws HyracksDataException { + this.writer = writer; + this.appender = appender; + if (this.valueSeq != null) { + return parseElements(input, result); + } else { + return parse(input, result); + } } public int parse(Reader input, ArrayBackedValueStorage result) throws HyracksDataException { @@ -79,7 +156,6 @@ public class JSONParser implements IParser { JsonParser parser = factory.createParser(input); JsonToken token = parser.nextToken(); checkItem = null; - levelArray = 0; levelObject = 0; sb.reset(result); @@ -89,47 +165,22 @@ public class JSONParser implements IParser { } switch (token) { case START_ARRAY: - levelArray++; - if (levelArray > abStack.size()) { - abStack.add(new ArrayBuilder()); - } - if (levelArray + levelObject > abvsStack.size() - 1) { - abvsStack.add(new ArrayBackedValueStorage()); - } - itemStack.add(itemType.ARRAY); - abvsStack.get(levelArray + levelObject).reset(); - abStack.get(levelArray - 1).reset(abvsStack.get(levelArray + levelObject)); + startArray(); break; case START_OBJECT: - levelObject++; - if (levelObject > obStack.size()) { - obStack.add(new ObjectBuilder()); - } - if (levelArray + levelObject > abvsStack.size() - 1) { - abvsStack.add(new ArrayBackedValueStorage()); - } - itemStack.add(itemType.OBJECT); - abvsStack.get(levelArray + levelObject).reset(); - obStack.get(levelObject - 1).reset(abvsStack.get(levelArray + levelObject)); + startObject(); break; case FIELD_NAME: - if (levelObject > spStack.size()) { - keyStack.add(new ArrayBackedValueStorage()); - spStack.add(new UTF8StringPointable()); - } - keyStack.get(levelObject - 1).reset(); - DataOutput outk = keyStack.get(levelObject - 1).getDataOutput(); - svb.write(parser.getText(), outk); - spStack.get(levelObject - 1).set(keyStack.get(levelObject - 1)); + startFieldName(parser); break; case VALUE_NUMBER_INT: - atomicValues(ValueTag.XS_INTEGER_TAG, parser, out, svb, levelArray, levelObject); + startAtomicValues(ValueTag.XS_INTEGER_TAG, parser); break; case VALUE_STRING: - atomicValues(ValueTag.XS_STRING_TAG, parser, out, svb, levelArray, levelObject); + startAtomicValues(ValueTag.XS_STRING_TAG, parser); break; case VALUE_NUMBER_FLOAT: - atomicValues(ValueTag.XS_DOUBLE_TAG, parser, out, svb, levelArray, levelObject); + startAtomicValues(ValueTag.XS_DOUBLE_TAG, parser); break; case END_ARRAY: abStack.get(levelArray - 1).finish(); @@ -173,11 +224,160 @@ public class JSONParser implements IParser { sb.finish(); outResult.write(result.getByteArray()); } catch (Exception e) { - throw new HyracksDataException(e.toString()); + throw new HyracksDataException("Accessing or writing in out of bounds space", e); + } + return items; + } + + public int parseElements(Reader input, ArrayBackedValueStorage result) throws HyracksDataException { + int items = 0; + try { + JsonParser parser = factory.createParser(input); + JsonToken token = parser.nextToken(); + checkItem = null; + + this.objectMatchLevel = 0; + this.matched = false; + + levelArray = 0; + levelObject = 0; + sb.reset(result); + while (token != null) { + if (itemStack.size() > 1) { + checkItem = itemStack.get(itemStack.size() - 2); + } + switch (token) { + case START_ARRAY: + startArray(); + break; + case START_OBJECT: + startObject(); + break; + case FIELD_NAME: + startFieldName(parser); + break; + case VALUE_NUMBER_INT: + startAtomicValues(ValueTag.XS_INTEGER_TAG, parser); + break; + case VALUE_STRING: + startAtomicValues(ValueTag.XS_STRING_TAG, parser); + break; + case VALUE_NUMBER_FLOAT: + startAtomicValues(ValueTag.XS_DOUBLE_TAG, parser); + break; + case END_ARRAY: + //if the query doesn't ask for an atomic value + if (!this.literal && this.pathMatch()) { + //check if the path asked from the query includes the current path + abStack.get(levelArray - 1).finish(); + if (itemStack.size() > 1) { + if (checkItem == itemType.ARRAY) { + if (levelArray > this.arrayMatchLevel + 1) { + abStack.get(levelArray - 2).addItem(abvsStack.get(levelArray + levelObject)); + } else if (this.matched) { + this.matched = false; + items++; + writeElement(abvsStack.get(levelArray + levelObject)); + } + } else if (checkItem == itemType.OBJECT) { + if (levelArray > this.arrayMatchLevel && !this.matched) { + obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), + abvsStack.get(levelArray + levelObject)); + } else if (this.matched) { + writeElement(abvsStack.get(levelArray + levelObject)); + this.matched = false; + items++; + } + } + } + } + if (allKeys.size() - 1 >= 0) { + allKeys.remove(allKeys.size() - 1); + } + this.arrayCounters.remove(levelArray - 1); + itemStack.remove(itemStack.size() - 1); + levelArray--; + break; + case END_OBJECT: + //if the query doesn't ask for an atomic value + if (!this.literal && this.pathMatch()) { + //check if the path asked from the query includes the current path + obStack.get(levelObject - 1).finish(); + if (itemStack.size() > 1) { + if (checkItem == itemType.OBJECT) { + if (levelObject > this.objectMatchLevel) { + obStack.get(levelObject - 2).addItem(spStack.get(levelObject - 2), + abvsStack.get(levelArray + levelObject)); + } else if (this.matched) { + this.matched = false; + items++; + writeElement(abvsStack.get(levelArray + levelObject)); + } + } else if (checkItem == itemType.ARRAY) { + abStack.get(levelArray - 1).addItem(abvsStack.get(levelArray + levelObject)); + if (this.matched) { + writeElement(abvsStack.get(levelArray + levelObject)); + this.matched = false; + } + } + } + } + if (allKeys.size() - 1 >= 0) { + allKeys.remove(allKeys.size() - 1); + } + itemStack.remove(itemStack.size() - 1); + levelObject--; + break; + default: + break; + } + token = parser.nextToken(); + } + sb.finish(); + } catch (Exception e) { + throw new HyracksDataException("Accessing or writing in out of bounds space", e); } return items; } + private boolean pathMatch() { + outputStream.reset(); + for (Byte[] bb : allKeys) { + outputStream.write(ArrayUtils.toPrimitive(bb), 0, ArrayUtils.toPrimitive(bb).length); + } + //the path of values created by parsing the file + boolean contains = false; + this.matched = false; + prefixStream.reset(); + if (pathStream.size() < outputStream.size()) { + prefixStream.write(outputStream.toByteArray(), 0, pathStream.size()); + contains = Arrays.equals(prefixStream.toByteArray(), pathStream.toByteArray()); + } else { + prefixStream.write(pathStream.toByteArray(), 0, outputStream.size()); + contains = Arrays.equals(prefixStream.toByteArray(), outputStream.toByteArray()); + } + if (pathStream.size() == outputStream.size() && contains) { + this.objectMatchLevel = this.levelObject; + this.matched = true; + this.literal = false; + } + return contains; + } + + public void itemsInArray() { + if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY && !this.arrayCounters.isEmpty()) { + boolean addCounter = levelArray - 1 < this.keysOrMembers.size() ? this.keysOrMembers.get(levelArray - 1) + : true; + if (addCounter) { + this.arrayCounters.set(levelArray - 1, this.arrayCounters.get(levelArray - 1) + 1); + this.allKeys.add(this.toBytes(this.arrayCounters.get(levelArray - 1))); + } else { + Byte[] bool = { (byte) 0x2B, 0x01 }; + this.allKeys.add(bool); + } + } + } + public void atomicValues(int tag, JsonParser parser, DataOutput out, StringValueBuilder svb, int levelArray, int levelObject) throws IOException { abvsStack.get(0).reset(); @@ -189,12 +389,141 @@ public class JSONParser implements IParser { } else if (tag == ValueTag.XS_INTEGER_TAG) { out.writeLong(parser.getLongValue()); } - if (itemStack.size() != 0) { + if (!itemStack.isEmpty()) { if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY) { abStack.get(levelArray - 1).addItem(abvsStack.get(0)); + if (valueSeq != null && this.matched && levelArray == this.arrayMatchLevel) { + this.literal = true; + this.matched = false; + writeElement(abvsStack.get(0)); + } } else if (itemStack.get(itemStack.size() - 1) == itemType.OBJECT) { obStack.get(levelObject - 1).addItem(spStack.get(levelObject - 1), abvsStack.get(0)); + if (valueSeq != null && this.matched && levelObject == this.objectMatchLevel) { + this.literal = true; + this.matched = false; + writeElement(abvsStack.get(0)); + } + } + } + } + + public void writeElement(ArrayBackedValueStorage abvs) throws IOException { + tempABVS.reset(); + DataOutput out = tempABVS.getDataOutput(); + out.write(abvs.getByteArray(), abvs.getStartOffset(), abvs.getLength()); + FrameUtils.appendFieldToWriter(writer, appender, tempABVS.getByteArray(), tempABVS.getStartOffset(), + tempABVS.getLength()); + } + + public void startArrayOrObjects(int count) { + if (valueSeq != null && !this.arrayCounters.isEmpty()) { + boolean addCounter = levelArray - count < this.keysOrMembers.size() + ? this.keysOrMembers.get(levelArray - count) : true; + if (itemStack.get(itemStack.size() - 1) == itemType.ARRAY) { + if (addCounter) { + this.arrayCounters.set(levelArray - count, this.arrayCounters.get(levelArray - count) + 1); + this.allKeys.add(this.toBytes(this.arrayCounters.get(levelArray - count))); + } else { + XDMConstants.setTrue(bp); + this.allKeys.add(ArrayUtils.toObject(bp.getByteArray())); + } + } + + } + if (count == 2 && valueSeq != null) { + this.arrayCounters.add(Integer.valueOf(0)); + } + } + + public void startArray() throws HyracksDataException { + levelArray++; + if (levelArray > abStack.size()) { + abStack.add(new ArrayBuilder()); + } + if (levelArray + levelObject > abvsStack.size() - 1) { + abvsStack.add(new ArrayBackedValueStorage()); + } + startArrayOrObjects(2); + itemStack.add(itemType.ARRAY); + if (this.pathMatch() || this.valueSeq == null) { + abvsStack.get(levelArray + levelObject).reset(); + try { + abStack.get(levelArray - 1).reset(abvsStack.get(levelArray + levelObject)); + } catch (Exception e) { + throw new HyracksDataException("Accessing index out of bounds", e); + } + } + } + + public void startObject() throws HyracksDataException { + levelObject++; + if (levelObject > obStack.size()) { + obStack.add(new ObjectBuilder()); + } + if (levelArray + levelObject > abvsStack.size() - 1) { + abvsStack.add(new ArrayBackedValueStorage()); + } + startArrayOrObjects(1); + itemStack.add(itemType.OBJECT); + if (this.pathMatch() || this.valueSeq == null) { + abvsStack.get(levelArray + levelObject).reset(); + try { + obStack.get(levelObject - 1).reset(abvsStack.get(levelArray + levelObject)); + } catch (Exception e) { + throw new HyracksDataException("Accessing index out of bounds", e); } } } + + public void startFieldName(JsonParser parser) throws HyracksDataException { + if (levelObject > spStack.size()) { + keyStack.add(new ArrayBackedValueStorage()); + spStack.add(new UTF8StringPointable()); + } + keyStack.get(levelObject - 1).reset(); + DataOutput outk = keyStack.get(levelObject - 1).getDataOutput(); + try { + svb.write(parser.getText(), outk); + spStack.get(levelObject - 1).set(keyStack.get(levelObject - 1)); + if (this.valueSeq != null) { + int length = 0; + byte[] barr = spStack.get(levelObject - 1).getByteArray(); + outputStream.reset(); + outputStream.write(barr, 0, spStack.get(levelObject - 1).getLength()); + allKeys.add(ArrayUtils.toObject(outputStream.toByteArray())); + for (int i = 0; i < allKeys.size() - 1; i++) { + tvp.set(ArrayUtils.toPrimitive(allKeys.get(i)), 0, ArrayUtils.toPrimitive(allKeys.get(i)).length); + length += ArrayUtils.toPrimitive(allKeys.get(i)).length; + } + //if the next two bytes represent a boolean (boolean has only two bytes), + //it means that query asks for all the keys of the object + if (length <= pathStream.size() && (length + 2) <= pathStream.size()) { + tvp.set(pathStream.toByteArray(), length, length + 2); + if (tvp.getTag() == ValueTag.XS_BOOLEAN_TAG) { + abvsStack.get(0).reset(); + out.write(ValueTag.XS_STRING_TAG); + svb.write(parser.getText(), out); + writeElement(abvsStack.get(0)); + } + } + } + } catch (Exception e) { + throw new HyracksDataException("Writing in out of bounds space", e); + } + } + + public void startAtomicValues(int tag, JsonParser parser) throws HyracksDataException { + itemsInArray(); + if (this.pathMatch() || this.valueSeq == null) { + try { + atomicValues(tag, parser, out, svb, levelArray, levelObject); + } catch (Exception e) { + throw new HyracksDataException(e); + } + } + if (allKeys.size() - 1 >= 0) { + allKeys.remove(allKeys.size() - 1); + } + } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java index df6fb4b..2459944 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/AbstractVXQueryDataSource.java @@ -18,37 +18,102 @@ package org.apache.vxquery.metadata; import java.util.List; -import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider; +import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; -public abstract class AbstractVXQueryDataSource implements IDataSource<String> { +public abstract class AbstractVXQueryDataSource implements IVXQueryDataSource { protected static final String DELIMITER = "\\|"; protected int dataSourceId; protected String collectionName; protected String[] collectionPartitions; - protected String elementPath; + protected List<Integer> childSeq; + protected List<Byte[]> valueSeq; protected int totalDataSources; protected String tag; - protected String function; protected Object[] types; protected IDataSourcePropertiesProvider propProvider; - - public abstract String getFunctionCall(); + + @Override + public INodeDomain getDomain() { + return null; + } @Override public boolean isScanAccessPathALeaf() { - // TODO Auto-generated method stub return false; } + public int getTotalDataSources() { + return totalDataSources; + } + + public void setTotalDataSources(int totalDataSources) { + this.totalDataSources = totalDataSources; + } + + public int getDataSourceId() { + return dataSourceId; + } + + public int getPartitionCount() { + return collectionPartitions.length; + } + + public String getTag() { + return this.tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + @Override - public INodeDomain getDomain() { - // TODO Auto-generated method stub - return null; + public String getId() { + return collectionName; + } + + @Override + public Object[] getSchemaTypes() { + return types; + } + + @Override + public IDataSourcePropertiesProvider getPropertiesProvider() { + return propProvider; + } + + @Override + public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) { + } + + public void addChildSeq(int integer) { + childSeq.add(integer); + } + + public List<Integer> getChildSeq() { + return childSeq; + } + + public void addValueSeq(Byte[] value) { + valueSeq.add(value); + } + + public List<Byte[]> getValueSeq() { + return valueSeq; + } + + public String[] getPartitions() { + return collectionPartitions; + } + + public void setPartitions(String[] collectionPartitions) { + this.collectionPartitions = collectionPartitions; } + abstract public boolean usingIndex(); } http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/IVXQueryDataSource.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/IVXQueryDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/IVXQueryDataSource.java new file mode 100644 index 0000000..8e71339 --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/IVXQueryDataSource.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.vxquery.metadata; + +import java.util.List; + +import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource; + +public interface IVXQueryDataSource extends IDataSource<String> { + boolean usingIndex(); + + void addChildSeq(int integer); + + List<Integer> getChildSeq(); +} + http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java index bee7c7b..c5761c5 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider; -import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; @@ -29,95 +28,38 @@ import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertie import org.apache.vxquery.compiler.rewriter.rules.CollectionFileDomain; public class VXQueryCollectionDataSource extends AbstractVXQueryDataSource { - - private VXQueryCollectionDataSource(int id, String file, Object[] types) { + private VXQueryCollectionDataSource(int id, String collection, Object[] types) { this.dataSourceId = id; - this.collectionName = file; - collectionPartitions = collectionName.split(DELIMITER); + this.collectionName = collection; + this.collectionPartitions = collectionName.split(DELIMITER); this.types = types; + final IPhysicalPropertiesVector vec = new StructuralPropertiesVector( new RandomPartitioningProperty(new CollectionFileDomain(collectionName)), new ArrayList<ILocalStructuralProperty>()); - propProvider = new IDataSourcePropertiesProvider() { + this.propProvider = new IDataSourcePropertiesProvider() { @Override public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) { return vec; } }; - this.childSeq = new ArrayList<>(); this.tag = null; + this.childSeq = new ArrayList<>(); + this.valueSeq = new ArrayList<>(); } - public static VXQueryCollectionDataSource create(int id, String file, Object type) { - return new VXQueryCollectionDataSource(id, file, new Object[] { type }); - } - - public int getTotalDataSources() { - return totalDataSources; - } - - public void setTotalDataSources(int totalDataSources) { - this.totalDataSources = totalDataSources; - } - - public int getDataSourceId() { - return dataSourceId; - } - - public String[] getPartitions() { - return collectionPartitions; - } - - public void setPartitions(String[] collectionPartitions) { - this.collectionPartitions = collectionPartitions; - } - - public int getPartitionCount() { - return collectionPartitions.length; - } - - public String getTag() { - return this.tag; - } - - public void setTag(String tag) { - this.tag = tag; - } - - @Override - public String getId() { - return collectionName; - } - - @Override - public Object[] getSchemaTypes() { - return types; - } - - @Override - public IDataSourcePropertiesProvider getPropertiesProvider() { - return propProvider; - } - - @Override - public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) { - } - - public void addChildSeq(int integer) { - childSeq.add(integer); - } - - public List<Integer> getChildSeq() { - return childSeq; + public static VXQueryCollectionDataSource create(int id, String collection, Object type) { + return new VXQueryCollectionDataSource(id, collection, new Object[] { type }); } @Override public String toString() { - return "VXQueryCollectionDataSource [collectionName=" + collectionName + ", childSeq=" + childSeq + "]"; + return "VXQueryCollectionDataSource [collectionName=" + collectionName + ", childSeq=" + childSeq + + ", valueSeq=" + valueSeq + "]"; } - @Override - public String getFunctionCall() { - return function; + public boolean usingIndex() { + return false; } + } http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index be95f93..623b48c 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -77,6 +77,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO private short totalDataSources; private String[] collectionPartitions; private List<Integer> childSeq; + private List<Byte[]> valueSeq; protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName()); private HDFSFunctions hdfs; private String tag; @@ -84,13 +85,14 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO private final String hdfsConf; private final Map<String, NodeControllerInfo> nodeControllerInfos; - public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds, + public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, AbstractVXQueryDataSource ds, RecordDescriptor rDesc, String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) { super(spec, 1, 1); collectionPartitions = ds.getPartitions(); dataSourceId = (short) ds.getDataSourceId(); totalDataSources = (short) ds.getTotalDataSources(); childSeq = ds.getChildSeq(); + valueSeq = ds.getValueSeq(); recordDescriptors[0] = rDesc; this.tag = ds.getTag(); this.hdfsConf = hdfsConf; @@ -113,7 +115,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO final String collectionName = collectionPartitions[partition % collectionPartitions.length]; final XMLParser parser = new XMLParser(false, nodeIdProvider, nodeId, appender, childSeq, dCtx.getStaticContext()); - final JSONParser jparser = new JSONParser(); + final JSONParser jparser = new JSONParser(valueSeq); return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { @Override @@ -130,7 +132,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO Reader input; if (!collectionModifiedName.contains("hdfs:/")) { File collectionDirectory = new File(collectionModifiedName); - //check if directory is in the local file system + // check if directory is in the local file system if (collectionDirectory.exists()) { // Go through each tuple. if (collectionDirectory.isDirectory()) { @@ -152,9 +154,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO try { jsonAbvs.reset(); input = new InputStreamReader(new FileInputStream(file)); - jparser.parse(input, jsonAbvs); - FrameUtils.appendFieldToWriter(writer, appender, jsonAbvs.getByteArray(), - jsonAbvs.getStartOffset(), jsonAbvs.getLength()); + jparser.parse(input, jsonAbvs, writer, appender); } catch (FileNotFoundException e) { throw new HyracksDataException(e.toString()); } @@ -197,14 +197,14 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO RecordReader reader; TaskAttemptContext context; for (int i = 0; i < size; i++) { - //read split + // read split context = ctxFactory.createContext(job.getConfiguration(), i); - reader = inputFormat.createRecordReader(inputSplits.get(i), context); reader.initialize(inputSplits.get(i), context); while (reader.nextKeyValue()) { value = reader.getCurrentValue().toString(); - //Split value if it contains more than one item with the tag + // Split value if it contains more than + // one item with the tag if (StringUtils.countMatches(value, tag) > 1) { String[] items = value.split(tag); for (String item : items) { @@ -218,7 +218,9 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO } } else { value = START_TAG + value; - //create an input stream to the file currently reading and send it to parser + // create an input stream to the + // file currently reading and send + // it to parser stream = new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8)); parser.parseHDFSElements(stream, writer, fta, i); stream.close(); @@ -232,10 +234,10 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO } } else { try { - //check if the path exists and is a directory + // check if the path exists and is a directory if (fs.exists(directory) && fs.isDirectory(directory)) { for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { - //read every file in the directory + // read every file in the directory RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); while (it.hasNext()) { xmlDocument = it.next().getPath(); @@ -244,7 +246,9 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO LOGGER.fine( "Starting to read XML document: " + xmlDocument.getName()); } - //create an input stream to the file currently reading and send it to parser + // create an input stream to the + // file currently reading and + // send it to parser InputStream in = fs.open(xmlDocument).getWrappedStream(); parser.parseHDFSElements(in, writer, fta, tupleIndex); in.close(); http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java index ddbc984..ea69cfd 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingDataSource.java @@ -31,9 +31,9 @@ import org.apache.vxquery.compiler.rewriter.rules.CollectionFileDomain; */ public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource { - protected Object[] types; + private String elementPath; + private String function; - protected IDataSourcePropertiesProvider propProvider; private VXQueryIndexingDataSource(int id, String collection, String elementPath, Object[] types, String functionCall) { this.dataSourceId = id; @@ -41,12 +41,11 @@ public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource { this.elementPath = elementPath; this.function = functionCall; this.collectionPartitions = collectionName.split(DELIMITER); - this.types = types; + final IPhysicalPropertiesVector vec = new StructuralPropertiesVector( - new RandomPartitioningProperty(new CollectionFileDomain(collectionName)), - new ArrayList<>()); - propProvider = new IDataSourcePropertiesProvider() { + new RandomPartitioningProperty(new CollectionFileDomain(collectionName)), new ArrayList<>()); + this.propProvider = new IDataSourcePropertiesProvider() { @Override public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) { return vec; @@ -54,83 +53,30 @@ public class VXQueryIndexingDataSource extends AbstractVXQueryDataSource { }; this.tag = null; this.childSeq = new ArrayList<>(); + this.valueSeq = new ArrayList<>(); } - public static VXQueryIndexingDataSource create(int id, String collection, String index, Object type, String - function) { + public static VXQueryIndexingDataSource create(int id, String collection, String index, Object type, + String function) { return new VXQueryIndexingDataSource(id, collection, index, new Object[] { type }, function); } - public int getTotalDataSources() { - return totalDataSources; - } - - public void setTotalDataSources(int totalDataSources) { - this.totalDataSources = totalDataSources; - } - - public int getDataSourceId() { - return dataSourceId; - } - public String getElementPath() { return elementPath; } - public String[] getCollectionPartitions() { - return collectionPartitions; - } - - public void setCollectionPartitions(String[] collectionPartitions) { - this.collectionPartitions = collectionPartitions; - } - - public int getPartitionCount() { - return collectionPartitions.length; - } - - public String getTag() { - return this.tag; - } - - public void setTag(String tag) { - this.tag = tag; - } - - @Override - public String getId() { - return collectionName; - } - - @Override - public Object[] getSchemaTypes() { - return types; - } - - @Override - public IDataSourcePropertiesProvider getPropertiesProvider() { - return propProvider; - } - - @Override - public void computeFDs(List scanVariables, List fdList) { + public String getFunctionCall() { + return function; } @Override public String toString() { - return "VXQueryIndexingDataSource [collectionName=" + collectionName + ", elementPath=" + elementPath + " " - + "function=" + function + "]"; + return "VXQueryIndexingDataSource [collectionName=" + collectionName + ", elementPath=" + elementPath + + ", function=" + function + "]"; } - @Override - public String getFunctionCall() { - return function; - } - - public List<Integer> getChildSeq() { - return childSeq; + public boolean usingIndex() { + return true; } } - - http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java index a24a629..ac92a0e 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryIndexingOperatorDescriptor.java @@ -68,7 +68,7 @@ public class VXQueryIndexingOperatorDescriptor extends AbstractSingleActivityOpe RecordDescriptor rDesc, String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) { super(spec, 1, 1); this.functionCall = ds.getFunctionCall(); - collectionPartitions = ds.getCollectionPartitions(); + collectionPartitions = ds.getPartitions(); dataSourceId = (short) ds.getDataSourceId(); totalDataSources = (short) ds.getTotalDataSources(); recordDescriptors[0] = rDesc; http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java index e552f68..f6644d6 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java @@ -88,43 +88,32 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig) - throws AlgebricksException { - VXQueryCollectionDataSource ds = null; - VXQueryIndexingDataSource ids = null; - - try { - ids = (VXQueryIndexingDataSource) dataSource; - } catch (ClassCastException e) { - ds = (VXQueryCollectionDataSource) dataSource; - } + throws AlgebricksException { + AbstractVXQueryDataSource ds = (AbstractVXQueryDataSource) dataSource; if (sourceFileMap != null) { - final int len = ds != null ? ds.getPartitions().length : ids.getCollectionPartitions().length; + final int len = ds.getPartitions().length; String[] collectionPartitions = new String[len]; for (int i = 0; i < len; ++i) { - String partition = ds != null ? ds.getPartitions()[i] : ids.getCollectionPartitions()[i]; + String partition = ds.getPartitions()[i]; File mapped = sourceFileMap.get(partition); collectionPartitions[i] = mapped != null ? mapped.toString() : partition; } - if (ds != null) { - ds.setPartitions(collectionPartitions); - } else { - ids.setCollectionPartitions(collectionPartitions); - } + ds.setPartitions(collectionPartitions); } RecordDescriptor rDesc; IOperatorDescriptor scanner; AlgebricksPartitionConstraint constraint; - if (ds != null) { + if (!ds.usingIndex()) { rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]); scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf, this.nodeControllerInfos); constraint = getClusterLocations(nodeList, ds.getPartitionCount()); } else { rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]); - scanner = new VXQueryIndexingOperatorDescriptor(jobSpec, ids, rDesc, this.hdfsConf, - this.nodeControllerInfos); - constraint = getClusterLocations(nodeList, ids.getPartitionCount()); + scanner = new VXQueryIndexingOperatorDescriptor(jobSpec, (VXQueryIndexingDataSource) ds, rDesc, + this.hdfsConf, this.nodeControllerInfos); + constraint = getClusterLocations(nodeList, ds.getPartitionCount()); } return new Pair<>(scanner, constraint); @@ -245,7 +234,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException { throw new UnsupportedOperationException(); } - + @Override public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<String> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys, @@ -254,7 +243,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String JobSpecification jobSpec, boolean bulkload) throws AlgebricksException { throw new UnsupportedOperationException(); } - + @Override public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<String> dataSource, IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys, @@ -263,7 +252,7 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String JobSpecification jobSpec) throws AlgebricksException { throw new UnsupportedOperationException(); } - + @Override public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime( IDataSourceIndex<String, String> dataSourceIndex, IOperatorSchema propagatedSchema, @@ -274,10 +263,11 @@ public class VXQueryMetadataProvider implements IMetadataProvider<String, String JobSpecification spec) throws AlgebricksException { throw new UnsupportedOperationException(); } - + @Override public Map<String, String> getConfig() { return new HashMap<>(); } + } http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java index 4117774..a8be359 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/base/AbstractTaggedValueArgumentUnnestingEvaluator.java @@ -16,13 +16,17 @@ */ package org.apache.vxquery.runtime.functions.base; +import org.apache.vxquery.datamodel.accessors.ArrayBackedValueStoragePool; +import org.apache.vxquery.datamodel.accessors.PointablePool; +import org.apache.vxquery.datamodel.accessors.PointablePoolFactory; +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; +import org.apache.vxquery.exceptions.SystemException; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; -import org.apache.vxquery.datamodel.accessors.PointablePool; -import org.apache.vxquery.datamodel.accessors.PointablePoolFactory; -import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; public abstract class AbstractTaggedValueArgumentUnnestingEvaluator implements IUnnestingEvaluator { private final IScalarEvaluator[] args; @@ -30,6 +34,7 @@ public abstract class AbstractTaggedValueArgumentUnnestingEvaluator implements I protected final TaggedValuePointable[] tvps; protected final PointablePool ppool = PointablePoolFactory.INSTANCE.createPointablePool(); + protected final ArrayBackedValueStoragePool abvsPool = new ArrayBackedValueStoragePool(); public AbstractTaggedValueArgumentUnnestingEvaluator(IScalarEvaluator[] args) { this.args = args; http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java index d255345..b127d2a 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersScalarEvaluator.java @@ -21,10 +21,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; -import org.apache.vxquery.datamodel.accessors.jsonitem.ArrayPointable; -import org.apache.vxquery.datamodel.accessors.jsonitem.ObjectPointable; import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder; -import org.apache.vxquery.datamodel.values.ValueTag; import org.apache.vxquery.exceptions.ErrorCode; import org.apache.vxquery.exceptions.SystemException; import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentScalarEvaluator; @@ -33,42 +30,32 @@ import java.io.IOException; public class KeysOrMembersScalarEvaluator extends AbstractTaggedValueArgumentScalarEvaluator { protected final IHyracksTaskContext ctx; - private final ObjectPointable op; - private final ArrayPointable ap; + private ArrayBackedValueStorage abvs; private final SequenceBuilder sb; private final TaggedValuePointable tempTvp; + private final KeysOrMembersUnnesting keysOrMembers; public KeysOrMembersScalarEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args) { super(args); this.ctx = ctx; - op = (ObjectPointable) ObjectPointable.FACTORY.createPointable(); - ap = (ArrayPointable) ArrayPointable.FACTORY.createPointable(); + abvs = new ArrayBackedValueStorage(); sb = new SequenceBuilder(); tempTvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); + keysOrMembers = new KeysOrMembersUnnesting(ctx, ppool); } @Override protected void evaluate(TaggedValuePointable[] args, IPointable result) throws SystemException { - final TaggedValuePointable tvp = args[0]; - ArrayBackedValueStorage abvs = abvsPool.takeOne(); + abvs = abvsPool.takeOne(); + keysOrMembers.init(args); try { - switch (tvp.getTag()) { - case ValueTag.OBJECT_TAG: - tvp.getValue(op); - op.getKeys(abvs); - result.set(abvs); - break; - case ValueTag.ARRAY_TAG: - abvs.reset(); - sb.reset(abvs); - tvp.getValue(ap); - ap.appendItems(sb); - sb.finish(); - result.set(abvs); - break; - default: - throw new SystemException(ErrorCode.FORG0006); + abvs.reset(); + sb.reset(abvs); + while (keysOrMembers.step(tempTvp)) { + sb.addItem(tempTvp); } + sb.finish(); + result.set(abvs); } catch (IOException e) { throw new SystemException(ErrorCode.SYSE0001, e); } finally { http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnesting.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnesting.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnesting.java new file mode 100644 index 0000000..0e2597c --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnesting.java @@ -0,0 +1,92 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.vxquery.runtime.functions.json; + +import java.io.IOException; + +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.vxquery.datamodel.accessors.ArrayBackedValueStoragePool; +import org.apache.vxquery.datamodel.accessors.PointablePool; +import org.apache.vxquery.datamodel.accessors.SequencePointable; +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; +import org.apache.vxquery.datamodel.accessors.jsonitem.ArrayPointable; +import org.apache.vxquery.datamodel.accessors.jsonitem.ObjectPointable; +import org.apache.vxquery.datamodel.values.ValueTag; +import org.apache.vxquery.exceptions.ErrorCode; +import org.apache.vxquery.exceptions.SystemException; +import org.apache.vxquery.runtime.functions.step.AbstractForwardAxisPathStep; + +public class KeysOrMembersUnnesting extends AbstractForwardAxisPathStep { + private final ArrayPointable ap = (ArrayPointable) ArrayPointable.FACTORY.createPointable(); + private final SequencePointable sp = (SequencePointable) SequencePointable.FACTORY.createPointable(); + private final ObjectPointable op = (ObjectPointable) ObjectPointable.FACTORY.createPointable(); + private final TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); + private final TaggedValuePointable tempTvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); + private TaggedValuePointable arg = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); + private final ArrayBackedValueStoragePool abvsPool = new ArrayBackedValueStoragePool(); + private ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); + private int arOrObArgsLength; + private int indexArrayArgs; + + public KeysOrMembersUnnesting(IHyracksTaskContext ctx, PointablePool pp) { + super(ctx, pp); + } + + protected void init(TaggedValuePointable[] args) throws SystemException { + abvs = abvsPool.takeOne(); + indexArrayArgs = 0; + arg = args[0]; + switch (arg.getTag()) { + case ValueTag.OBJECT_TAG: + arg.getValue(op); + arOrObArgsLength = op.getEntryCount(); + break; + case ValueTag.ARRAY_TAG: + arg.getValue(ap); + arOrObArgsLength = ap.getEntryCount(); + break; + default: + throw new SystemException(ErrorCode.FORG0006); + } + } + + public boolean step(IPointable result) throws SystemException { + abvs = abvsPool.takeOne(); + if (arOrObArgsLength > 0) { + while (indexArrayArgs < arOrObArgsLength) { + if (arg.getTag() == ValueTag.ARRAY_TAG) { + ap.getEntry(indexArrayArgs, tvp); + } else { + try { + op.getKeys(abvs); + } catch (IOException e) { + throw new SystemException(ErrorCode.SYSE0001, e); + } + tempTvp.set(abvs); + tempTvp.getValue(sp); + sp.getEntry(indexArrayArgs, tvp); + } + result.set(tvp.getByteArray(), tvp.getStartOffset(), tvp.getLength()); + indexArrayArgs++; + return true; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/53b86c24/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnestingEvaluator.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnestingEvaluator.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnestingEvaluator.java new file mode 100644 index 0000000..a4e146c --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/json/KeysOrMembersUnnestingEvaluator.java @@ -0,0 +1,44 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.vxquery.runtime.functions.json; + +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; +import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentUnnestingEvaluator; + +public class KeysOrMembersUnnestingEvaluator extends AbstractTaggedValueArgumentUnnestingEvaluator { + private final KeysOrMembersUnnesting keysOrMembersStep; + + public KeysOrMembersUnnestingEvaluator(IHyracksTaskContext ctx, IScalarEvaluator[] args) { + super(args); + keysOrMembersStep = new KeysOrMembersUnnesting(ctx, ppool); + } + + @Override + public boolean step(IPointable result) throws HyracksDataException { + return keysOrMembersStep.step(result); + } + + @Override + protected void init(TaggedValuePointable[] args) throws HyracksDataException { + keysOrMembersStep.init(args); + + } +}
