http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayAppendDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayAppendDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayAppendDescriptor.java index aae4d02..6d636ed 100755 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayAppendDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayAppendDescriptor.java @@ -37,6 +37,20 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +/** + * <pre> + * array_append(list, val1, val2, ...) returns a new open list with all the values appended to the input list items. + * Values can be null (i.e., one can append nulls) + * + * It throws an error at compile time if the number of arguments < 2 + * + * It returns in order: + * 1. missing, if any argument is missing. + * 2. null, if the list arg is null or it's not a list. + * 3. otherwise, a new open list. 
+ * + * </pre> + */ public class ArrayAppendDescriptor extends AbstractScalarFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; private IAType[] argTypes; @@ -66,7 +80,7 @@ public class ArrayAppendDescriptor extends AbstractScalarFunctionDynamicDescript @Override public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new ArrayAppendFunction(args, ctx); + return new ArrayAppendEval(args, ctx); } }; } @@ -76,10 +90,9 @@ public class ArrayAppendDescriptor extends AbstractScalarFunctionDynamicDescript argTypes = (IAType[]) states; } - public class ArrayAppendFunction extends AbstractArrayAddRemoveEval { + public class ArrayAppendEval extends AbstractArrayAddRemoveEval { - public ArrayAppendFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) - throws HyracksDataException { + public ArrayAppendEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { super(args, ctx, 0, 1, args.length - 1, argTypes, false, sourceLoc, true, true); }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayConcatDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayConcatDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayConcatDescriptor.java new file mode 100755 index 0000000..9af159c --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayConcatDescriptor.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.evaluators.functions; + +import org.apache.asterix.builders.IAsterixListBuilder; +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.om.functions.IFunctionTypeInferer; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; +import org.apache.asterix.runtime.functions.FunctionTypeInferers; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; + +/** + * <pre> + * array_concat(list1, list2, ...) returns a new open list with all the values of all lists appended in order into the + * new list. Items of the lists can be null or missing (both are added as a null value). + * + * It throws an error at compile time if the number of arguments < 2 + * + * It returns (or throws an error at runtime) in order: + * 1. missing, if any argument is missing. + * 2. an error if the input lists are not of the same type (one is an ordered list while the other is unordered). + * 3. null, if any input list is null or is not a list. + * 4. otherwise, a new open list. 
+ * + * </pre> + */ +public class ArrayConcatDescriptor extends AbstractScalarFunctionDynamicDescriptor { + private static final long serialVersionUID = 1L; + private IAType[] argTypes; + + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new ArrayConcatDescriptor(); + } + + @Override + public IFunctionTypeInferer createFunctionTypeInferer() { + return FunctionTypeInferers.SET_ARGUMENTS_TYPE; + } + }; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.ARRAY_CONCAT; + } + + @Override + public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) + throws AlgebricksException { + return new IScalarEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { + return new ArrayConcatEval(args, ctx); + } + }; + } + + @Override + public void setImmutableStates(Object... 
states) { + argTypes = (IAType[]) states; + } + + public class ArrayConcatEval extends AbstractArrayProcessArraysEval { + public ArrayConcatEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { + super(args, ctx, false, sourceLoc, argTypes); + } + + @Override + protected void init() { + // do nothing + } + + @Override + protected void finish(IAsterixListBuilder listBuilder) { + // do nothing + } + + @Override + protected void release() { + // do nothing + } + + @Override + protected boolean processItem(IPointable item, int listIndex, IAsterixListBuilder listBuilder) + throws HyracksDataException { + listBuilder.addItem(item); + // listBuilder always copies the item, so we're not using the supplied item + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayContainsDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayContainsDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayContainsDescriptor.java index cc651f6..16bb0c9 100755 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayContainsDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayContainsDescriptor.java @@ -35,6 +35,21 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IPointable; +/** + * <pre> + * array_contains(list, val) returns true if the input list contains the value argument. 
+ * + * It throws an error at compile time if the number of arguments != 2 + * + * It returns (or throws an error at runtime) in order: + * 1. missing, if any argument is missing. + * 2. null, if any argument is null. + * 3. an error if the value is of a list/object type (i.e. derived type) since deep equality is not yet supported. + * 4. null, if the input list is not a list. + * 5. otherwise, returns true or false. + * + * </pre> + */ public class ArrayContainsDescriptor extends AbstractScalarFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; @@ -58,16 +73,15 @@ public class ArrayContainsDescriptor extends AbstractScalarFunctionDynamicDescri @Override public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new ArrayContainsFunction(args, ctx); + return new ArrayContainsEval(args, ctx); } }; } - public class ArrayContainsFunction extends AbstractArraySearchEval { + public class ArrayContainsEval extends AbstractArraySearchEval { private final ISerializerDeserializer booleanSerde; - public ArrayContainsFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) - throws HyracksDataException { + public ArrayContainsEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { super(args, ctx, sourceLoc); // TODO(ali): should we get the nontagged serde? 
booleanSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java index 52db331..b74ec6a 100755 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java @@ -28,7 +28,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import org.apache.asterix.builders.IAsterixListBuilder; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory; import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.functions.IFunctionDescriptor; @@ -39,12 +38,12 @@ import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; import org.apache.asterix.runtime.evaluators.common.ListAccessor; import org.apache.asterix.runtime.functions.FunctionTypeInferers; +import org.apache.asterix.runtime.utils.ArrayFunctionsUtil; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; 
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.SourceLocation; @@ -92,7 +91,6 @@ public class ArrayDistinctDescriptor extends AbstractScalarFunctionDynamicDescri } public class ArrayDistinctFunction extends AbstractArrayProcessEval { - private final IBinaryComparator comp; private final SourceLocation sourceLoc; private final IBinaryHashFunction binaryHashFunction; private final Int2ObjectMap<List<IPointable>> hashes; @@ -106,7 +104,6 @@ public class ArrayDistinctDescriptor extends AbstractScalarFunctionDynamicDescri hashes = new Int2ObjectOpenHashMap<>(); item = pointableAllocator.allocateEmpty(); storage = (ArrayBackedValueStorage) storageAllocator.allocate(null); - comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator(); binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null) .createBinaryHashFunction(); } @@ -141,12 +138,10 @@ public class ArrayDistinctDescriptor extends AbstractScalarFunctionDynamicDescri addItem(item, listBuilder, itemInStorage, sameHashes); hashes.put(hash, sameHashes); item = pointableAllocator.allocateEmpty(); - } else { - // check if it happens that two hashes are the same but they are for different items - if (isNewItem(item, sameHashes)) { - addItem(item, listBuilder, itemInStorage, sameHashes); - item = pointableAllocator.allocateEmpty(); - } + } else if (ArrayFunctionsUtil.findItem(item, sameHashes) == null) { + // new item, it could happen that two hashes are the same but they are for different items + addItem(item, listBuilder, itemInStorage, sameHashes); + item = pointableAllocator.allocateEmpty(); } } } @@ -166,16 +161,5 @@ public class 
ArrayDistinctDescriptor extends AbstractScalarFunctionDynamicDescri storage = (ArrayBackedValueStorage) storageAllocator.allocate(null); } } - - private boolean isNewItem(IPointable item, List<IPointable> sameHashes) throws HyracksDataException { - for (int j = 0; j < sameHashes.size(); j++) { - if (comp.compare(item.getByteArray(), item.getStartOffset(), item.getLength(), - sameHashes.get(j).getByteArray(), sameHashes.get(j).getStartOffset(), - sameHashes.get(j).getLength()) == 0) { - return false; - } - } - return true; - } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java new file mode 100644 index 0000000..af55eea --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.evaluators.functions; + +import static org.apache.asterix.om.types.EnumDeserializer.ATYPETAGDESERIALIZER; + +import java.io.IOException; + +import org.apache.asterix.builders.AbvsBuilderFactory; +import org.apache.asterix.builders.IAsterixListBuilder; +import org.apache.asterix.builders.OrderedListBuilder; +import org.apache.asterix.builders.UnorderedListBuilder; +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.om.functions.IFunctionTypeInferer; +import org.apache.asterix.om.pointables.base.DefaultOpenFieldType; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.AbstractCollectionType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.types.hierachy.ATypeHierarchy; +import org.apache.asterix.om.util.container.IObjectPool; +import org.apache.asterix.om.util.container.ListObjectPool; +import org.apache.asterix.runtime.base.ListAccessorFactory; +import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; +import org.apache.asterix.runtime.evaluators.common.ListAccessor; +import org.apache.asterix.runtime.functions.FunctionTypeInferers; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.AbstractPointable; +import org.apache.hyracks.data.std.api.IMutableValueStorage; 
+import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.TaggedValuePointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +/** + * <pre> + * array_flatten(list, depth) returns a new open list with any nested list (all types) flattened up to the specified + * depth. The returned list type is the same as the input list type. Null items are preserved; missing items are + * added as null (see the first example below). + * If the depth < 0, then it flattens the input list all the way deep. + * + * array_flatten([2, null, [5,6], 3, missing], 1) will result in [2, null, 5, 6, 3, null] + * array_flatten([2, [5,6], 3], 0) will result in [2, [5,6], 3] (0 depth does nothing) + * + * It throws an error at compile time if the number of arguments != 2 + * + * It returns in order: + * 1. missing, if any argument is missing. + * 2. null, if: + * - any argument is null. + * - the input list is not a list. + * - the depth arg is not numeric or + * - it's a floating-point number with decimals (e.g. 1.2 will produce null, 1.0 is OK). + * 3. otherwise, a new open list. + * + * </pre> + */ +public class ArrayFlattenDescriptor extends AbstractScalarFunctionDynamicDescriptor { + private static final long serialVersionUID = 1L; + private IAType inputListType; + + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new ArrayFlattenDescriptor(); + } + + @Override + public IFunctionTypeInferer createFunctionTypeInferer() { + return FunctionTypeInferers.SET_ARGUMENT_TYPE; + } + }; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.ARRAY_FLATTEN; + } + + @Override + public void setImmutableStates(Object... 
states) { + inputListType = (IAType) states[0]; + } + + @Override + public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) + throws AlgebricksException { + return new IScalarEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { + return new ArrayFlattenEval(args, ctx); + } + }; + } + + public class ArrayFlattenEval implements IScalarEvaluator { + private final IScalarEvaluator listEval; + private final IScalarEvaluator depthEval; + private final IPointable list; + private final AbstractPointable item; + private final TaggedValuePointable depthArg; + private final IObjectPool<IMutableValueStorage, ATypeTag> storageAllocator; + private final IObjectPool<ListAccessor, ATypeTag> listAccessorAllocator; + private final CastTypeEvaluator caster; + private ArrayBackedValueStorage storage; + private IAsterixListBuilder orderedListBuilder; + private IAsterixListBuilder unorderedListBuilder; + + public ArrayFlattenEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { + storageAllocator = new ListObjectPool<>(new AbvsBuilderFactory()); + listAccessorAllocator = new ListObjectPool<>(new ListAccessorFactory()); + storage = new ArrayBackedValueStorage(); + listEval = args[0].createScalarEvaluator(ctx); + depthEval = args[1].createScalarEvaluator(ctx); + list = new VoidPointable(); + item = new VoidPointable(); + caster = new CastTypeEvaluator(); + depthArg = new TaggedValuePointable(); + orderedListBuilder = null; + unorderedListBuilder = null; + } + + @Override + public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { + // 1st arg: list to flatten + listEval.evaluate(tuple, list); + // 2nd arg: depthArg + depthEval.evaluate(tuple, depthArg); + + ATypeTag listType = 
ATYPETAGDESERIALIZER.deserialize(list.getByteArray()[list.getStartOffset()]); + if (!ATypeHierarchy.isCompatible(ATYPETAGDESERIALIZER.deserialize(depthArg.getTag()), ATypeTag.DOUBLE) + || !listType.isListType()) { + PointableHelper.setNull(result); + return; + } + String name = getIdentifier().getName(); + double depth = ATypeHierarchy.getDoubleValue(name, 1, depthArg.getByteArray(), depthArg.getStartOffset()); + if (Double.isNaN(depth) || Double.isInfinite(depth) || Math.floor(depth) < depth) { + PointableHelper.setNull(result); + return; + } + + caster.reset(DefaultOpenFieldType.getDefaultOpenFieldType(listType), inputListType, listEval); + caster.evaluate(tuple, list); + + int depthInt = (int) depth; + // create list + IAsterixListBuilder listBuilder; + if (listType == ATypeTag.ARRAY) { + if (orderedListBuilder == null) { + orderedListBuilder = new OrderedListBuilder(); + } + listBuilder = orderedListBuilder; + } else { + if (unorderedListBuilder == null) { + unorderedListBuilder = new UnorderedListBuilder(); + } + listBuilder = unorderedListBuilder; + } + + ListAccessor mainListAccessor = listAccessorAllocator.allocate(null); + listBuilder.reset((AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listType)); + mainListAccessor.reset(list.getByteArray(), list.getStartOffset()); + try { + process(mainListAccessor, listBuilder, 0, depthInt); + storage.reset(); + listBuilder.write(storage.getDataOutput(), true); + result.set(storage); + } catch (IOException e) { + throw HyracksDataException.create(e); + } finally { + storageAllocator.reset(); + listAccessorAllocator.reset(); + } + } + + private void process(ListAccessor listAccessor, IAsterixListBuilder listBuilder, int currentDepth, int depth) + throws IOException { + boolean itemInStorage; + for (int i = 0; i < listAccessor.size(); i++) { + itemInStorage = listAccessor.getOrWriteItem(i, item, storage); + // if item is not a list or depth is reached, write it + if 
(!ATYPETAGDESERIALIZER.deserialize(item.getByteArray()[item.getStartOffset()]).isListType() + || currentDepth == depth) { + listBuilder.addItem(item); + } else { + // recurse on the sublist + ListAccessor newListAccessor = listAccessorAllocator.allocate(null); + newListAccessor.reset(item.getByteArray(), item.getStartOffset()); + if (itemInStorage) { + // create a new storage since the item is using it + storage = (ArrayBackedValueStorage) storageAllocator.allocate(null); + storage.reset(); + } + process(newListAccessor, listBuilder, currentDepth + 1, depth); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIfNullDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIfNullDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIfNullDescriptor.java new file mode 100755 index 0000000..53853e2 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIfNullDescriptor.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.evaluators.functions; + +import static org.apache.asterix.om.types.EnumDeserializer.ATYPETAGDESERIALIZER; + +import java.io.IOException; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; +import org.apache.asterix.runtime.evaluators.common.ListAccessor; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.AbstractPointable; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +/** + * <pre> + * array_ifnull(list) returns the first item it encounters that is not a null or missing. Otherwise, it returns null. + * + * It throws an error at compile time if the number of arguments != 1 + * + * It returns in order: + * 1. missing if the input list is missing + * 2. null if the input list is null or is not a list. + * 3. otherwise, the first non-null non-missing item in the list. Otherwise, null. 
+ * + * </pre> + */ +public class ArrayIfNullDescriptor extends AbstractScalarFunctionDynamicDescriptor { + private static final long serialVersionUID = 1L; + + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new ArrayIfNullDescriptor(); + } + }; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.ARRAY_IFNULL; + } + + @Override + public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) + throws AlgebricksException { + return new IScalarEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { + return new ArrayIfNullEval(args, ctx); + } + }; + } + + public class ArrayIfNullEval implements IScalarEvaluator { + private final ArrayBackedValueStorage storage; + private final IScalarEvaluator listArgEval; + private final IPointable listArg; + private final ListAccessor listAccessor; + private final AbstractPointable item; + + public ArrayIfNullEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { + storage = new ArrayBackedValueStorage(); + listArg = new VoidPointable(); + item = new VoidPointable(); + listAccessor = new ListAccessor(); + listArgEval = args[0].createScalarEvaluator(ctx); + } + + @Override + public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { + // get the list argument and make sure it's a list + listArgEval.evaluate(tuple, listArg); + byte[] listBytes = listArg.getByteArray(); + int offset = listArg.getStartOffset(); + ATypeTag listType = ATYPETAGDESERIALIZER.deserialize(listBytes[offset]); + if (!listType.isListType()) { + PointableHelper.setNull(result); + return; + } + + listAccessor.reset(listBytes, offset); + ATypeTag itemTypeTag = 
listAccessor.getItemType(); + try { + if (itemTypeTag == ATypeTag.NULL || itemTypeTag == ATypeTag.MISSING) { + // list of nulls or list of missings + PointableHelper.setNull(result); + return; + } + + int numItems = listAccessor.size(); + for (int i = 0; i < numItems; i++) { + listAccessor.getOrWriteItem(i, item, storage); + itemTypeTag = ATYPETAGDESERIALIZER.deserialize(item.getByteArray()[item.getStartOffset()]); + if (itemTypeTag != ATypeTag.NULL && itemTypeTag != ATypeTag.MISSING) { + result.set(item); + return; + } + } + PointableHelper.setNull(result); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayInsertDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayInsertDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayInsertDescriptor.java index 3d3c54c..634f114 100755 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayInsertDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayInsertDescriptor.java @@ -41,6 +41,25 @@ import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.primitive.TaggedValuePointable; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +/** + * <pre> + * array_insert(list, pos, val1, val2, ...) returns a new open list with all values inserted at the specified position. + * Values can be null (i.e., one can insert nulls). Position can be negative where the last position = -1. When position + * is positive then the first position = 0. Input list can be empty where the only valid position is 0. 
+ * For the list [5,6], the valid positions are 0, 1, 2, -1, -2. If position is floating-point, it is cast to an integer. + * TODO: should decide on what to do for floating-point positions. + * + * It throws an error at compile time if the number of arguments < 3 + * + * It returns in order: + * 1. missing, if any argument is missing. + * 2. null, if + * - the list arg is null or it's not a list + * - the position is not numeric or the position is out of bounds. + * 3. otherwise, a new open list. + * + * </pre> + */ public class ArrayInsertDescriptor extends AbstractScalarFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; private IAType[] argTypes; @@ -70,7 +89,7 @@ public class ArrayInsertDescriptor extends AbstractScalarFunctionDynamicDescript @Override public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new ArrayInsertFunction(args, ctx); + return new ArrayInsertEval(args, ctx); } }; } @@ -80,12 +99,11 @@ public class ArrayInsertDescriptor extends AbstractScalarFunctionDynamicDescript argTypes = (IAType[]) states; } - public class ArrayInsertFunction extends AbstractArrayAddRemoveEval { + public class ArrayInsertEval extends AbstractArrayAddRemoveEval { private final TaggedValuePointable positionArg; private final IScalarEvaluator positionArgEval; - public ArrayInsertFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) - throws HyracksDataException { + public ArrayInsertEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { super(args, ctx, 0, 2, args.length - 2, argTypes, false, sourceLoc, true, true); positionArg = new TaggedValuePointable(); positionArgEval = args[1].createScalarEvaluator(ctx); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java new file mode 100755 index 0000000..f84dee7 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.evaluators.functions; + +import static org.apache.asterix.om.types.EnumDeserializer.ATYPETAGDESERIALIZER; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.asterix.builders.AbvsBuilderFactory; +import org.apache.asterix.builders.ArrayListFactory; +import org.apache.asterix.builders.IAsterixListBuilder; +import org.apache.asterix.builders.OrderedListBuilder; +import org.apache.asterix.builders.UnorderedListBuilder; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer; +import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer; +import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider; +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.om.pointables.PointableAllocator; +import org.apache.asterix.om.pointables.base.DefaultOpenFieldType; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.AbstractCollectionType; +import org.apache.asterix.om.util.container.IObjectPool; +import org.apache.asterix.om.util.container.ListObjectPool; +import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; +import org.apache.asterix.runtime.evaluators.common.ListAccessor; +import org.apache.asterix.runtime.utils.ArrayFunctionsUtil; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import 
org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;

/**
 * <pre>
 * array_intersect(list1, list2, ...) returns a new list holding the distinct items that appear in every input list.
 * Null and missing items inside the lists are skipped and never appear in the result.
 *
 * As implemented by the evaluator below, it returns (or throws an error at runtime) in order:
 * 1. null, if any argument is not a list.
 * 2. an error if the arguments are lists of different kinds (array vs. multiset). This check is only reached while
 *    no non-list argument has been seen yet.
 * 3. an error if any inspected item is of a list/object type (i.e. derived type).
 * 4. otherwise, a new open list of the same kind as the inputs.
 *
 * The order of the items in the result follows the last list that is processed; when the smallest list happens to be
 * the last argument, the items come out in the iteration order of the internal hash map.
 * NOTE(review): handling of missing/null arguments is presumably done outside this evaluator -- confirm.
 * </pre>
 */
public class ArrayIntersectDescriptor extends AbstractScalarFunctionDynamicDescriptor {
    private static final long serialVersionUID = 1L;

    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
        @Override
        public IFunctionDescriptor createFunctionDescriptor() {
            return new ArrayIntersectDescriptor();
        }
    };

    /**
     * A candidate item together with a "stamp": the index of the last input list that has confirmed the item.
     * The stamp starts at -1 and must be advanced by every list, in order, for the item to be part of the result.
     */
    public class ValueListIndex implements IValueReference {
        private final IPointable value;
        private int listIndex;

        public ValueListIndex(IPointable value, int listIndex) {
            this.value = value;
            this.listIndex = listIndex;
        }

        @Override
        public byte[] getByteArray() {
            return value.getByteArray();
        }

        @Override
        public int getStartOffset() {
            return value.getStartOffset();
        }

        @Override
        public int getLength() {
            return value.getLength();
        }
    }

    @Override
    public FunctionIdentifier getIdentifier() {
        return BuiltinFunctions.ARRAY_INTERSECT;
    }

    @Override
    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
            throws AlgebricksException {
        return new IScalarEvaluatorFactory() {
            private static final long serialVersionUID = 1L;

            @Override
            public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException {
                return new ArrayIntersectEval(args, ctx);
            }
        };
    }

    /**
     * Evaluator that intersects all of its list arguments using a hash table keyed by the hash of each item.
     */
    public class ArrayIntersectEval implements IScalarEvaluator {
        private final ListAccessor listAccessor;
        private final IPointable[] listsArgs;
        private final IScalarEvaluator[] listsEval;
        private final IBinaryHashFunction binaryHashFunction;
        // hash of an item -> all distinct candidate items having that hash
        private final Int2ObjectMap<List<ValueListIndex>> hashes;
        private final PointableAllocator pointableAllocator;
        private final IObjectPool<IMutableValueStorage, ATypeTag> storageAllocator;
        private final IObjectPool<List<ValueListIndex>, ATypeTag> arrayListAllocator;
        private final ArrayBackedValueStorage finalResult;
        // the builders are created lazily, on the first tuple that needs each kind
        private IAsterixListBuilder orderedListBuilder;
        private IAsterixListBuilder unorderedListBuilder;

        public ArrayIntersectEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException {
            orderedListBuilder = null;
            unorderedListBuilder = null;
            pointableAllocator = new PointableAllocator();
            storageAllocator = new ListObjectPool<>(new AbvsBuilderFactory());
            arrayListAllocator = new ListObjectPool<>(new ArrayListFactory<>());
            hashes = new Int2ObjectOpenHashMap<>();
            finalResult = new ArrayBackedValueStorage();
            listAccessor = new ListAccessor();
            listsArgs = new IPointable[args.length];
            listsEval = new IScalarEvaluator[args.length];
            for (int i = 0; i < args.length; i++) {
                listsArgs[i] = new VoidPointable();
                listsEval[i] = args[i].createScalarEvaluator(ctx);
            }
            binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
                    .createBinaryHashFunction();
        }

        /**
         * Evaluates every argument, validates that all of them are lists of the same kind, and writes the
         * intersection (or null when some argument is not a list) into {@code result}.
         *
         * @throws HyracksDataException if the lists are of different kinds, if an item of a derived type is
         *         encountered, or if reading/writing the list data fails
         */
        @Override
        public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
            byte listArgType;
            boolean returnNull = false;
            AbstractCollectionType outList = null;
            ATypeTag listTag;
            int minListIndex = 0;
            // -1 means "no list has been measured yet"
            int minSize = -1;
            int nextSize;
            IScalarEvaluator listEval;
            IPointable listArg;
            // evaluate all the lists first to make sure they're all actually lists and of the same list type
            for (int i = 0; i < listsEval.length; i++) {
                listEval = listsEval[i];
                listEval.evaluate(tuple, listsArgs[i]);
                if (!returnNull) {
                    listArg = listsArgs[i];
                    listArgType = listArg.getByteArray()[listArg.getStartOffset()];
                    listTag = ATYPETAGDESERIALIZER.deserialize(listArgType);
                    if (!listTag.isListType()) {
                        returnNull = true;
                    } else if (outList != null && outList.getTypeTag() != listTag) {
                        throw new RuntimeDataException(ErrorCode.DIFFERENT_LIST_TYPE_ARGS, sourceLoc);
                    } else {
                        if (outList == null) {
                            outList = (AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listTag);
                        }

                        nextSize = getNumItems(outList, listArg.getByteArray(), listArg.getStartOffset());
                        // the first measured list always becomes the initial minimum; without the "minSize < 0"
                        // part the comparison against the -1 sentinel could never succeed
                        if (minSize < 0 || nextSize < minSize) {
                            minSize = nextSize;
                            minListIndex = i;
                        }
                    }
                }
            }

            if (returnNull) {
                PointableHelper.setNull(result);
                return;
            }

            IAsterixListBuilder listBuilder;
            if (outList.getTypeTag() == ATypeTag.ARRAY) {
                if (orderedListBuilder == null) {
                    orderedListBuilder = new OrderedListBuilder();
                }
                listBuilder = orderedListBuilder;
            } else {
                if (unorderedListBuilder == null) {
                    unorderedListBuilder = new UnorderedListBuilder();
                }
                listBuilder = unorderedListBuilder;
            }

            hashes.clear();
            try {
                // first, get distinct items of the most restrictive (smallest) list, pass listBuilder as null since
                // we're not adding values yet. Values will be added to listBuilder after inspecting all input lists
                listArg = listsArgs[minListIndex];
                listAccessor.reset(listArg.getByteArray(), listArg.getStartOffset());
                processList(listAccessor, minListIndex, null, true);

                // now process each list one by one
                listBuilder.reset(outList);
                for (int listIndex = 0; listIndex < listsArgs.length; listIndex++) {
                    if (listIndex == minListIndex) {
                        incrementSmallest(listIndex, hashes.values(), listBuilder);
                    } else {
                        listArg = listsArgs[listIndex];
                        listAccessor.reset(listArg.getByteArray(), listArg.getStartOffset());
                        processList(listAccessor, listIndex, listBuilder, false);
                    }
                }

                finalResult.reset();
                listBuilder.write(finalResult.getDataOutput(), true);
                result.set(finalResult);
            } catch (IOException e) {
                throw HyracksDataException.create(e);
            } finally {
                // hand all per-tuple objects back to their pools, also on failure
                storageAllocator.reset();
                arrayListAllocator.reset();
                pointableAllocator.reset();
            }
        }

        /** Returns the number of items of the serialized list starting at {@code offset}. */
        private int getNumItems(AbstractCollectionType listType, byte[] listBytes, int offset) {
            if (listType.getTypeTag() == ATypeTag.ARRAY) {
                return AOrderedListSerializerDeserializer.getNumberOfItems(listBytes, offset);
            } else {
                return AUnorderedListSerializerDeserializer.getNumberOfItems(listBytes, offset);
            }
        }

        /**
         * Walks over one list. With {@code initIntersectList} set, every distinct non-null/non-missing item is
         * registered as a candidate; otherwise each item that matches a candidate advances that candidate's stamp.
         *
         * @throws IOException if an item is of a derived type or if the list data cannot be read
         */
        private void processList(ListAccessor listAccessor, int listIndex, IAsterixListBuilder listBuilder,
                boolean initIntersectList) throws IOException {
            int hash;
            List<ValueListIndex> sameHashes;
            boolean itemInStorage;
            IPointable item = pointableAllocator.allocateEmpty();
            ArrayBackedValueStorage storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
            storage.reset();
            for (int j = 0; j < listAccessor.size(); j++) {
                itemInStorage = listAccessor.getOrWriteItem(j, item, storage);
                if (ATYPETAGDESERIALIZER.deserialize(item.getByteArray()[item.getStartOffset()]).isDerivedType()) {
                    throw new RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX, sourceLoc);
                }
                if (notNullAndMissing(item)) {
                    // look up to see if item exists
                    hash = binaryHashFunction.hash(item.getByteArray(), item.getStartOffset(), item.getLength());
                    sameHashes = hashes.get(hash);
                    if (initIntersectList && initIntersectList(item, hash, sameHashes)) {
                        // item is used: the candidate keeps referencing this pointable (and its storage, when the
                        // item was written there), so fresh ones are needed for the next item
                        item = pointableAllocator.allocateEmpty();
                        if (itemInStorage) {
                            storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
                            storage.reset();
                        }
                    } else {
                        incrementCommonValue(item, sameHashes, listIndex, listBuilder);
                    }
                }
            }
        }

        // collect the items of the most restrictive list, it initializes the list index as -1. each successive list
        // should stamp the value with its list index if the list has the item. It starts with list index = 0
        // returns true when the item was registered as a new candidate, false when it is a duplicate
        private boolean initIntersectList(IPointable item, int hash, List<ValueListIndex> sameHashes)
                throws IOException {
            // add if new item
            if (sameHashes == null) {
                List<ValueListIndex> newHashes = arrayListAllocator.allocate(null);
                newHashes.clear();
                newHashes.add(new ValueListIndex(item, -1));
                hashes.put(hash, newHashes);
                return true;
            } else if (ArrayFunctionsUtil.findItem(item, sameHashes) == null) {
                sameHashes.add(new ValueListIndex(item, -1));
                return true;
            }
            // else ignore for duplicate values in the same list
            return false;
        }

        /** Advances the stamp of the candidate equal to {@code item}, if there is a bucket for its hash at all. */
        private void incrementCommonValue(IPointable item, List<ValueListIndex> sameHashes, int listIndex,
                IAsterixListBuilder listBuilder) throws IOException {
            if (sameHashes != null) {
                // look for the same equal item, add to list builder when all lists have seen this item
                incrementIfExists(sameHashes, item, listIndex, listBuilder);
            }
        }

        private boolean notNullAndMissing(IPointable item) {
            byte tag = item.getByteArray()[item.getStartOffset()];
            return tag != ATypeTag.SERIALIZED_NULL_TYPE_TAG && tag != ATypeTag.SERIALIZED_MISSING_TYPE_TAG;
        }

        // this method is only for the most restrictive list. it avoids comparison since it is the initial list we start
        // with, so for sure every element in the collection must exist in the list
        private void incrementSmallest(int listIndex, Collection<List<ValueListIndex>> commonValues,
                IAsterixListBuilder listBuilder) throws HyracksDataException {
            boolean lastList = listIndex == listsArgs.length - 1;
            for (List<ValueListIndex> items : commonValues) {
                for (int i = 0; i < items.size(); i++) {
                    ValueListIndex candidate = items.get(i);
                    // any difference that is not == 1 means either this current list has already stamped and advanced
                    // the stamp or the item is not common among lists because if it's common then each list should've
                    // incremented the item list index up to the current list index
                    if (listIndex - candidate.listIndex == 1) {
                        candidate.listIndex = listIndex;
                        if (lastList) {
                            // the smallest list is the last argument: nothing is processed after it, so the
                            // surviving candidates have to be emitted here or they would be lost
                            listBuilder.addItem(candidate.value);
                        }
                    }
                }
            }
        }

        private void incrementIfExists(List<ValueListIndex> sameHashes, IPointable item, int listIndex,
                IAsterixListBuilder listBuilder) throws HyracksDataException {
            ValueListIndex sameValue = ArrayFunctionsUtil.findItem(item, sameHashes);
            if (sameValue != null && listIndex - sameValue.listIndex == 1) {
                // found the item, its stamp is OK (stamp saves the last list index that has seen this item)
                // increment stamp of this item
                sameValue.listIndex = listIndex;
                if (listIndex == listsArgs.length - 1) {
                    // when listIndex is the last list, then it means this item was found in all previous lists
                    listBuilder.addItem(item);
                }
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPositionDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPositionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPositionDescriptor.java index 5258abd..e4e54f1 100755 ---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPositionDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPositionDescriptor.java @@ -34,6 +34,22 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IPointable; +/** + * <pre> + * array_position(list, val) returns the 0-based position (as integer) of the value argument in the input list. If the + * value does not exist, it returns -1 + * + * It throws an error at compile time if the number of arguments != 2 + * + * It returns (or throws an error at runtime) in order: + * 1. missing, if any argument is missing. + * 2. null, if any argument is null. + * 3. an error if the value is of a list/object type (i.e. derived type) since deep equality is not yet supported. + * 4. null, if the input list is not a list. + * 5. otherwise, returns the position of the value in the list or -1 if not found. 
+ * + * </pre> + */ public class ArrayPositionDescriptor extends AbstractScalarFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; @@ -57,16 +73,15 @@ public class ArrayPositionDescriptor extends AbstractScalarFunctionDynamicDescri @Override public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new ArrayPositionFunction(args, ctx); + return new ArrayPositionEval(args, ctx); } }; } - public class ArrayPositionFunction extends AbstractArraySearchEval { + public class ArrayPositionEval extends AbstractArraySearchEval { private final ISerializerDeserializer intSerde; - public ArrayPositionFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) - throws HyracksDataException { + public ArrayPositionEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { super(args, ctx, sourceLoc); intSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPrependDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPrependDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPrependDescriptor.java index 7e52df5..dcb6ad8 100755 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPrependDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPrependDescriptor.java @@ -35,6 +35,20 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IPointable; import 
org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +/** + * <pre> + * array_prepend(val1, val2, ..., list) returns a new open list with all the values prepended to the input list items. + * Values can be null (i.e., one can prepend nulls) + * + * It throws an error at compile time if the number of arguments < 2 + * + * It returns in order: + * 1. missing, if any argument is missing. + * 2. null, if the list arg is null or it's not a list. + * 3. otherwise, a new open list. + * + * </pre> + */ public class ArrayPrependDescriptor extends AbstractScalarFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; private IAType[] argTypes; @@ -64,7 +78,7 @@ public class ArrayPrependDescriptor extends AbstractScalarFunctionDynamicDescrip @Override public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new ArrayPrependFunction(args, ctx); + return new ArrayPrependEval(args, ctx); } }; } @@ -74,10 +88,9 @@ public class ArrayPrependDescriptor extends AbstractScalarFunctionDynamicDescrip argTypes = (IAType[]) states; } - public class ArrayPrependFunction extends AbstractArrayAddRemoveEval { + public class ArrayPrependEval extends AbstractArrayAddRemoveEval { - public ArrayPrependFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) - throws HyracksDataException { + public ArrayPrependEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { super(args, ctx, args.length - 1, 0, args.length - 1, argTypes, false, sourceLoc, true, true); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java index 6435369..fc17e99 100755 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java @@ -44,6 +44,22 @@ import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +/** + * <pre> + * array_put(list, val1, val2, ...) returns a new open list with all the values appended to the input list items only if + * the list does not already have the value. Values cannot be null (i.e., one cannot append nulls). + * array_put([2, 3], 2, 2, 9, 9) will result in [2, 3, 9, 9]. + * + * It throws an error at compile time if the number of arguments < 2 + * + * It returns (or throws an error at runtime) in order: + * 1. missing, if any argument is missing. + * 2. null, if any argument is null. + * 3. an error if any value arg is of a list/object type (i.e. derived type) since deep equality is not yet supported. + * 4. otherwise, a new open list. 
+ * + * </pre> + */ public class ArrayPutDescriptor extends AbstractScalarFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; private IAType[] argTypes; @@ -73,7 +89,7 @@ public class ArrayPutDescriptor extends AbstractScalarFunctionDynamicDescriptor @Override public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new ArrayPutFunction(args, ctx); + return new ArrayPutEval(args, ctx); } }; } @@ -83,11 +99,11 @@ public class ArrayPutDescriptor extends AbstractScalarFunctionDynamicDescriptor argTypes = (IAType[]) states; } - public class ArrayPutFunction extends AbstractArrayAddRemoveEval { + public class ArrayPutEval extends AbstractArrayAddRemoveEval { private final ArrayBackedValueStorage storage; private final IBinaryComparator comp; - public ArrayPutFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { + public ArrayPutEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { super(args, ctx, 0, 1, args.length - 1, argTypes, true, sourceLoc, true, false); comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator(); storage = new ArrayBackedValueStorage(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRangeDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRangeDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRangeDescriptor.java new file mode 100755 index 0000000..4068101 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRangeDescriptor.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.evaluators.functions; + +import static org.apache.asterix.om.types.EnumDeserializer.ATYPETAGDESERIALIZER; + +import org.apache.asterix.builders.OrderedListBuilder; +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.om.base.AMutableDouble; +import org.apache.asterix.om.base.AMutableInt64; +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.om.typecomputer.impl.ArrayRangeTypeComputer; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.hierachy.ATypeHierarchy; +import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import 
org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;

/**
 * <pre>
 * array_range(start, end [, step]) returns a new array of numbers that starts at start and advances by step while
 * staying strictly before end (end itself is excluded). step defaults to 1 and may be negative to count down.
 *
 * As implemented by the evaluator below, it returns in order:
 * 1. null, if step is given and is not numeric.
 * 2. null, if start or end is not numeric.
 * 3. an array of bigint values, if start, end and step are all integer typed.
 * 4. otherwise, an array of double values.
 *
 * The result is an empty array when step is 0 or when step points away from end.
 * NOTE(review): handling of missing arguments is presumably done outside this evaluator -- confirm.
 * </pre>
 */
public class ArrayRangeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
    private static final long serialVersionUID = 1L;

    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
        @Override
        public IFunctionDescriptor createFunctionDescriptor() {
            return new ArrayRangeDescriptor();
        }
    };

    @Override
    public FunctionIdentifier getIdentifier() {
        return BuiltinFunctions.ARRAY_RANGE;
    }

    @Override
    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
            throws AlgebricksException {
        return new IScalarEvaluatorFactory() {
            private static final long serialVersionUID = 1L;

            @Override
            public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException {
                return new ArrayRangeEval(args, ctx);
            }
        };
    }

    public class ArrayRangeEval implements IScalarEvaluator {
        private final OrderedListBuilder listBuilder;
        private final ArrayBackedValueStorage storage;
        private final IScalarEvaluator startNumEval;
        private final TaggedValuePointable start;
        private final IScalarEvaluator endNumEval;
        private final TaggedValuePointable end;
        private final AMutableDouble aDouble;
        private final AMutableInt64 aLong;
        // both stay null when the optional third (step) argument is absent
        private IScalarEvaluator stepNumEval;
        private TaggedValuePointable step;

        public ArrayRangeEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException {
            storage = new ArrayBackedValueStorage();
            start = new TaggedValuePointable();
            end = new TaggedValuePointable();
            startNumEval = args[0].createScalarEvaluator(ctx);
            endNumEval = args[1].createScalarEvaluator(ctx);
            listBuilder = new OrderedListBuilder();
            aDouble = new AMutableDouble(0);
            aLong = new AMutableInt64(0);
            if (args.length == 3) {
                stepNumEval = args[2].createScalarEvaluator(ctx);
                step = new TaggedValuePointable();
            }
        }

        /**
         * Evaluates start, end and (when present) step and writes the generated array, or null when one of them is
         * not numeric, into {@code result}.
         *
         * @throws HyracksDataException if an argument cannot be evaluated or the list cannot be serialized
         */
        @Override
        public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
            startNumEval.evaluate(tuple, start);
            endNumEval.evaluate(tuple, end);
            String n = getIdentifier().getName();
            ATypeTag startTag = ATYPETAGDESERIALIZER.deserialize(start.getTag());
            ATypeTag endTag = ATYPETAGDESERIALIZER.deserialize(end.getTag());
            ATypeTag stepTag = ATypeTag.INTEGER;
            double stepNum = 1;
            if (stepNumEval != null) {
                stepNumEval.evaluate(tuple, step);
                stepTag = ATYPETAGDESERIALIZER.deserialize(step.getTag());
                if (!ATypeHierarchy.isCompatible(ATypeTag.DOUBLE, stepTag)) {
                    PointableHelper.setNull(result);
                    return;
                }
                stepNum = ATypeHierarchy.getDoubleValue(n, 2, step.getByteArray(), step.getStartOffset());
            }

            if (!ATypeHierarchy.isCompatible(ATypeTag.DOUBLE, startTag)
                    || !ATypeHierarchy.isCompatible(ATypeTag.DOUBLE, endTag)) {
                PointableHelper.setNull(result);
                return;
            }

            if (ATypeHierarchy.canPromote(startTag, ATypeTag.BIGINT)
                    && ATypeHierarchy.canPromote(endTag, ATypeTag.BIGINT)
                    && ATypeHierarchy.canPromote(stepTag, ATypeTag.BIGINT)) {
                // all 3 numbers are whole numbers; the step is re-read as a long so that the whole computation
                // stays in exact integer arithmetic instead of round-tripping through double
                long startNum = ATypeHierarchy.getLongValue(n, 0, start.getByteArray(), start.getStartOffset());
                long endNum = ATypeHierarchy.getLongValue(n, 1, end.getByteArray(), end.getStartOffset());
                long longStep = stepNumEval == null ? 1L
                        : ATypeHierarchy.getLongValue(n, 2, step.getByteArray(), step.getStartOffset());
                addLongRange(startNum, endNum, longStep);
            } else {
                // one number is a floating-point number
                double startNum = ATypeHierarchy.getDoubleValue(n, 0, start.getByteArray(), start.getStartOffset());
                double endNum = ATypeHierarchy.getDoubleValue(n, 1, end.getByteArray(), end.getStartOffset());
                addDoubleRange(startNum, endNum, stepNum);
            }

            storage.reset();
            listBuilder.write(storage.getDataOutput(), true);
            result.set(storage);
        }

        /** Resets the list builder to a bigint list and fills it with start, start + step, ... while before end. */
        @SuppressWarnings({ "rawtypes", "unchecked" }) // the serde provider hands out raw serializers
        private void addLongRange(long startNum, long endNum, long stepNum) throws HyracksDataException {
            ISerializerDeserializer serde =
                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
            listBuilder.reset(ArrayRangeTypeComputer.LONG_LIST);
            long current = startNum;
            while ((current < endNum && stepNum > 0) || (current > endNum && stepNum < 0)) {
                aLong.setValue(current);
                storage.reset();
                serde.serialize(aLong, storage.getDataOutput());
                listBuilder.addItem(storage);
                long next = current + stepNum;
                // a wrap-around means the next value lies beyond the long range and therefore beyond end
                if ((stepNum > 0 && next < current) || (stepNum < 0 && next > current)) {
                    break;
                }
                current = next;
            }
        }

        /** Resets the list builder to a double list and fills it with start, start + step, ... while before end. */
        @SuppressWarnings({ "rawtypes", "unchecked" }) // the serde provider hands out raw serializers
        private void addDoubleRange(double startNum, double endNum, double stepNum) throws HyracksDataException {
            ISerializerDeserializer serde =
                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
            listBuilder.reset(ArrayRangeTypeComputer.DOUBLE_LIST);
            double current = startNum;
            while ((current < endNum && stepNum > 0) || (current > endNum && stepNum < 0)) {
                aDouble.setValue(current);
                storage.reset();
                serde.serialize(aDouble, storage.getDataOutput());
                listBuilder.addItem(storage);
                double next = current + stepNum;
                // when the step is too small to change the value (or the value is infinite) the sequence cannot
                // advance any more; stop instead of appending the same number forever
                if (next == current) {
                    break;
                }
                current = next;
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java index ff7ace1..ea27017 100755 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java @@ -42,6 +42,21 @@ import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +/** + * <pre> + * array_remove(list, val1, val2, ...)
returns a new (open or closed) list with all the values removed from the input + * list. Values cannot be null (i.e., one cannot remove nulls). + * + * It throws an error at compile time if the number of arguments < 2 + * + * It returns (or throws an error at runtime) in order: + * 1. missing, if any argument is missing. + * 2. null, if any argument is null. + * 3. an error if any value arg is of a list/object type (i.e. derived type) since deep equality is not yet supported. + * 4. otherwise, a new list that has the same type as the input list. + * + * </pre> + */ public class ArrayRemoveDescriptor extends AbstractScalarFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; private IAType[] argTypes; @@ -71,7 +86,7 @@ public class ArrayRemoveDescriptor extends AbstractScalarFunctionDynamicDescript @Override public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new ArrayRemoveFunction(args, ctx); + return new ArrayRemoveEval(args, ctx); } }; } @@ -81,12 +96,11 @@ public class ArrayRemoveDescriptor extends AbstractScalarFunctionDynamicDescript argTypes = (IAType[]) states; } - public class ArrayRemoveFunction extends AbstractArrayAddRemoveEval { + public class ArrayRemoveEval extends AbstractArrayAddRemoveEval { private final ArrayBackedValueStorage storage; private final IBinaryComparator comp; - public ArrayRemoveFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) - throws HyracksDataException { + public ArrayRemoveEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { super(args, ctx, 0, 1, args.length - 1, argTypes, true, sourceLoc, false, false); storage = new ArrayBackedValueStorage(); comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator(); 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRepeatDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRepeatDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRepeatDescriptor.java index 3a0280d..ce9d3cc 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRepeatDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRepeatDescriptor.java @@ -18,8 +18,6 @@ */ package org.apache.asterix.runtime.evaluators.functions; -import static org.apache.asterix.om.types.AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE; - import org.apache.asterix.builders.IAsterixListBuilder; import org.apache.asterix.builders.OrderedListBuilder; import org.apache.asterix.om.functions.BuiltinFunctions; @@ -80,12 +78,12 @@ public class ArrayRepeatDescriptor extends AbstractScalarFunctionDynamicDescript @Override public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new ArrayRepeatFunction(args, ctx); + return new ArrayRepeatEval(args, ctx); } }; } - public class ArrayRepeatFunction implements IScalarEvaluator { + public class ArrayRepeatEval implements IScalarEvaluator { private final ArrayBackedValueStorage storage; private final IScalarEvaluator repeatedValueEval; private final IScalarEvaluator repeatEval; @@ -94,8 +92,7 @@ public class ArrayRepeatDescriptor extends AbstractScalarFunctionDynamicDescript private final TaggedValuePointable repeatArgValue; private final IAsterixListBuilder listBuilder; - public ArrayRepeatFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) - throws HyracksDataException { + 
public ArrayRepeatEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { storage = new ArrayBackedValueStorage(); repeatedValueEval = args[0].createScalarEvaluator(ctx); repeatEval = args[1].createScalarEvaluator(ctx); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9a987680/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java new file mode 100644 index 0000000..e8d77a8 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.evaluators.functions; + +import static org.apache.asterix.om.types.EnumDeserializer.ATYPETAGDESERIALIZER; + +import java.io.IOException; + +import org.apache.asterix.builders.IAsterixListBuilder; +import org.apache.asterix.builders.OrderedListBuilder; +import org.apache.asterix.builders.UnorderedListBuilder; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory; +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.om.functions.IFunctionTypeInferer; +import org.apache.asterix.om.pointables.base.DefaultOpenFieldType; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.AbstractCollectionType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.types.hierachy.ATypeHierarchy; +import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; +import org.apache.asterix.runtime.evaluators.common.ListAccessor; +import org.apache.asterix.runtime.functions.FunctionTypeInferers; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.AbstractPointable; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.TaggedValuePointable; 
+import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +public class ArrayReplaceDescriptor extends AbstractScalarFunctionDynamicDescriptor { + private static final long serialVersionUID = 1L; + private IAType inputListType; + private IAType newValueType; + + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { + @Override + public IFunctionDescriptor createFunctionDescriptor() { + return new ArrayReplaceDescriptor(); + } + + @Override + public IFunctionTypeInferer createFunctionTypeInferer() { + return FunctionTypeInferers.SET_ARGUMENTS_TYPE; + } + }; + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.ARRAY_REPLACE; + } + + @Override + public void setImmutableStates(Object... states) { + inputListType = (IAType) states[0]; + newValueType = (IAType) states[2]; + } + + @Override + public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) + throws AlgebricksException { + return new IScalarEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { + return new ArrayReplaceEval(args, ctx); + } + }; + } + + public class ArrayReplaceEval implements IScalarEvaluator { + private final IScalarEvaluator listEval; + private final IScalarEvaluator targetValEval; + private final IScalarEvaluator newValEval; + private IScalarEvaluator maxEval; + private final IPointable list; + private final IPointable target; + private final IPointable newVal; + private TaggedValuePointable maxArg; + private final AbstractPointable item; + private final ListAccessor listAccessor; + private final IBinaryComparator comp; + private final ArrayBackedValueStorage storage; + private final CastTypeEvaluator 
caster; + private IAsterixListBuilder orderedListBuilder; + private IAsterixListBuilder unorderedListBuilder; + + public ArrayReplaceEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException { + storage = new ArrayBackedValueStorage(); + listEval = args[0].createScalarEvaluator(ctx); + targetValEval = args[1].createScalarEvaluator(ctx); + newValEval = args[2].createScalarEvaluator(ctx); + if (args.length == 4) { + maxEval = args[3].createScalarEvaluator(ctx); + maxArg = new TaggedValuePointable(); + } + list = new VoidPointable(); + target = new VoidPointable(); + newVal = new VoidPointable(); + item = new VoidPointable(); + listAccessor = new ListAccessor(); + caster = new CastTypeEvaluator(); + orderedListBuilder = null; + unorderedListBuilder = null; + comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator(); + } + + @Override + public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { + storage.reset(); + listEval.evaluate(tuple, list); + targetValEval.evaluate(tuple, target); + newValEval.evaluate(tuple, newVal); + ATypeTag listType = ATYPETAGDESERIALIZER.deserialize(list.getByteArray()[list.getStartOffset()]); + ATypeTag targetTag = ATYPETAGDESERIALIZER.deserialize(target.getByteArray()[target.getStartOffset()]); + ATypeTag newValTag = ATYPETAGDESERIALIZER.deserialize(newVal.getByteArray()[newVal.getStartOffset()]); + if (listType == ATypeTag.MISSING || targetTag == ATypeTag.MISSING || newValTag == ATypeTag.MISSING) { + PointableHelper.setMissing(result); + return; + } + + double maxDouble = -1; + String name = getIdentifier().getName(); + if (maxEval != null) { + maxEval.evaluate(tuple, maxArg); + ATypeTag maxTag = ATYPETAGDESERIALIZER.deserialize(maxArg.getTag()); + if (maxTag == ATypeTag.MISSING) { + PointableHelper.setMissing(result); + return; + } else if (!ATypeHierarchy.isCompatible(maxTag, ATypeTag.DOUBLE)) { + PointableHelper.setNull(result); + return; + } + 
maxDouble = ATypeHierarchy.getDoubleValue(name, 3, maxArg.getByteArray(), maxArg.getStartOffset()); + } + + if (!listType.isListType() || Math.floor(maxDouble) < maxDouble || targetTag == ATypeTag.NULL + || Double.isInfinite(maxDouble) || Double.isNaN(maxDouble)) { + PointableHelper.setNull(result); + return; + } + + if (targetTag.isDerivedType()) { + throw new RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX, sourceLoc); + } + + IAType defaultOpenType = DefaultOpenFieldType.getDefaultOpenFieldType(listType); + caster.reset(defaultOpenType, inputListType, listEval); + caster.evaluate(tuple, list); + + defaultOpenType = DefaultOpenFieldType.getDefaultOpenFieldType(newValTag); + if (defaultOpenType != null) { + caster.reset(defaultOpenType, newValueType, newValEval); + caster.evaluate(tuple, newVal); + } + + int max = (int) maxDouble; + // create list + IAsterixListBuilder listBuilder; + if (listType == ATypeTag.ARRAY) { + if (orderedListBuilder == null) { + orderedListBuilder = new OrderedListBuilder(); + } + listBuilder = orderedListBuilder; + } else { + if (unorderedListBuilder == null) { + unorderedListBuilder = new UnorderedListBuilder(); + } + listBuilder = unorderedListBuilder; + } + + listBuilder.reset((AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listType)); + listAccessor.reset(list.getByteArray(), list.getStartOffset()); + try { + int counter = 0; + byte[] targetBytes = target.getByteArray(); + int offset = target.getStartOffset(); + int length = target.getLength(); + for (int i = 0; i < listAccessor.size(); i++) { + listAccessor.getOrWriteItem(i, item, storage); + if (counter != max && comp.compare(item.getByteArray(), item.getStartOffset(), item.getLength(), + targetBytes, offset, length) == 0) { + listBuilder.addItem(newVal); + counter++; + } else { + listBuilder.addItem(item); + } + } + storage.reset(); + listBuilder.write(storage.getDataOutput(), true); + result.set(storage); + } catch (IOException e) { + throw 
HyracksDataException.create(e); + } + } + } +}