Merged brickhouse functions #135
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/1e1b77ea Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/1e1b77ea Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/1e1b77ea Branch: refs/heads/master Commit: 1e1b77ea4724c48f56dd1f3aa15027506558dee1 Parents: eac4800 Author: Makoto Yui <m...@apache.org> Authored: Mon Apr 9 16:04:37 2018 +0900 Committer: Makoto Yui <m...@apache.org> Committed: Mon Apr 9 16:04:37 2018 +0900 ---------------------------------------------------------------------- NOTICE | 4 +- core/pom.xml | 5 + .../java/hivemall/common/OnlineVariance.java | 77 -- .../hivemall/regression/AROWRegressionUDTF.java | 2 +- .../PassiveAggressiveRegressionUDTF.java | 2 +- .../java/hivemall/sketch/bloom/BloomAndUDF.java | 62 ++ .../hivemall/sketch/bloom/BloomContainsUDF.java | 71 ++ .../hivemall/sketch/bloom/BloomFilterUtils.java | 147 ++++ .../java/hivemall/sketch/bloom/BloomNotUDF.java | 59 ++ .../java/hivemall/sketch/bloom/BloomOrUDF.java | 62 ++ .../java/hivemall/sketch/bloom/BloomUDAF.java | 101 +++ .../hivemall/statistics/MovingAverageUDTF.java | 84 +++ .../main/java/hivemall/tools/TryCastUDF.java | 82 +++ .../hivemall/tools/array/ArrayAppendUDF.java | 103 +++ .../hivemall/tools/array/ArrayElementAtUDF.java | 80 +++ .../hivemall/tools/array/ArrayFlattenUDF.java | 111 +++ .../hivemall/tools/array/ArraySliceUDF.java | 141 ++++ .../hivemall/tools/array/ArrayUnionUDF.java | 112 +++ .../tools/array/ConditionalEmitUDTF.java | 128 ++++ .../hivemall/tools/array/FirstElementUDF.java | 68 ++ .../hivemall/tools/array/LastElementUDF.java | 70 ++ .../java/hivemall/tools/array/SubarrayUDF.java | 48 -- .../java/hivemall/tools/json/FromJsonUDF.java | 148 ++++ .../java/hivemall/tools/json/ToJsonUDF.java | 94 +++ .../java/hivemall/tools/sanity/AssertUDF.java | 46 ++ .../hivemall/tools/sanity/RaiseErrorUDF.java | 38 + 
.../hivemall/tools/vector/VectorAddUDF.java | 139 ++++ .../hivemall/tools/vector/VectorDotUDF.java | 178 +++++ .../utils/collections/DoubleRingBuffer.java | 4 + .../java/hivemall/utils/hadoop/HiveUtils.java | 80 ++- .../hivemall/utils/hadoop/JsonSerdeUtils.java | 715 +++++++++++++++++++ .../hivemall/utils/hadoop/WritableUtils.java | 27 + .../utils/hashing/HashFunctionFactory.java | 1 - .../java/hivemall/utils/lang/StringUtils.java | 16 +- .../hivemall/utils/stats/MovingAverage.java | 74 ++ .../hivemall/utils/stats/OnlineVariance.java | 77 ++ .../hivemall/common/OnlineVarianceTest.java | 89 --- .../hivemall/sketch/bloom/BloomAndUDFTest.java | 89 +++ .../sketch/bloom/BloomContainsUDFTest.java | 71 ++ .../sketch/bloom/BloomFilterUtilsTest.java | 78 ++ .../hivemall/sketch/bloom/BloomNotUDFTest.java | 67 ++ .../hivemall/sketch/bloom/BloomOrUDFTest.java | 89 +++ .../statistics/MovingAverageUDTFTest.java | 68 ++ .../java/hivemall/tools/TryCastUDFTest.java | 59 ++ .../tools/array/ArrayAppendUDFTest.java | 106 +++ .../tools/array/ArrayElementAtUDFTest.java | 86 +++ .../tools/array/ArrayFlattenUDFTest.java | 56 ++ .../hivemall/tools/array/ArraySliceUDFTest.java | 119 +++ .../hivemall/tools/array/ArrayUnionUDFTest.java | 65 ++ .../tools/array/ConditionalEmitUDTFTest.java | 70 ++ .../tools/array/FirstElementUDFTest.java | 66 ++ .../tools/array/LastElementUDFTest.java | 66 ++ .../hivemall/tools/json/FromJsonUDFTest.java | 82 +++ .../java/hivemall/tools/json/ToJsonUDFTest.java | 52 ++ .../hivemall/tools/sanity/AssertUDFTest.java | 39 + .../tools/sanity/RaiseErrorUDFTest.java | 32 + .../hivemall/tools/vector/VectorAddUDFTest.java | 85 +++ .../hivemall/tools/vector/VectorDotUDFTest.java | 83 +++ .../utils/collections/DoubleRingBufferTest.java | 24 + .../utils/hadoop/JsonSerdeUtilsTest.java | 365 ++++++++++ .../hivemall/utils/stats/MovingAverageTest.java | 53 ++ .../utils/stats/OnlineVarianceTest.java | 91 +++ pom.xml | 6 + resources/ddl/define-all-as-permanent.hive | 6 +- 
resources/ddl/define-all.hive | 6 +- resources/ddl/define-all.spark | 5 +- resources/ddl/define-udfs.td.hql | 3 +- .../org/apache/spark/sql/hive/HivemallOps.scala | 20 +- 68 files changed, 5201 insertions(+), 251 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/NOTICE ---------------------------------------------------------------------- diff --git a/NOTICE b/NOTICE index 34b5f5d..385a198 100644 --- a/NOTICE +++ b/NOTICE @@ -4,10 +4,10 @@ Copyright 2016-2018 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). -This product is based on source code originally developed by AIST and Treasure Data, Inc. They have been licensed to the Apache Software Foundation under Software Grant Agreements from -the following individuals and organizations: +the following organizations and individuals: - Copyright 2013-2015 National Institute of Advanced Industrial Science and Technology (AIST) - Copyright 2015-2016 Makoto Yui - Copyright 2015-2016 Treasure Data, Inc. + - Copyright 2012 Klout, Inc. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index 82cb369..26dca48 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -52,6 +52,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hive-hcatalog-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/common/OnlineVariance.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/common/OnlineVariance.java b/core/src/main/java/hivemall/common/OnlineVariance.java deleted file mode 100644 index 6e1d990..0000000 --- a/core/src/main/java/hivemall/common/OnlineVariance.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package hivemall.common; - -/** - * @see http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance - */ -public final class OnlineVariance { - - private long n; - private double mean; - private double m2; - - public OnlineVariance() { - reset(); - } - - public void reset() { - this.n = 0L; - this.mean = 0.d; - this.m2 = 0.d; - } - - public void handle(double x) { - ++n; - double delta = x - mean; - mean += delta / n; - m2 += delta * (x - mean); - } - - public void unhandle(double x) { - if (n == 0L) { - return; // nop - } - if (n == 1L) { - reset(); - return; - } - double old_mean = (n * mean - x) / (n - 1L); - m2 -= (x - mean) * (x - old_mean); - mean = old_mean; - --n; - } - - public long numSamples() { - return n; - } - - public double mean() { - return mean; - } - - public double variance() { - return n > 1 ? (m2 / (n - 1)) : 0.d; - } - - public double stddev() { - return Math.sqrt(variance()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/regression/AROWRegressionUDTF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/regression/AROWRegressionUDTF.java b/core/src/main/java/hivemall/regression/AROWRegressionUDTF.java index 3c40c8f..9a0978d 100644 --- a/core/src/main/java/hivemall/regression/AROWRegressionUDTF.java +++ b/core/src/main/java/hivemall/regression/AROWRegressionUDTF.java @@ -18,12 +18,12 @@ */ package hivemall.regression; -import hivemall.common.OnlineVariance; import hivemall.model.FeatureValue; import hivemall.model.IWeightValue; import hivemall.model.PredictionResult; import hivemall.model.WeightValue.WeightValueWithCovar; import hivemall.optimizer.LossFunctions; +import hivemall.utils.stats.OnlineVariance; import javax.annotation.Nonnull; http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/regression/PassiveAggressiveRegressionUDTF.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/regression/PassiveAggressiveRegressionUDTF.java b/core/src/main/java/hivemall/regression/PassiveAggressiveRegressionUDTF.java index 946a671..f822fd5 100644 --- a/core/src/main/java/hivemall/regression/PassiveAggressiveRegressionUDTF.java +++ b/core/src/main/java/hivemall/regression/PassiveAggressiveRegressionUDTF.java @@ -18,10 +18,10 @@ */ package hivemall.regression; -import hivemall.common.OnlineVariance; import hivemall.model.FeatureValue; import hivemall.model.PredictionResult; import hivemall.optimizer.LossFunctions; +import hivemall.utils.stats.OnlineVariance; import javax.annotation.Nonnull; http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/sketch/bloom/BloomAndUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/sketch/bloom/BloomAndUDF.java b/core/src/main/java/hivemall/sketch/bloom/BloomAndUDF.java new file mode 100644 index 0000000..87769da --- /dev/null +++ b/core/src/main/java/hivemall/sketch/bloom/BloomAndUDF.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package hivemall.sketch.bloom; + +import java.io.IOException; + +import javax.annotation.Nullable; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.bloom.DynamicBloomFilter; +import org.apache.hadoop.util.bloom.Filter; + +@Description(name = "bloom_and", + value = "_FUNC_(string bloom1, string bloom2) - Returns the logical AND of two bloom filters") +@UDFType(deterministic = true, stateful = false) +public final class BloomAndUDF extends UDF { + + @Nullable + public Text evaluate(@Nullable Text bloom1Str, @Nullable Text bloom2Str) throws HiveException { + if (bloom1Str == null || bloom2Str == null) { + return null; + } + + final Filter bloom1; + final Filter bloom2; + try { + bloom1 = BloomFilterUtils.deserialize(bloom1Str, new DynamicBloomFilter()); + bloom2 = BloomFilterUtils.deserialize(bloom2Str, new DynamicBloomFilter()); + } catch (IOException e) { + throw new HiveException(e); + } + + bloom1.and(bloom2); + + try { + return BloomFilterUtils.serialize(bloom1, new Text()); + } catch (IOException e) { + throw new HiveException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/sketch/bloom/BloomContainsUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/sketch/bloom/BloomContainsUDF.java b/core/src/main/java/hivemall/sketch/bloom/BloomContainsUDF.java new file mode 100644 index 0000000..2da65b3 --- /dev/null +++ b/core/src/main/java/hivemall/sketch/bloom/BloomContainsUDF.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package hivemall.sketch.bloom; + +import java.io.IOException; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.bloom.DynamicBloomFilter; +import org.apache.hadoop.util.bloom.Filter; +import org.apache.hadoop.util.bloom.Key; + +@Description(name = "bloom_contains", + value = "_FUNC_(string bloom, string key) - Returns true if the bloom filter contains the given key") +@UDFType(deterministic = true, stateful = false) +public final class BloomContainsUDF extends UDF { + + @Nonnull + private final Key key = new Key(); + + @Nullable + private Text prevKey; + @Nullable + private Filter prevFilter; + + @Nullable + public Boolean evaluate(@Nullable Text bloomStr, @Nullable Text keyStr) throws HiveException { + if (bloomStr == null || key == null) { + return null; + } + + final Filter bloom; + if (prevFilter != null && prevKey.equals(keyStr)) { + bloom = prevFilter; + } else { + try { + bloom = BloomFilterUtils.deserialize(bloomStr, new DynamicBloomFilter()); + } catch (IOException e) { + 
throw new HiveException(e); + } + this.prevKey = keyStr; + this.prevFilter = bloom; + key.set(keyStr.getBytes(), 1.0d); + } + + return Boolean.valueOf(bloom.membershipTest(key)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/sketch/bloom/BloomFilterUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/sketch/bloom/BloomFilterUtils.java b/core/src/main/java/hivemall/sketch/bloom/BloomFilterUtils.java new file mode 100644 index 0000000..2767022 --- /dev/null +++ b/core/src/main/java/hivemall/sketch/bloom/BloomFilterUtils.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package hivemall.sketch.bloom; + +import static hivemall.utils.math.MathUtils.LOG2; + +import hivemall.utils.io.Base91InputStream; +import hivemall.utils.io.Base91OutputStream; +import hivemall.utils.io.FastByteArrayInputStream; +import hivemall.utils.io.FastByteArrayOutputStream; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.bloom.BloomFilter; +import org.apache.hadoop.util.bloom.DynamicBloomFilter; +import org.apache.hadoop.util.bloom.Filter; +import org.apache.hadoop.util.hash.Hash; + +public final class BloomFilterUtils { + + public static final int DEFAULT_BLOOM_FILTER_SIZE = 1024 * 1024; + public static final float DEFAULT_ERROR_RATE = 0.005f; + public static final int NUM_HASHES = 5; + + @Nonnull + public static BloomFilter newBloomFilter(@Nonnegative final int expectedNumberOfElements) { + return newBloomFilter(expectedNumberOfElements, DEFAULT_ERROR_RATE); + } + + @Nonnull + public static BloomFilter newBloomFilter(@Nonnegative final int expectedNumberOfElements, + @Nonnegative final float errorRate) { + // k = ceil(-log_2(false prob.)) + int nbHash = Math.max(2, (int) Math.ceil(-(Math.log(errorRate) / LOG2))); + return newBloomFilter(expectedNumberOfElements, errorRate, nbHash); + } + + @Nonnull + public static BloomFilter newBloomFilter(@Nonnegative final int expectedNumberOfElements, + @Nonnegative final float errorRate, @Nonnegative final int nbHash) { + // vector size should be `-kn / (ln(1 - c^(1/k)))` bits for + // single key, where `k` is the number of hash functions, + // `n` is the number of keys and `c` is the desired max error rate. 
+ int vectorSize = (int) Math.ceil((-nbHash * expectedNumberOfElements) + / Math.log(1.d - Math.pow(errorRate, 1.d / nbHash))); + return new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH); + } + + @Nonnull + public static DynamicBloomFilter newDynamicBloomFilter() { + return newDynamicBloomFilter(DEFAULT_BLOOM_FILTER_SIZE, DEFAULT_ERROR_RATE, NUM_HASHES); + } + + @Nonnull + public static DynamicBloomFilter newDynamicBloomFilter( + @Nonnegative final int expectedNumberOfElements) { + return newDynamicBloomFilter(expectedNumberOfElements, DEFAULT_ERROR_RATE); + } + + @Nonnull + public static DynamicBloomFilter newDynamicBloomFilter( + @Nonnegative final int expectedNumberOfElements, @Nonnegative final float errorRate) { + // k = ceil(-log_2(false prob.)) + int nbHash = Math.max(2, (int) Math.ceil(-(Math.log(errorRate) / LOG2))); + return newDynamicBloomFilter(expectedNumberOfElements, errorRate, nbHash); + } + + @Nonnull + public static DynamicBloomFilter newDynamicBloomFilter( + @Nonnegative final int expectedNumberOfElements, @Nonnegative final float errorRate, + @Nonnegative final int nbHash) { + int vectorSize = (int) Math.ceil((-nbHash * expectedNumberOfElements) + / Math.log(1.d - Math.pow(errorRate, 1.d / nbHash))); + return new DynamicBloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH, + expectedNumberOfElements); + } + + @Nonnull + public static byte[] serialize(@Nonnull final Filter filter) throws IOException { + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); + Base91OutputStream base91 = new Base91OutputStream(bos); + DataOutputStream out = new DataOutputStream(base91); + filter.write(out); + out.flush(); + base91.finish(); + return bos.toByteArray(); + } + + @Nonnull + public static Text serialize(@Nonnull final Filter filter, @Nonnull final Text dst) + throws IOException { + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); + Base91OutputStream base91 = new Base91OutputStream(bos); + DataOutputStream out = new 
DataOutputStream(base91); + filter.write(out); + out.flush(); + base91.finish(); + dst.set(bos.getInternalArray(), 0, bos.size()); + return dst; + } + + @Nonnull + public static <F extends Filter> F deserialize(@Nonnull final Text in, @Nonnull final F dst) + throws IOException { + return deserialize(in.getBytes(), 0, in.getLength(), dst); + } + + @Nonnull + public static <F extends Filter> F deserialize(@Nonnull final byte[] buf, @Nonnull final F dst) + throws IOException { + return deserialize(buf, 0, buf.length, dst); + } + + @Nonnull + public static <F extends Filter> F deserialize(@Nonnull final byte[] buf, + @Nonnegative final int offset, @Nonnegative final int len, @Nonnull final F dst) + throws IOException { + FastByteArrayInputStream fis = new FastByteArrayInputStream(buf, offset, len); + DataInput in = new DataInputStream(new Base91InputStream(fis)); + dst.readFields(in); + return dst; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/sketch/bloom/BloomNotUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/sketch/bloom/BloomNotUDF.java b/core/src/main/java/hivemall/sketch/bloom/BloomNotUDF.java new file mode 100644 index 0000000..cd385e3 --- /dev/null +++ b/core/src/main/java/hivemall/sketch/bloom/BloomNotUDF.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package hivemall.sketch.bloom; + +import java.io.IOException; + +import javax.annotation.Nullable; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.bloom.DynamicBloomFilter; +import org.apache.hadoop.util.bloom.Filter; + +@Description(name = "bloom_not", + value = "_FUNC_(string bloom) - Returns the logical NOT of a bloom filters") +@UDFType(deterministic = true, stateful = false) +public final class BloomNotUDF extends UDF { + + @Nullable + public Text evaluate(@Nullable Text bloomStr) throws HiveException { + if (bloomStr == null) { + return null; + } + + final Filter bloom; + try { + bloom = BloomFilterUtils.deserialize(bloomStr, new DynamicBloomFilter()); + } catch (IOException e) { + throw new HiveException(e); + } + + bloom.not(); + + try { + return BloomFilterUtils.serialize(bloom, new Text()); + } catch (IOException e) { + throw new HiveException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/sketch/bloom/BloomOrUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/sketch/bloom/BloomOrUDF.java b/core/src/main/java/hivemall/sketch/bloom/BloomOrUDF.java new file mode 100644 index 0000000..7d2980e --- /dev/null +++ b/core/src/main/java/hivemall/sketch/bloom/BloomOrUDF.java @@ -0,0 +1,62 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package hivemall.sketch.bloom; + +import java.io.IOException; + +import javax.annotation.Nullable; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.bloom.DynamicBloomFilter; +import org.apache.hadoop.util.bloom.Filter; + +@Description(name = "bloom_or", + value = "_FUNC_(string bloom1, string bloom2) - Returns the logical OR of two bloom filters") +@UDFType(deterministic = true, stateful = false) +public final class BloomOrUDF extends UDF { + + @Nullable + public Text evaluate(@Nullable Text bloom1Str, @Nullable Text bloom2Str) throws HiveException { + if (bloom1Str == null || bloom2Str == null) { + return null; + } + + final Filter bloom1; + final Filter bloom2; + try { + bloom1 = BloomFilterUtils.deserialize(bloom1Str, new DynamicBloomFilter()); + bloom2 = BloomFilterUtils.deserialize(bloom2Str, new DynamicBloomFilter()); + } catch (IOException e) { + throw new HiveException(e); + } + + bloom1.or(bloom2); + + try { + return 
BloomFilterUtils.serialize(bloom1, new Text()); + } catch (IOException e) { + throw new HiveException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/sketch/bloom/BloomUDAF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/sketch/bloom/BloomUDAF.java b/core/src/main/java/hivemall/sketch/bloom/BloomUDAF.java new file mode 100644 index 0000000..cb09b93 --- /dev/null +++ b/core/src/main/java/hivemall/sketch/bloom/BloomUDAF.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package hivemall.sketch.bloom; + +import java.io.IOException; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDAF; +import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.bloom.DynamicBloomFilter; +import org.apache.hadoop.util.bloom.Filter; +import org.apache.hadoop.util.bloom.Key; + +@Description(name = "bloom", + value = "_FUNC_(string key) - Constructs a BloomFilter by aggregating a set of keys") +@SuppressWarnings("deprecation") +public final class BloomUDAF extends UDAF { + + public static class Evaluator implements UDAFEvaluator { + + private Filter filter; + private Key key; + + @Override + public void init() { + this.filter = BloomFilterUtils.newDynamicBloomFilter(); + this.key = new Key(); + } + + public boolean iterate(@Nullable Text keyStr) { + if (keyStr == null) { + return true; + } + key.set(keyStr.getBytes(), 1.0d); + + filter.add(key); + + return true; + } + + @Nonnull + public Text terminatePartial() throws HiveException { + try { + return BloomFilterUtils.serialize(filter, new Text()); + } catch (IOException e) { + throw new HiveException(e); + } + } + + public boolean merge(@Nonnull Text partial) throws HiveException { + final DynamicBloomFilter other; + try { + other = BloomFilterUtils.deserialize(partial, new DynamicBloomFilter()); + } catch (IOException e) { + throw new HiveException(e); + } + + if (filter == null) { + this.filter = other; + } else { + filter.or(other); + } + return true; + } + + @Nullable + public Text terminate() throws HiveException { + if (filter == null) { + return null; + } + + try { + return BloomFilterUtils.serialize(filter, new Text()); + } catch (IOException e) { + throw new HiveException(e); + } + } + + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/statistics/MovingAverageUDTF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/statistics/MovingAverageUDTF.java b/core/src/main/java/hivemall/statistics/MovingAverageUDTF.java new file mode 100644 index 0000000..112c47f --- /dev/null +++ b/core/src/main/java/hivemall/statistics/MovingAverageUDTF.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package hivemall.statistics; + +import hivemall.utils.hadoop.HiveUtils; +import hivemall.utils.stats.MovingAverage; + +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; + +@Description(name = "moving_avg", value = "_FUNC_(NUMBER value, const int windowSize)" + + " - Returns moving average of a time series using a given window") +public final class MovingAverageUDTF extends GenericUDTF { + + private PrimitiveObjectInspector valueOI; + + private MovingAverage movingAvg; + + private Object[] forwardObjs; + private DoubleWritable result; + + @Override + public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { + if (argOIs.length != 2) { + throw new UDFArgumentException( + "Two argument is expected for moving_avg(NUMBER value, const int windowSize): " + + argOIs.length); + } + this.valueOI = HiveUtils.asNumberOI(argOIs[0]); + + int windowSize = HiveUtils.getConstInt(argOIs[1]); + this.movingAvg = new MovingAverage(windowSize); + + this.result = new DoubleWritable(); + this.forwardObjs = new Object[] {result}; + + List<String> fieldNames = Arrays.asList("avg"); + List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList( + PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); + + return 
ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); + } + + @Override + public void process(Object[] args) throws HiveException { + double x = HiveUtils.getDouble(args[0], valueOI); + + double avg = movingAvg.add(x); + result.set(avg); + + forward(forwardObjs); + } + + @Override + public void close() throws HiveException {} + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/tools/TryCastUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/TryCastUDF.java b/core/src/main/java/hivemall/tools/TryCastUDF.java new file mode 100644 index 0000000..a0f3257 --- /dev/null +++ b/core/src/main/java/hivemall/tools/TryCastUDF.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package hivemall.tools; + +import hivemall.utils.hadoop.HiveUtils; + +import java.util.Arrays; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; + +@Description(name = "try_cast", + value = "_FUNC_(ANY src, const string typeName)" + + " - Explicitly cast a value as a type. Returns null if cast fails.", + extended = "Usage: select try_cast(array(1.0,2.0,3.0), 'array<string>')\n" + + " select try_cast(map('A',10,'B',20,'C',30), 'map<string,double>')") +@UDFType(deterministic = true, stateful = false) +public final class TryCastUDF extends GenericUDF { + + private ObjectInspector inputOI; + private Converter converter; + + @Override + public ObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { + if (argOIs.length != 2) { + throw new UDFArgumentException( + "try_cast(ANY src, const string typeName) expects exactly two arguments"); + } + + this.inputOI = argOIs[0]; + String typeString = HiveUtils.getConstString(argOIs[1]); + + ObjectInspector outputOI = HiveUtils.getObjectInspector(typeString, true); + this.converter = ObjectInspectorConverters.getConverter(inputOI, outputOI); + + return outputOI; + } + + @Override + public Object evaluate(DeferredObject[] args) throws HiveException { + Object arg0 = args[0].get(); + if (arg0 == null) { + return null; + } + + Object input = ObjectInspectorUtils.copyToStandardObject(arg0, inputOI); + try { + return converter.convert(input); + } catch (Exception e) { 
+ return null; + } + } + + @Override + public String getDisplayString(String[] args) { + return "try_cast(" + Arrays.toString(args) + ")"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/tools/array/ArrayAppendUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/array/ArrayAppendUDF.java b/core/src/main/java/hivemall/tools/array/ArrayAppendUDF.java new file mode 100644 index 0000000..25d0f4c --- /dev/null +++ b/core/src/main/java/hivemall/tools/array/ArrayAppendUDF.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package hivemall.tools.array; + +import hivemall.utils.hadoop.HiveUtils; + +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; + +@Description(name = "array_append", + value = "_FUNC_(array<T> arr, T elem) - Append an element to the end of an array") +@UDFType(deterministic = true, stateful = false) +public final class ArrayAppendUDF extends GenericUDF { + + private ListObjectInspector listInspector; + private PrimitiveObjectInspector listElemInspector; + private PrimitiveObjectInspector primInspector; + private boolean returnWritables; + + private final List<Object> ret = new ArrayList<Object>(); + + @Override + public ObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { + this.listInspector = HiveUtils.asListOI(argOIs[0]); + this.listElemInspector = + HiveUtils.asPrimitiveObjectInspector(listInspector.getListElementObjectInspector()); + this.primInspector = HiveUtils.asPrimitiveObjectInspector(argOIs[1]); + if (listElemInspector.getPrimitiveCategory() != primInspector.getPrimitiveCategory()) { + throw new UDFArgumentException( + "array_append expects the list type to match the type of the value being appended"); + } + this.returnWritables = listElemInspector.preferWritable(); + return 
ObjectInspectorFactory.getStandardListObjectInspector( + ObjectInspectorUtils.getStandardObjectInspector(listElemInspector)); + } + + @Nullable + @Override + public List<Object> evaluate(@Nonnull DeferredObject[] args) throws HiveException { + ret.clear(); + + Object arg0 = args[0].get(); + if (arg0 == null) { + return null; + } + + final int size = listInspector.getListLength(arg0); + for (int i = 0; i < size; i++) { + Object rawElem = listInspector.getListElement(arg0, i); + if (rawElem == null) { + continue; + } + Object obj = returnWritables ? listElemInspector.getPrimitiveWritableObject(rawElem) + : listElemInspector.getPrimitiveJavaObject(rawElem); + ret.add(obj); + } + + Object arg1 = args[1].get(); + if (arg1 != null) { + Object toAppend = returnWritables ? primInspector.getPrimitiveWritableObject(arg1) + : primInspector.getPrimitiveJavaObject(arg1); + ret.add(toAppend); + } + + return ret; + } + + @Override + public String getDisplayString(String[] args) { + return "array_append(" + args[0] + ", " + args[1] + ")"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/tools/array/ArrayElementAtUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/array/ArrayElementAtUDF.java b/core/src/main/java/hivemall/tools/array/ArrayElementAtUDF.java new file mode 100644 index 0000000..631d92d --- /dev/null +++ b/core/src/main/java/hivemall/tools/array/ArrayElementAtUDF.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package hivemall.tools.array; + +import hivemall.utils.hadoop.HiveUtils; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; + +@Description(name = "element_at", + value = "_FUNC_(array<T> list, int pos) - Returns an element at the given position") +@UDFType(deterministic = true, stateful = false) +public final class ArrayElementAtUDF extends GenericUDF { + private ListObjectInspector listInspector; + private IntObjectInspector intInspector; + + @Override + public ObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { + if (argOIs.length != 2) { + throw new UDFArgumentException("element_at takes an array and an int as arguments"); + } + this.listInspector = HiveUtils.asListOI(argOIs[0]); + this.intInspector = HiveUtils.asIntOI(argOIs[1]); + + return listInspector.getListElementObjectInspector(); + } + + @Override + public Object evaluate(DeferredObject[] args) throws HiveException { + Object list = args[0].get(); + if (list == null) { + return null; + } + Object arg1 = args[1].get(); + if (arg1 == null) { + throw new HiveException("Index MUST not be null"); + } + 
final int arrayLength = listInspector.getListLength(list); + + int idx = intInspector.get(arg1); + if (idx < 0) { + idx = arrayLength + idx; + if (idx < 0) { + return null; + } + } else if (idx >= arrayLength) { + return null; // IndexOutOfBound + } + + return listInspector.getListElement(list, idx); + } + + @Override + public String getDisplayString(String[] args) { + return "element_at( " + args[0] + " , " + args[1] + " )"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/tools/array/ArrayFlattenUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/array/ArrayFlattenUDF.java b/core/src/main/java/hivemall/tools/array/ArrayFlattenUDF.java new file mode 100644 index 0000000..906d594 --- /dev/null +++ b/core/src/main/java/hivemall/tools/array/ArrayFlattenUDF.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package hivemall.tools.array; + +import hivemall.utils.hadoop.HiveUtils; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; + +@Description(name = "array_flatten", + value = "_FUNC_(array<array<ANY>>) - Returns an array with the elements flattened.") +@UDFType(deterministic = true, stateful = false) +public final class ArrayFlattenUDF extends GenericUDF { + + private ListObjectInspector listOI; + private ListObjectInspector nextedListOI; + private ObjectInspector elemOI; + + private final List<Object> result = new ArrayList<>(); + + @Override + public ObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { + if (argOIs.length != 1) { + throw new UDFArgumentException( + "array_flatten expects exactly one argument: " + argOIs.length); + } + + this.listOI = HiveUtils.asListOI(argOIs[0]); + ObjectInspector listElemOI = listOI.getListElementObjectInspector(); + if (listElemOI.getCategory() != Category.LIST) { + throw new UDFArgumentException( + "array_flatten takes array of array for the argument: " + listOI.toString()); + } + this.nextedListOI = HiveUtils.asListOI(listElemOI); + this.elemOI = nextedListOI.getListElementObjectInspector(); + + return ObjectInspectorFactory.getStandardListObjectInspector( + ObjectInspectorUtils.getStandardObjectInspector(elemOI)); + } + + @Override + public 
List<Object> evaluate(DeferredObject[] args) throws HiveException { + result.clear(); + + Object arg0 = args[0].get(); + if (arg0 == null) { + return null; + } + + final int listLength = listOI.getListLength(arg0); + for (int i = 0; i < listLength; i++) { + final Object subarray = listOI.getListElement(arg0, i); + if (subarray == null) { + continue; + } + + final int subarrayLength = nextedListOI.getListLength(subarray); + for (int j = 0; j < subarrayLength; j++) { + Object elem = nextedListOI.getListElement(subarray, j); + if (elem == null) { + continue; + } + result.add(elem); + } + } + + return result; + } + + @Override + public String getDisplayString(String[] args) { + final StringBuffer buf = new StringBuffer(); + buf.append("array_flatten("); + for (int i = 0, len = args.length; i < len; i++) { + if (i != 0) { + buf.append(", "); + } + buf.append(args[i]); + } + buf.append(")"); + return buf.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/tools/array/ArraySliceUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/array/ArraySliceUDF.java b/core/src/main/java/hivemall/tools/array/ArraySliceUDF.java new file mode 100644 index 0000000..4676acc --- /dev/null +++ b/core/src/main/java/hivemall/tools/array/ArraySliceUDF.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package hivemall.tools.array; + +import hivemall.utils.hadoop.HiveUtils; +import hivemall.utils.lang.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; + +@Description(name = "array_slice", + value = "_FUNC_(array<ANY> values, int offset [, int length]) - Slices the given array by the given offset and length parameters.") +@UDFType(deterministic = true, stateful = false) +public final class ArraySliceUDF extends GenericUDF { + + private ListObjectInspector valuesOI; + private PrimitiveObjectInspector offsetOI; + @Nullable + private PrimitiveObjectInspector lengthOI; + + private final List<Object> result = new ArrayList<>(); + + @Override + public ObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException 
{ + if (argOIs.length != 2 && argOIs.length != 3) { + throw new UDFArgumentLengthException( + "Expected 2 or 3 arguments, but got " + argOIs.length); + } + + this.valuesOI = HiveUtils.asListOI(argOIs[0]); + this.offsetOI = HiveUtils.asIntegerOI(argOIs[1]); + if (argOIs.length == 3) { + this.lengthOI = HiveUtils.asIntegerOI(argOIs[2]); + } + + ObjectInspector elemOI = valuesOI.getListElementObjectInspector(); + return ObjectInspectorFactory.getStandardListObjectInspector(elemOI); + } + + @Nullable + @Override + public List<Object> evaluate(@Nonnull DeferredObject[] args) throws HiveException { + Object arg0 = args[0].get(); + if (arg0 == null) { + return null; + } + final int size = valuesOI.getListLength(arg0); + + result.clear(); + + Object arg1 = args[1].get(); + if (arg1 == null) { + throw new UDFArgumentException("Offset argument MUST NOT be null"); + } + + final int offset = PrimitiveObjectInspectorUtils.getInt(arg1, offsetOI); + final int fromIndex = (offset < 0) ? size + offset : offset; + + final int toIndex; + if (args.length == 3) { + Object arg2 = args[2].get(); + if (arg2 == null) { + toIndex = size; + } else { + final int length = PrimitiveObjectInspectorUtils.getInt(arg2, lengthOI); + if (length < 0) { + toIndex = size + length; + } else { + toIndex = Math.min(size, fromIndex + length); + } + } + } else { + toIndex = size; + } + + if (!validRange(fromIndex, toIndex, size)) { + return null; + } + + for (int i = fromIndex; i < toIndex; i++) { + Object e = valuesOI.getListElement(arg0, i); + result.add(e); + } + + return result; + } + + private static boolean validRange(final int fromIndex, final int toIndex, final int size) + throws HiveException { + if (fromIndex < 0) { + return false; + } + if (toIndex < 0) { + return false; + } + if (toIndex > size) { + return false; + } + if (fromIndex > toIndex) { + return false; + } + return true; + } + + @Override + public String getDisplayString(String[] args) { + return "array_slice(" + StringUtils.join(args, 
',') + ")"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/tools/array/ArrayUnionUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/array/ArrayUnionUDF.java b/core/src/main/java/hivemall/tools/array/ArrayUnionUDF.java new file mode 100644 index 0000000..c1201d4 --- /dev/null +++ b/core/src/main/java/hivemall/tools/array/ArrayUnionUDF.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package hivemall.tools.array; + +import hivemall.utils.hadoop.HiveUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; + +/** + * Return a list of unique entries for a given set of lists. + * + * <pre> + * {1, 2} ∪ {1, 2} = {1, 2}, + * {1, 2} ∪ {2, 3} = {1, 2, 3}, + * {1, 2, 3} ∪ {3, 4, 5} = {1, 2, 3, 4, 5} + * </pre> + */ +@Description(name = "array_union", + value = "_FUNC_(array1, array2, ...) 
- Returns the union of a set of arrays") +@UDFType(deterministic = true, stateful = false) +public final class ArrayUnionUDF extends GenericUDF { + + private ListObjectInspector[] _listOIs; + + @Override + public ObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { + if (argOIs.length < 2) { + throw new UDFArgumentException("Expecting at least two arrays as arguments"); + } + + ListObjectInspector[] listOIs = new ListObjectInspector[argOIs.length]; + ListObjectInspector arg0OI = HiveUtils.asListOI(argOIs[0]); + listOIs[0] = arg0OI; + ObjectInspector arg0ElemOI = arg0OI.getListElementObjectInspector(); + + for (int i = 1; i < argOIs.length; ++i) { + ListObjectInspector checkOI = HiveUtils.asListOI(argOIs[i]); + if (!ObjectInspectorUtils.compareTypes(arg0ElemOI, + checkOI.getListElementObjectInspector())) { + throw new UDFArgumentException("Array types does not match: " + arg0OI.getTypeName() + + " != " + checkOI.getTypeName()); + } + listOIs[i] = checkOI; + } + + this._listOIs = listOIs; + + return ObjectInspectorFactory.getStandardListObjectInspector( + ObjectInspectorUtils.getStandardObjectInspector(arg0ElemOI, + ObjectInspectorCopyOption.WRITABLE)); + } + + @Override + public List<Object> evaluate(DeferredObject[] args) throws HiveException { + final Set<Object> objectSet = new TreeSet<Object>(); // new HashSet<Object>(); + + for (int i = 0; i < args.length; ++i) { + final Object undeferred = args[i].get(); + if (undeferred == null) { + continue; + } + + final ListObjectInspector oi = _listOIs[i]; + final ObjectInspector elemOI = oi.getListElementObjectInspector(); + + for (int j = 0, len = oi.getListLength(undeferred); j < len; ++j) { + Object nonStd = oi.getListElement(undeferred, j); + Object copyed = ObjectInspectorUtils.copyToStandardObject(nonStd, elemOI, + ObjectInspectorCopyOption.WRITABLE); + objectSet.add(copyed); + } + } + + return new ArrayList<>(objectSet); + } + + @Override + public String getDisplayString(String[] 
args) { + return "array_union(" + args[0] + ", " + args[1] + " )"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/tools/array/ConditionalEmitUDTF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/array/ConditionalEmitUDTF.java b/core/src/main/java/hivemall/tools/array/ConditionalEmitUDTF.java new file mode 100644 index 0000000..a73a06f --- /dev/null +++ b/core/src/main/java/hivemall/tools/array/ConditionalEmitUDTF.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package hivemall.tools.array; + +import hivemall.utils.hadoop.HiveUtils; + +import java.util.Arrays; +import java.util.List; + +import javax.annotation.Nonnull; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; + +/** + * ConditionalEmit takes an array of booleans and strings, and emits records if the boolean is true. + * + * <p/> + * This allows you to emit multiple rows on one pass of the data, rather than doing a union of + * multiple views with different where clauses. 
+ * <p/> + * + * <pre> + * select + * conditional_emit( + * array( maxwell_score > 80, abs( maxwell_score - other.maxwell_score ) < 5, city = "New York" ), + * array( "CELEB", "PEER", "NEW_YORKER" ) + * ) + * from + * table_to_scan_once + * </pre> + */ +@Description(name = "conditional_emit", + value = "_FUNC_(array<boolean> conditions, array<primitive> features)" + + " - Emit features of a row according to various conditions") +@UDFType(deterministic = true, stateful = false) +public final class ConditionalEmitUDTF extends GenericUDTF { + + private ListObjectInspector conditionsOI; + private BooleanObjectInspector condElemOI; + private ListObjectInspector featuresOI; + private PrimitiveObjectInspector featureElemOI; + + private final Object[] forwardObj = new Object[1]; + + @Override + public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { + if (argOIs.length != 2) { + throw new UDFArgumentException( + "conditional_emit takes 2 arguments: array<boolean>, array<primitive>"); + } + + this.conditionsOI = HiveUtils.asListOI(argOIs[0]); + this.condElemOI = HiveUtils.asBooleanOI(conditionsOI.getListElementObjectInspector()); + + this.featuresOI = HiveUtils.asListOI(argOIs[1]); + this.featureElemOI = + HiveUtils.asPrimitiveObjectInspector(featuresOI.getListElementObjectInspector()); + + List<String> fieldNames = Arrays.asList("feature"); + List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(featureElemOI); + + return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); + } + + @Override + public void process(@Nonnull Object[] args) throws HiveException { + Object arg0 = args[0], arg1 = args[1]; + if (arg0 == null || arg1 == null) { + return; + } + + final int conditionSize = conditionsOI.getListLength(arg0); + final int featureSize = featuresOI.getListLength(arg1); + if (conditionSize != featureSize) { + throw new HiveException( + "Arrays must be of same length in condition_emit(array<boolean> 
conditions, array<string> features).\n" + + "#conditions=" + conditionSize + ", #features=" + featureSize); + } + + for (int i = 0; i < conditionSize; i++) { + Object condObj = conditionsOI.getListElement(arg0, i); + if (condObj == null) { + continue; + } + if (condElemOI.get(condObj) == false) { + continue; + } + Object featureObj = featuresOI.getListElement(arg1, i); + if (featureObj == null) { + continue; + } + + forwardObj[0] = featureObj; + forward(forwardObj); + } + + } + + @Override + public void close() throws HiveException {} + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/tools/array/FirstElementUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/array/FirstElementUDF.java b/core/src/main/java/hivemall/tools/array/FirstElementUDF.java new file mode 100644 index 0000000..957e724 --- /dev/null +++ b/core/src/main/java/hivemall/tools/array/FirstElementUDF.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package hivemall.tools.array; + +import hivemall.utils.hadoop.HiveUtils; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * Return the first element in an array. + */ +@Description(name = "first_element", value = "_FUNC_(x) - Returns the first element in an array ") +@UDFType(deterministic = true, stateful = false) +public class FirstElementUDF extends GenericUDF { + + private ListObjectInspector listInspector; + + @Override + public ObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { + if (argOIs.length != 1) { + throw new UDFArgumentException("first_element takes an array as an argument."); + } + this.listInspector = HiveUtils.asListOI(argOIs[0]); + + return listInspector.getListElementObjectInspector(); + } + + @Override + public Object evaluate(DeferredObject[] args) throws HiveException { + Object list = args[0].get(); + if (list == null) { + return null; + } + if (listInspector.getListLength(list) == 0) { + return null; + } + + return listInspector.getListElement(list, 0); + } + + @Override + public String getDisplayString(String[] args) { + return "first_element( " + args[0] + " )"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/tools/array/LastElementUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/array/LastElementUDF.java b/core/src/main/java/hivemall/tools/array/LastElementUDF.java new file mode 100644 index 0000000..3584cb8 --- /dev/null +++ b/core/src/main/java/hivemall/tools/array/LastElementUDF.java 
@@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package hivemall.tools.array; + +import hivemall.utils.hadoop.HiveUtils; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * Return the last element in an array. 
+ */ +@Description(name = "last_element", value = "_FUNC_(x) - Retturn the last element in an array") +@UDFType(deterministic = true, stateful = false) +public class LastElementUDF extends GenericUDF { + + private ListObjectInspector listInspector; + + @Override + public ObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { + if (argOIs.length != 1) { + throw new UDFArgumentException("last_element takes an array as an argument"); + } + listInspector = HiveUtils.asListOI(argOIs[0]); + + return listInspector.getListElementObjectInspector(); + } + + @Override + public Object evaluate(DeferredObject[] args) throws HiveException { + Object list = args[0].get(); + if (list == null) { + return null; + } + + final int lastIdx = listInspector.getListLength(list) - 1; + if (lastIdx >= 0) { + return listInspector.getListElement(list, lastIdx); + } else { + return null; + } + } + + @Override + public String getDisplayString(String[] args) { + return "last_element( " + args[0] + " )"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/tools/array/SubarrayUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/array/SubarrayUDF.java b/core/src/main/java/hivemall/tools/array/SubarrayUDF.java deleted file mode 100644 index 0b63a93..0000000 --- a/core/src/main/java/hivemall/tools/array/SubarrayUDF.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package hivemall.tools.array; - -import java.util.List; - -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.UDF; -import org.apache.hadoop.hive.ql.udf.UDFType; -import org.apache.hadoop.io.IntWritable; - -@Description(name = "subarray", value = "_FUNC_(array<int> orignal, int fromIndex, int toIndex)" - + " - Returns a slice of the original array" - + " between the inclusive fromIndex and the exclusive toIndex") -@UDFType(deterministic = true, stateful = false) -public class SubarrayUDF extends UDF { - - public List<IntWritable> evaluate(List<IntWritable> array, int fromIndex, int toIndex) { - if (array == null) { - return null; - } - final int arraylength = array.size(); - if (fromIndex < 0) { - fromIndex = 0; - } - if (toIndex > arraylength) { - toIndex = arraylength; - } - return array.subList(fromIndex, toIndex); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/tools/json/FromJsonUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/json/FromJsonUDF.java b/core/src/main/java/hivemall/tools/json/FromJsonUDF.java new file mode 100644 index 0000000..36c29cc --- /dev/null +++ b/core/src/main/java/hivemall/tools/json/FromJsonUDF.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
package hivemall.tools.json;

import hivemall.utils.hadoop.HiveUtils;
import hivemall.utils.hadoop.JsonSerdeUtils;
import hivemall.utils.lang.ExceptionUtils;
import hivemall.utils.lang.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hive.hcatalog.data.HCatRecordObjectInspectorFactory;

/**
 * Deserializes a JSON string into a Hive object of the caller-declared type(s).
 *
 * Usage: {@code from_json(jsonString, 'int,string' [, array('c1','c2') | 'c1,c2'])}.
 * The second argument (a constant type string) fixes the return type at query-compile
 * time; the optional third argument supplies field names for multi-column results.
 */
@Description(name = "from_json",
        value = "_FUNC_(string jsonString, const string returnTypes [, const array<string>|const string columnNames])"
                + " - Return Hive object.")
@UDFType(deterministic = true, stateful = false)
public final class FromJsonUDF extends GenericUDF {

    // Inspector for the first (JSON string) argument.
    private StringObjectInspector jsonOI;

    // Return types parsed from the constant type string (2nd argument).
    private List<TypeInfo> columnTypes;
    // Optional field names from the 3rd argument; stays null when not given.
    @Nullable
    private List<String> columnNames;

    /**
     * Resolves argument inspectors and builds the return ObjectInspector.
     *
     * @param argOIs 2 or 3 arguments: json string, const type string,
     *               optional const column names (string list or comma-joined string)
     * @return the ObjectInspector matching the declared return type(s)
     * @throws UDFArgumentException on wrong arity or a non-constant/ill-typed argument
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        if (argOIs.length != 2 && argOIs.length != 3) {
            throw new UDFArgumentException(
                "from_json takes two or three arguments: " + argOIs.length);
        }

        this.jsonOI = HiveUtils.asStringOI(argOIs[0]);

        // The type string must be a compile-time constant so the return OI is fixed.
        String typeString = HiveUtils.getConstString(argOIs[1]);
        this.columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(typeString);

        if (argOIs.length == 3) {
            final ObjectInspector argOI2 = argOIs[2];
            if (HiveUtils.isConstString(argOI2)) {
                // Accept a single comma-separated constant string of names.
                String names = HiveUtils.getConstString(argOI2);
                this.columnNames = Arrays.asList(names.split(","));
            } else if (HiveUtils.isConstStringListOI(argOI2)) {
                // Or a constant array<string> of names.
                this.columnNames = Arrays.asList(HiveUtils.getConstStringArray(argOI2));
            } else {
                throw new UDFArgumentException("Expected `const array<string>` or `const string`"
                        + " but got an unexpected OI type for the third argument: " + argOI2);
            }
        }

        return getObjectInspector(columnTypes, columnNames);
    }

    /**
     * Builds the return ObjectInspector: the bare element OI for a single type,
     * or a struct OI over all columns for multiple types.
     *
     * @param columnTypes declared return types (must be non-empty)
     * @param columnNames optional field names; defaults to c0, c1, ... when null
     * @throws UDFArgumentException if columnTypes is empty or name/type counts differ
     */
    @Nonnull
    private static ObjectInspector getObjectInspector(@Nonnull final List<TypeInfo> columnTypes,
            @Nullable List<String> columnNames) throws UDFArgumentException {
        if (columnTypes.isEmpty()) {
            throw new UDFArgumentException("Returning columnTypes MUST NOT be null");
        }

        final ObjectInspector returnOI;
        final int numColumns = columnTypes.size();
        if (numColumns == 1) {
            // Single type: return the element inspector directly (no struct wrapper).
            TypeInfo type = columnTypes.get(0);
            returnOI =
                    HCatRecordObjectInspectorFactory.getStandardObjectInspectorFromTypeInfo(type);
        } else {
            if (columnNames == null) {
                // Synthesize default names c0..cN-1.
                // NOTE(review): this local default is NOT written back to the
                // FromJsonUDF.columnNames field, so evaluate() still passes null
                // to JsonSerdeUtils.deserialize — confirm that deserialize
                // handles null names for multi-column results.
                columnNames = new ArrayList<>(numColumns);
                for (int i = 0; i < numColumns; i++) {
                    columnNames.add("c" + i);
                }
            } else {
                if (columnNames.size() != numColumns) {
                    throw new UDFArgumentException("#columnNames != #columnTypes. #columnName="
                            + columnNames.size() + ", #columnTypes=" + numColumns);
                }
            }
            final ObjectInspector[] fieldOIs = new ObjectInspector[numColumns];
            for (int i = 0; i < fieldOIs.length; i++) {
                TypeInfo type = columnTypes.get(i);
                fieldOIs[i] =
                        HCatRecordObjectInspectorFactory.getStandardObjectInspectorFromTypeInfo(
                            type);
            }
            returnOI = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,
                Arrays.asList(fieldOIs));
        }
        return returnOI;
    }

    /**
     * Deserializes the JSON argument into the declared Hive type(s).
     *
     * @return the deserialized object, or null for a NULL input
     * @throws HiveException wrapping any failure from the JSON deserializer,
     *         with the offending JSON and a pretty-printed stack trace in the message
     */
    @Override
    public Object evaluate(DeferredObject[] args) throws HiveException {
        Object arg0 = args[0].get();
        if (arg0 == null) {
            return null;
        }
        Text jsonString = jsonOI.getPrimitiveWritableObject(arg0);

        final Object result;
        try {
            result = JsonSerdeUtils.deserialize(jsonString, columnNames, columnTypes);
        } catch (Throwable e) {
            // Catching Throwable: third-party JSON parsing may throw Errors too;
            // rewrap with context so the failing row is identifiable in query logs.
            throw new HiveException("Failed to deserialize Json: \n" + jsonString.toString() + '\n'
                    + ExceptionUtils.prettyPrintStackTrace(e),
                e);
        }
        return result;
    }

    @Override
    public String getDisplayString(String[] args) {
        return "from_json(" + StringUtils.join(args, ',') + ")";
    }

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package hivemall.tools.json; + +import hivemall.utils.hadoop.HiveUtils; +import hivemall.utils.hadoop.JsonSerdeUtils; +import hivemall.utils.lang.ExceptionUtils; +import hivemall.utils.lang.StringUtils; + +import java.util.Arrays; +import java.util.List; + +import javax.annotation.Nullable; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.Text; + +@Description(name = "to_json", + value = "_FUNC_(ANY object [, const array<string>|const string columnNames]) - Returns Json string") +@UDFType(deterministic = true, stateful = false) +public final class ToJsonUDF extends GenericUDF { + + private ObjectInspector objOI; + + @Nullable + private List<String> columnNames; + + @Override + public ObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { + if (argOIs.length != 1 && argOIs.length != 2) { + throw new UDFArgumentException( + "from_json takes one or two arguments: " + argOIs.length); + } + + this.objOI = argOIs[0]; + if (argOIs.length == 2) { + final ObjectInspector argOI1 = argOIs[1]; + if (HiveUtils.isConstString(argOI1)) { + String names = HiveUtils.getConstString(argOI1); + this.columnNames = 
Arrays.asList(names.split(",")); + } else if (HiveUtils.isConstStringListOI(argOI1)) { + this.columnNames = Arrays.asList(HiveUtils.getConstStringArray(argOI1)); + } else { + throw new UDFArgumentException("Expected `const array<string>` or `const string`" + + " but got an unexpected OI type for the third argument: " + argOI1); + } + } + + return PrimitiveObjectInspectorFactory.writableStringObjectInspector; + } + + @Override + public Text evaluate(DeferredObject[] args) throws HiveException { + Object obj = args[0].get(); + if (obj == null) { + return null; + } + + try { + return JsonSerdeUtils.serialize(obj, objOI, columnNames); + } catch (Throwable e) { + throw new HiveException( + "Failed to serialize: " + obj + '\n' + ExceptionUtils.prettyPrintStackTrace(e), e); + } + } + + @Override + public String getDisplayString(String[] children) { + return "to_json(" + StringUtils.join(children, ',') + ")"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1e1b77ea/core/src/main/java/hivemall/tools/sanity/AssertUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/sanity/AssertUDF.java b/core/src/main/java/hivemall/tools/sanity/AssertUDF.java new file mode 100644 index 0000000..d34cd20 --- /dev/null +++ b/core/src/main/java/hivemall/tools/sanity/AssertUDF.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package hivemall.tools.sanity; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; + +@Description(name = "assert", + value = "_FUNC_(boolean condition) or _FUNC_(boolean condition, string errMsg)" + + "- Throws HiveException if condition is not met") +@UDFType(deterministic = true, stateful = false) +public final class AssertUDF extends UDF { + + public boolean evaluate(boolean condition) throws HiveException { + if (!condition) { + throw new HiveException(); + } + return true; + } + + public boolean evaluate(boolean condition, String errMsg) throws HiveException { + if (!condition) { + throw new HiveException(errMsg); + } + return true; + } + +}