Repository: incubator-hivemall Updated Branches: refs/heads/master 492b5d8e7 -> 1db535876
Close #84: [HIVEMALL-19] Support DIMSUM for approx all-pairs similarity Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/1fbf90a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/1fbf90a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/1fbf90a1 Branch: refs/heads/master Commit: 1fbf90a1eca35cf38c1ae11bc641310f39fd9c8c Parents: 492b5d8 Author: Takuya Kitazawa <[email protected]> Authored: Wed Jun 7 16:07:05 2017 +0900 Committer: myui <[email protected]> Committed: Wed Jun 7 16:07:05 2017 +0900 ---------------------------------------------------------------------- .../knn/similarity/DIMSUMMapperUDTF.java | 353 +++++++++++++++++ .../java/hivemall/tools/math/L2NormUDAF.java | 99 +++++ .../java/hivemall/utils/hadoop/HiveUtils.java | 19 + .../java/hivemall/utils/lang/Primitives.java | 7 + .../knn/similarity/DIMSUMMapperUDTFTest.java | 374 +++++++++++++++++++ .../hivemall/tools/math/L2NormUDAFTest.java | 86 +++++ docs/gitbook/recommend/item_based_cf.md | 213 +++++++++-- resources/ddl/define-all-as-permanent.hive | 8 +- resources/ddl/define-all.hive | 8 +- resources/ddl/define-all.spark | 6 + resources/ddl/define-udfs.td.hql | 2 + 11 files changed, 1133 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java b/core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java new file mode 100644 index 0000000..73e218f --- /dev/null +++ b/core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package hivemall.knn.similarity; + +import hivemall.UDTFWithOptions; +import hivemall.fm.Feature; +import hivemall.fm.IntFeature; +import hivemall.fm.StringFeature; +import hivemall.math.random.PRNG; +import hivemall.math.random.RandomNumberGeneratorFactory; +import hivemall.utils.hadoop.HiveUtils; +import hivemall.utils.lang.Primitives; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.*; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +@Description( + name = "dimsum_mapper", + value = "_FUNC_(array<string> row, map<int col_id, double norm> colNorms [, const 
string options]) " + + "- Returns column-wise partial similarities") +public class DIMSUMMapperUDTF extends UDTFWithOptions { + private static final Log logger = LogFactory.getLog(DIMSUMMapperUDTF.class); + + protected ListObjectInspector rowOI; + protected MapObjectInspector colNormsOI; + + @Nullable + protected Feature[] probes; + + @Nonnull + protected PRNG rnd; + + protected double threshold; + protected double sqrtGamma; + protected boolean symmetricOutput; + protected boolean parseFeatureAsInt; + + protected Map<Object, Double> colNorms; + protected Map<Object, Double> colProbs; + + @Override + protected Options getOptions() { + Options opts = new Options(); + opts.addOption("th", "threshold", true, + "Theoretically, similarities above this threshold are estimated [default: 0.5]"); + opts.addOption("g", "gamma", true, + "Oversampling parameter; if `gamma` is given, `threshold` will be ignored" + + " [default: 10 * log(numCols) / threshold]"); + opts.addOption("disable_symmetric", "disable_symmetric_output", false, + "Output only contains (col j, col k) pair; symmetric (col k, col j) pair is omitted"); + opts.addOption("int_feature", "feature_as_integer", false, + "Parse a feature (i.e. 
column ID) as integer"); + return opts; + } + + @Override + protected CommandLine processOptions(@Nonnull ObjectInspector[] argOIs) + throws UDFArgumentException { + double threshold = 0.5d; + double gamma = Double.POSITIVE_INFINITY; + boolean symmetricOutput = true; + boolean parseFeatureAsInt = false; + + CommandLine cl = null; + if (argOIs.length >= 3) { + String rawArgs = HiveUtils.getConstString(argOIs[2]); + cl = parseOptions(rawArgs); + threshold = Primitives.parseDouble(cl.getOptionValue("threshold"), threshold); + if (threshold < 0.f || threshold >= 1.f) { + throw new UDFArgumentException("`threshold` MUST be in range [0,1): " + threshold); + } + gamma = Primitives.parseDouble(cl.getOptionValue("gamma"), gamma); + if (gamma <= 1.d) { + throw new UDFArgumentException("`gamma` MUST be greater than 1: " + gamma); + } + symmetricOutput = !cl.hasOption("disable_symmetric_output"); + parseFeatureAsInt = cl.hasOption("feature_as_integer"); + } + + this.threshold = threshold; + this.sqrtGamma = Math.sqrt(gamma); + this.symmetricOutput = symmetricOutput; + this.parseFeatureAsInt = parseFeatureAsInt; + + return cl; + } + + @Override + public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { + if (argOIs.length != 2 && argOIs.length != 3) { + throw new UDFArgumentException( + getClass().getSimpleName() + + " takes 2 or 3 arguments: array<string> x, map<long, double> colNorms " + + "[, CONSTANT STRING options]: " + + Arrays.toString(argOIs)); + } + + this.rowOI = HiveUtils.asListOI(argOIs[0]); + HiveUtils.validateFeatureOI(rowOI.getListElementObjectInspector()); + + this.colNormsOI = HiveUtils.asMapOI(argOIs[1]); + + processOptions(argOIs); + + this.rnd = RandomNumberGeneratorFactory.createPRNG(1001); + this.colNorms = null; + this.colProbs = null; + + ArrayList<String> fieldNames = new ArrayList<String>(); + fieldNames.add("j"); + fieldNames.add("k"); + fieldNames.add("b_jk"); + + ArrayList<ObjectInspector> fieldOIs = new 
ArrayList<ObjectInspector>(); + if (parseFeatureAsInt) { + fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector); + fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector); + } else { + fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); + fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); + } + fieldOIs.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); + + return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); + } + + @Override + public void process(Object[] args) throws HiveException { + Feature[] row = parseFeatures(args[0]); + if (row == null) { + return; + } + this.probes = row; + + // since the 2nd argument (column norms) is consistent, + // column-related values, `colNorms` and `colProbs`, should be cached + if (colNorms == null || colProbs == null) { + final int numCols = colNormsOI.getMapSize(args[1]); + + if (sqrtGamma == Double.POSITIVE_INFINITY) { // set default value to `gamma` based on `threshold` + if (threshold > 0.d) { // if `threshold` = 0, `gamma` is INFINITY i.e. 
always accept <j, k> pairs + this.sqrtGamma = Math.sqrt(10 * Math.log(numCols) / threshold); + } + } + + this.colNorms = new HashMap<Object, Double>(numCols); + this.colProbs = new HashMap<Object, Double>(numCols); + final Map<Object, Object> m = (Map<Object, Object>) colNormsOI.getMap(args[1]); + for (Map.Entry<Object, Object> e : m.entrySet()) { + Object j = e.getKey(); + if (parseFeatureAsInt) { + j = HiveUtils.asJavaInt(j); + } else { + j = j.toString(); + } + + double norm = HiveUtils.asJavaDouble(e.getValue()); + if (norm == 0.d) { // avoid zero-division + norm = 1.d; + } + + colNorms.put(j, norm); + + double p = Math.min(1.d, sqrtGamma / norm); + colProbs.put(j, p); + } + } + + if (parseFeatureAsInt) { + forwardAsIntFeature(row); + } else { + forwardAsStringFeature(row); + } + } + + private void forwardAsIntFeature(@Nonnull Feature[] row) throws HiveException { + final int length = row.length; + + Feature[] rowScaled = new Feature[length]; + for (int i = 0; i < length; i++) { + int j = row[i].getFeatureIndex(); + + double norm = Primitives.doubleValue(colNorms.get(j), 0.d); + if (norm == 0.d) { // avoid zero-division + norm = 1.d; + } + double scaled = row[i].getValue() / Math.min(sqrtGamma, norm); + + rowScaled[i] = new IntFeature(j, scaled); + } + + final IntWritable jWritable = new IntWritable(); + final IntWritable kWritable = new IntWritable(); + final DoubleWritable bWritable = new DoubleWritable(); + + final Object[] forwardObjs = new Object[3]; + forwardObjs[0] = jWritable; + forwardObjs[1] = kWritable; + forwardObjs[2] = bWritable; + + for (int ij = 0; ij < length; ij++) { + int j = rowScaled[ij].getFeatureIndex(); + double jVal = rowScaled[ij].getValue(); + double jProb = Primitives.doubleValue(colProbs.get(j), 0.d); + + if (jVal != 0.d && rnd.nextDouble() < jProb) { + for (int ik = ij + 1; ik < length; ik++) { + int k = rowScaled[ik].getFeatureIndex(); + double kVal = rowScaled[ik].getValue(); + double kProb = 
Primitives.doubleValue(colProbs.get(k), 0.d); + + if (kVal != 0.d && rnd.nextDouble() < kProb) { + // compute b_jk + bWritable.set(jVal * kVal); + + if (symmetricOutput) { + // (j, k); similarity matrix is symmetric + jWritable.set(j); + kWritable.set(k); + forward(forwardObjs); + + // (k, j) + jWritable.set(k); + kWritable.set(j); + forward(forwardObjs); + } else { + if (j < k) { + jWritable.set(j); + kWritable.set(k); + } else { + jWritable.set(k); + kWritable.set(j); + } + forward(forwardObjs); + } + } + } + } + } + + private void forwardAsStringFeature(@Nonnull Feature[] row) throws HiveException { + final int length = row.length; + + Feature[] rowScaled = new Feature[length]; + for (int i = 0; i < length; i++) { + String j = row[i].getFeature(); + + double norm = Primitives.doubleValue(colNorms.get(j), 0.d); + if (norm == 0.d) { // avoid zero-division + norm = 1.d; + } + double scaled = row[i].getValue() / Math.min(sqrtGamma, norm); + + rowScaled[i] = new StringFeature(j, scaled); + } + + final Text jWritable = new Text(); + final Text kWritable = new Text(); + final DoubleWritable bWritable = new DoubleWritable(); + + final Object[] forwardObjs = new Object[3]; + forwardObjs[0] = jWritable; + forwardObjs[1] = kWritable; + forwardObjs[2] = bWritable; + + for (int ij = 0; ij < length; ij++) { + String j = rowScaled[ij].getFeature(); + double jVal = rowScaled[ij].getValue(); + double jProb = Primitives.doubleValue(colProbs.get(j), 0.d); + + if (jVal != 0.d && rnd.nextDouble() < jProb) { + for (int ik = ij + 1; ik < length; ik++) { + String k = rowScaled[ik].getFeature(); + double kVal = rowScaled[ik].getValue(); + double kProb = Primitives.doubleValue(colProbs.get(k), 0.d); + + if (kVal != 0.d && rnd.nextDouble() < kProb) { + // compute b_jk + bWritable.set(jVal * kVal); + + if (symmetricOutput) { + // (j, k); similarity matrix is symmetric + jWritable.set(j); + kWritable.set(k); + forward(forwardObjs); + + // (k, j) + jWritable.set(k); + kWritable.set(j); + 
forward(forwardObjs); + } else { + if (j.compareTo(k) < 0) { + jWritable.set(j); + kWritable.set(k); + } else { + jWritable.set(k); + kWritable.set(j); + } + forward(forwardObjs); + } + } + } + } + } + } + + @Nullable + protected Feature[] parseFeatures(@Nonnull final Object arg) throws HiveException { + return Feature.parseFeatures(arg, rowOI, probes, parseFeatureAsInt); + } + + @Override + public void close() throws HiveException { + this.probes = null; + this.colNorms = null; + this.colProbs = null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/core/src/main/java/hivemall/tools/math/L2NormUDAF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/math/L2NormUDAF.java b/core/src/main/java/hivemall/tools/math/L2NormUDAF.java new file mode 100644 index 0000000..921272a --- /dev/null +++ b/core/src/main/java/hivemall/tools/math/L2NormUDAF.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package hivemall.tools.math; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDAF; +import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; + +@SuppressWarnings("deprecation") +@Description(name = "l2_norm", + value = "_FUNC_(double xi) - Return L2 norm of a vector which has the given values in each dimension") +public final class L2NormUDAF extends UDAF { + + public static class Evaluator implements UDAFEvaluator { + + private PartialResult partial; + + public Evaluator() {} + + @Override + public void init() { + this.partial = null; + } + + public boolean iterate(DoubleWritable xi) throws HiveException { + if (xi == null) {// skip + return true; + } + if (partial == null) { + this.partial = new PartialResult(); + } + partial.iterate(xi.get()); + return true; + } + + public PartialResult terminatePartial() { + return partial; + } + + public boolean merge(PartialResult other) throws HiveException { + if (other == null) { + return true; + } + if (partial == null) { + this.partial = new PartialResult(); + } + partial.merge(other); + return true; + } + + public double terminate() { + if (partial == null) { + return 0.d; + } + return partial.get(); + } + } + + public static class PartialResult { + + double squaredSum; + + PartialResult() { + this.squaredSum = 0.d; + } + + void iterate(double xi) { + squaredSum += xi * xi; + } + + void merge(PartialResult other) { + squaredSum += other.squaredSum; + } + + double get() { + return Math.sqrt(squaredSum); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java b/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java index 99b300d..6c1b0d1 
100644 --- a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java +++ b/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.lazy.LazyDouble; import org.apache.hadoop.hive.serde2.lazy.LazyInteger; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazy.LazyString; @@ -146,6 +147,24 @@ public final class HiveUtils { return Integer.parseInt(s); } + public static double asJavaDouble(@Nullable final Object o) { + if (o == null) { + throw new IllegalArgumentException(); + } + if (o instanceof Double) { + return ((Double) o).doubleValue(); + } + if (o instanceof LazyDouble) { + DoubleWritable d = ((LazyDouble) o).getWritableObject(); + return d.get(); + } + if (o instanceof DoubleWritable) { + return ((DoubleWritable) o).get(); + } + String s = o.toString(); + return Double.parseDouble(s); + } + @Nullable public static List<String> asStringList(@Nonnull final DeferredObject arg, @Nonnull final ListObjectInspector listOI) throws HiveException { http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/core/src/main/java/hivemall/utils/lang/Primitives.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/utils/lang/Primitives.java b/core/src/main/java/hivemall/utils/lang/Primitives.java index 31cd8a8..2ec012c 100644 --- a/core/src/main/java/hivemall/utils/lang/Primitives.java +++ b/core/src/main/java/hivemall/utils/lang/Primitives.java @@ -76,6 +76,13 @@ public final class Primitives { return Boolean.parseBoolean(s); } + public static double doubleValue(final Double v, final double defaultValue) { + if (v == null) { + return defaultValue; + } + return v.doubleValue(); + } + 
public static int compare(final int x, final int y) { return (x < y) ? -1 : ((x == y) ? 0 : 1); } http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/core/src/test/java/hivemall/knn/similarity/DIMSUMMapperUDTFTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/hivemall/knn/similarity/DIMSUMMapperUDTFTest.java b/core/src/test/java/hivemall/knn/similarity/DIMSUMMapperUDTFTest.java new file mode 100644 index 0000000..3bf90ec --- /dev/null +++ b/core/src/test/java/hivemall/knn/similarity/DIMSUMMapperUDTFTest.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package hivemall.knn.similarity; + +import hivemall.mf.BPRMatrixFactorizationUDTFTest; +import hivemall.utils.hadoop.HiveUtils; +import hivemall.utils.lang.StringUtils; +import hivemall.utils.lang.mutable.MutableInt; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.Collector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.zip.GZIPInputStream; + +import javax.annotation.Nonnull; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class DIMSUMMapperUDTFTest { + private static final boolean DEBUG = false; + + DIMSUMMapperUDTF udtf; + + double[][] R; + int numUsers, numItems; + + @Before + public void setUp() throws HiveException { + this.udtf = new DIMSUMMapperUDTF(); + + this.R = new double[][] { {1, 2, 3}, {1, 2, 3}}; + this.numUsers = R.length; + this.numItems = R[0].length; + } + + @Test + public void testIntFeature() throws HiveException { + final Map<Integer, Map<Integer, Double>> sims = new HashMap<Integer, Map<Integer, Double>>(); + Collector collector = new Collector() { + public void collect(Object input) throws HiveException { + Object[] row = (Object[]) input; + + Assert.assertTrue(row.length == 3); + + int j = HiveUtils.asJavaInt(row[0]); + int k = HiveUtils.asJavaInt(row[1]); + + Map<Integer, Double> sims_j = sims.get(j); + if (sims_j == null) { + sims_j = new HashMap<Integer, Double>(); + sims.put(j, sims_j); + } + Double sims_jk = 
sims_j.get(k); + if (sims_jk == null) { + sims_jk = 0.d; + } + sims_j.put(k, sims_jk + HiveUtils.asJavaDouble(row[2])); + } + }; + udtf.setCollector(collector); + + ObjectInspector[] argOIs = new ObjectInspector[] { + ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaStringObjectInspector), + ObjectInspectorFactory.getStandardMapObjectInspector( + PrimitiveObjectInspectorFactory.javaIntObjectInspector, + PrimitiveObjectInspectorFactory.javaDoubleObjectInspector), + ObjectInspectorUtils.getConstantObjectInspector( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + "-threshold 0 -disable_symmetric_output -int_feature")}; + // if threshold = 0, output is exact cosine similarity + + udtf.initialize(argOIs); + + final Integer[] itemIDs = new Integer[] {1, 2, 3}; + + final List<String> user1 = new ArrayList<String>(); + convertRowToFeatures(0, user1, itemIDs); + + final List<String> user2 = new ArrayList<String>(); + convertRowToFeatures(1, user2, itemIDs); + + final Map<Integer, Double> norms = new HashMap<Integer, Double>(); + computeColumnNorms(norms, itemIDs); + + udtf.process(new Object[] {user1, norms}); + udtf.process(new Object[] {user2, norms}); + + udtf.close(); + + int numSims = 0; + + for (Integer j : sims.keySet()) { + Map<Integer, Double> e = sims.get(j); + for (Integer k : e.keySet()) { + double s = e.get(k).doubleValue(); + println("(" + j + ", " + k + ") = " + s); + Assert.assertEquals(1.d, s, 1e-6); + numSims += 1; + } + } + + // Upper triangular: <1, 2>, <1, 3>, <2, 3> + Assert.assertTrue(numSims == 3); + } + + @Test + public void testStringFeature() throws HiveException { + final Map<String, Map<String, Double>> sims = new HashMap<String, Map<String, Double>>(); + Collector collector = new Collector() { + public void collect(Object input) throws HiveException { + Object[] row = (Object[]) input; + + Assert.assertTrue(row.length == 3); + + String j = row[0].toString(); + String k = row[1].toString(); 
+ + Map<String, Double> sims_j = sims.get(j); + if (sims_j == null) { + sims_j = new HashMap<String, Double>(); + sims.put(j, sims_j); + } + Double sims_jk = sims_j.get(k); + if (sims_jk == null) { + sims_jk = 0.d; + } + sims_j.put(k, sims_jk + HiveUtils.asJavaDouble(row[2])); + } + }; + udtf.setCollector(collector); + + ObjectInspector[] argOIs = new ObjectInspector[] { + ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaStringObjectInspector), + ObjectInspectorFactory.getStandardMapObjectInspector( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + PrimitiveObjectInspectorFactory.javaDoubleObjectInspector), + ObjectInspectorUtils.getConstantObjectInspector( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, "-threshold 0")}; + + udtf.initialize(argOIs); + + final String[] itemIDs = new String[] {"i1", "i2", "i3"}; + + final List<String> user1 = new ArrayList<String>(); + convertRowToFeatures(0, user1, itemIDs); + + final List<String> user2 = new ArrayList<String>(); + convertRowToFeatures(1, user2, itemIDs); + + final Map<String, Double> norms = new HashMap<String, Double>(); + computeColumnNorms(norms, itemIDs); + + udtf.process(new Object[] {user1, norms}); + udtf.process(new Object[] {user2, norms}); + + udtf.close(); + + int numSims = 0; + + for (String j : sims.keySet()) { + Map<String, Double> e = sims.get(j); + for (String k : e.keySet()) { + double s = e.get(k).doubleValue(); + println("(" + j + ", " + k + ") = " + s); + Assert.assertEquals(1.d, s, 1e-6); + numSims += 1; + } + } + + // Symmetric: <i1, i2>, <i2, i1>, <i1, i3>, <i3, i1>, <i2, i3>, <i3, i2> + Assert.assertTrue(numSims == 6); + } + + private void convertRowToFeatures(int i, @Nonnull List<String> dst, @Nonnull Object[] itemIDs) { + for (int j = 0; j < numItems; j++) { + double r = R[i][j]; + if (r != 0.d) { + dst.add(itemIDs[j] + ":" + r); + } + } + } + + private <T> void computeColumnNorms(@Nonnull Map<T, Double> dst, @Nonnull T[] 
itemIDs) { + for (int j = 0; j < numItems; j++) { + double norm = 0.d; + for (int i = 0; i < numUsers; i++) { + norm += R[i][j] * R[i][j]; + } + dst.put(itemIDs[j], Math.sqrt(norm)); + } + } + + @Test + public void testML100k() throws HiveException, IOException { + // rows of a user-item matrix; used by DIMSUM + final Map<String, List<String>> users = new HashMap<String, List<String>>(); + // columns of a user-item matrix; compute exact item-item cosine similarity + final Map<String, List<String>> items = new HashMap<String, List<String>>(); + + final Map<String, Double> norms = new HashMap<String, Double>(); + + BufferedReader buf = readFile("ml1k.train.gz"); + String line; + while ((line = buf.readLine()) != null) { + String[] cols = StringUtils.split(line, ' '); + + String userID = cols[0]; + String itemID = cols[1]; + double rate = Double.valueOf(cols[2]).doubleValue(); + + // find this user's list of ratings + List<String> userRatings = users.get(userID); + if (userRatings == null) { + userRatings = new ArrayList<String>(); + users.put(userID, userRatings); + } + // store observed item-rate pairs to the list + userRatings.add(itemID + ":" + rate); + + // find this item's list of ratings + List<String> itemRatings = items.get(itemID); + if (itemRatings == null) { + itemRatings = new ArrayList<String>(); + items.put(itemID, itemRatings); + } + // store observed user-rate pairs to the list + itemRatings.add(userID + ":" + rate); + + // accumulate to compute L2 norm of each column + Double norm = norms.get(itemID); + if (norm == null) { + norm = 0.d; + } + norm += rate * rate; + norms.put(itemID, norm); + } + + // compute L2 norm of each column + for (Map.Entry<String, Double> e : norms.entrySet()) { + norms.put(e.getKey(), Math.sqrt(e.getValue().doubleValue())); + } + + final MutableInt emitCounter = new MutableInt(0); + final Map<String, Map<String, Double>> sims = new HashMap<String, Map<String, Double>>(); + Collector collector = new Collector() { + public 
void collect(Object input) throws HiveException { + emitCounter.addValue(1); + + Object[] row = (Object[]) input; + + Assert.assertTrue(row.length == 3); + + String j = row[0].toString(); + String k = row[1].toString(); + + Map<String, Double> sims_j = sims.get(j); + if (sims_j == null) { + sims_j = new HashMap<String, Double>(); + sims.put(j, sims_j); + } + Double sims_jk = sims_j.get(k); + if (sims_jk == null) { + sims_jk = 0.d; + } + sims_j.put(k, sims_jk + HiveUtils.asJavaDouble(row[2])); + } + }; + udtf.setCollector(collector); + + // Case I: Set zero to `threshold` + // this computes exact cosine similarity + ObjectInspector[] argOIs = new ObjectInspector[] { + ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaStringObjectInspector), + ObjectInspectorFactory.getStandardMapObjectInspector( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + PrimitiveObjectInspectorFactory.javaDoubleObjectInspector), + ObjectInspectorUtils.getConstantObjectInspector( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + "-threshold 0 -disable_symmetric_output")}; + + udtf.initialize(argOIs); + for (List<String> user : users.values()) { + udtf.process(new Object[] {user, norms}); + } + udtf.close(); + + int numMaxEmits = emitCounter.getValue(); + + // compare DIMSUM's exact similarity (i.e. 
threshold = 0) with exact column-wise cosine similarity + for (String j : items.keySet()) { + final Map<String, Double> sims_j = sims.get(j); + if (sims_j != null) { + final List<String> item_j = items.get(j); + for (String k : items.keySet()) { + final Double sims_jk = sims_j.get(k); + if (sims_jk != null) { + float simsExact_jk = CosineSimilarityUDF.cosineSimilarity(item_j, items.get(k)); + Assert.assertEquals(simsExact_jk, sims_jk.floatValue(), 1e-6); + } + } + } + } + + // reset counter and similarities + emitCounter.setValue(0); + sims.clear(); + + // Case II: Set (almost) max value to `threshold` + // this skips a bunch of operations with high probability + argOIs = new ObjectInspector[] { + ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaStringObjectInspector), + ObjectInspectorFactory.getStandardMapObjectInspector( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + PrimitiveObjectInspectorFactory.javaDoubleObjectInspector), + ObjectInspectorUtils.getConstantObjectInspector( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + "-threshold 0.999999 -disable_symmetric_output")}; + + udtf.initialize(argOIs); + for (List<String> user : users.values()) { + udtf.process(new Object[] {user, norms}); + } + udtf.close(); + + Assert.assertTrue("Approximated one MUST reduce the number of operations", + emitCounter.getValue() < numMaxEmits); + } + + @Nonnull + private static BufferedReader readFile(@Nonnull String fileName) throws IOException { + // use MF's resource file + InputStream is = BPRMatrixFactorizationUDTFTest.class.getResourceAsStream(fileName); + if (fileName.endsWith(".gz")) { + is = new GZIPInputStream(is); + } + return new BufferedReader(new InputStreamReader(is)); + } + + private static void println(String msg) { + if (DEBUG) { + System.out.println(msg); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/core/src/test/java/hivemall/tools/math/L2NormUDAFTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/hivemall/tools/math/L2NormUDAFTest.java b/core/src/test/java/hivemall/tools/math/L2NormUDAFTest.java new file mode 100644 index 0000000..937bfcd --- /dev/null +++ b/core/src/test/java/hivemall/tools/math/L2NormUDAFTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package hivemall.tools.math; + +import org.apache.hadoop.hive.serde2.io.DoubleWritable; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class L2NormUDAFTest { + + L2NormUDAF.Evaluator evaluator; + double[] x; + double expected; + + @Before + public void setUp() throws Exception { + this.evaluator = new L2NormUDAF.Evaluator(); + this.x = new double[] {1.d, 2.d, 3.d, 4.d, 5.d, 6.d}; + this.expected = 9.5393920141694561; + } + + @Test + public void test() throws Exception { + evaluator.init(); + + for (double xi : x) { + evaluator.iterate(new DoubleWritable(xi)); + } + + Assert.assertEquals(expected, evaluator.terminate(), 1e-5d); + } + + @Test + public void testMerge() throws Exception { + L2NormUDAF.PartialResult[] partials = new L2NormUDAF.PartialResult[3]; + + // bin #1 + evaluator.init(); + evaluator.iterate(new DoubleWritable(x[0])); + evaluator.iterate(new DoubleWritable(x[1])); + partials[0] = evaluator.terminatePartial(); + + // bin #2 + evaluator.init(); + evaluator.iterate(new DoubleWritable(x[2])); + evaluator.iterate(new DoubleWritable(x[3])); + partials[1] = evaluator.terminatePartial(); + + // bin #3 + evaluator.init(); + evaluator.iterate(new DoubleWritable(x[4])); + evaluator.iterate(new DoubleWritable(x[5])); + partials[2] = evaluator.terminatePartial(); + + // merge in a different order; e.g., <bin0, bin1>, <bin1, bin0> should return same value + final int[][] orders = new int[][] { {0, 1, 2}, {0, 2, 1}, {1, 0, 2}, {1, 2, 0}, {2, 1, 0}, + {2, 0, 1}}; + for (int i = 0; i < orders.length; i++) { + evaluator.init(); + + evaluator.merge(partials[orders[i][0]]); + evaluator.merge(partials[orders[i][1]]); + evaluator.merge(partials[orders[i][2]]); + + Assert.assertEquals(expected, evaluator.terminate(), 1e-5d); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/docs/gitbook/recommend/item_based_cf.md ---------------------------------------------------------------------- diff 
--git a/docs/gitbook/recommend/item_based_cf.md b/docs/gitbook/recommend/item_based_cf.md index a674f70..2b9097e 100644 --- a/docs/gitbook/recommend/item_based_cf.md +++ b/docs/gitbook/recommend/item_based_cf.md @@ -17,13 +17,16 @@ under the License. --> -This document describe how to do Item-based Collaborative Filtering using Hivemall. +This document describes how to do Item-based Collaborative Filtering using Hivemall. -_Caution: naive similarity computation is `O(n^2)` to compute all item-item pair similarity. [MinHash](https://en.wikipedia.org/wiki/MinHash#Jaccard_similarity_and_minimum_hash_values) is an efficient scheme for computing jaccard similarity. Section 6 show how to use MinHash in Hivemall._ +<!-- toc --> -## 1. Prepare transaction table +> #### Caution +> Naive similarity computation is `O(n^2)` to compute all item-item pair similarity. In order to accelerate the procedure, Hivemall has an efficient scheme for computing Jaccard and/or cosine similarity [as mentioned later](#efficient-similarity-computation). -Prepare following transaction table. We are generating `feature_vector` for each `item_id` based on cooccurrence of purchased items, a sort of bucket analysis. +# Prepare `transaction` table + +Prepare following `transaction` table. We will generate `feature_vector` for each `itemid` based on cooccurrence of purchased items, a sort of bucket analysis. | userid | itemid | purchase_at `timestamp` | |:-:|:-:|:-:| @@ -33,7 +36,7 @@ Prepare following transaction table. We are generating `feature_vector` for each | 3 | 2313 | 2016-06-04 19:29:02 | | .. | .. | .. | -## 2. Create item_features table +# Create `item_features` table What we want for creating a feature vector for each item is the following `cooccurrence` relation. 
@@ -49,7 +52,7 @@ What we want for creating a feature vector for each item is the following `coocc Feature vectors of each item will be as follows: | itemid | feature_vector `array<string>` | -|:-:|:-:| +|:-:|:-| | 583266 | 621056:9999, 583266:18 | | 31231 | 13212:129, 31231:3, 9833:953 | | ... | ... | @@ -57,16 +60,16 @@ Feature vectors of each item will be as follows: Note that value of feature vector should be scaled for k-NN similarity computation e.g., as follows: | itemid | feature_vector `array<string>` | -|:-:|:-:| +|:-:|:-| | 583266 | 621056:`ln(9999+1)`, 583266:`ln(18+1)` | | 31231 | 13212:`ln(129+1)`, 31231:`ln(3+1)`, 9833:`ln(953+1)` | | ... | ... | The following queries results in creating the above table. -### 2.1. Creating Item purchased table +## Step 1: Creating `user_purchased` table -The following query creates a table that contains userid, itemid, and purchased_at. The table represents the last user-item contact (purchase) while the `transaction` table holds all contacts. +The following query creates a table that contains `userid`, `itemid`, and `purchased_at`. The table represents the last user-item contact (purchase) while the `transaction` table holds all contacts. ```sql CREATE TABLE user_purchased as @@ -84,15 +87,15 @@ group by ; ``` -**Note:** _Better to avoid too old transactions because those information would be outdated though an enough number of transactions is required for recommendation._ - -### 2.2. Creating cooccurrence table +> #### Note +> Better to avoid too old transactions because those information would be outdated though an enough number of transactions is required for recommendation. -**Caution:** _Item-Item cooccurrence matrix is a symmetric matrix that has the number of total occurrence for each diagonal element . 
If the size of items are `k`, then the size of expected matrix is `k * (k - 1) / 2`, usually a very large one._ +## Step 2: Creating `cooccurrence` table -_Better to use [2.2.2.](#222-create-cooccurrence-table-from-upper-triangular-matrix-of-cooccurrence) instead of [2.2.1.](#221-create-cooccurrence-table-directly) for creating a `cooccurrence` table where dataset is large._ +> #### Caution +> Item-item cooccurrence matrix is a symmetric matrix that has the number of total occurrence for each diagonal element. If the size of items is `k`, then the size of expected matrix is `k * (k - 1) / 2`, usually a very large one. Hence, it is better to use [step 2-2](#step-2-2-create-cooccurrence-table-from-upper-triangular-matrix-of-cooccurrence) instead of [step 2-1](#step-2-1-create-cooccurrence-table-directly) for creating a `cooccurrence` table where dataset is large. -### 2.2.1. Create cooccurrence table directly +### Step 2-1: Create `cooccurrence` table directly ```sql create table cooccurrence as @@ -114,11 +117,15 @@ group by ; ``` -**Caution:** Note that specifying `having cnt >= 2` has a drawback that item cooccurrence is not calculated where `cnt` is less than 2. It could result no recommended items for certain items. Please ignore `having cnt >= 2` if the following computations finish in an acceptable/reasonable time. +> #### Note +> Note that specifying `having cnt >= 2` has a drawback that item cooccurrence is not calculated where `cnt` is less than 2. It could result no recommended items for certain items. Please ignore `having cnt >= 2` if the following computations finish in an acceptable/reasonable time. + +<br/> -**Caution:** _We ignore a purchase order in the following example. It means that the occurrence counts of `ItemA -> ItemB` and `ItemB -> ItemA` are assumed to be same. It is sometimes not a good idea e.g., for `Camera -> SD card` and `SD card -> Camera`._ +> #### Caution +> We ignore a purchase order in the following example. 
It means that the occurrence counts of `ItemA -> ItemB` and `ItemB -> ItemA` are assumed to be same. It is sometimes not a good idea in terms of reasoning; for example, `Camera -> SD card` and `SD card -> Camera` need to be considered separately. -### 2.2.2. Create cooccurrence table from Upper Triangular Matrix of cooccurrence +### Step 2-2: Create `cooccurrence` table from Upper Triangular Matrix of cooccurrence Better to create [Upper Triangular Matrix](https://en.wikipedia.org/wiki/Triangular_matrix#Description) that has `itemid > other` if resulting table is very large. No need to create Upper Triangular Matrix if your Hadoop cluster can handle the following instructions without considering it. @@ -149,9 +156,12 @@ select * from ( ) t; ``` -_Note: `UNION ALL` [required to be embedded](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Union#LanguageManualUnion-UNIONwithinaFROMClause) in Hive._ +> #### Note +> `UNION ALL` [required to be embedded](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Union#LanguageManualUnion-UNIONwithinaFROMClause) in Hive. -### Limiting size of elements in cooccurrence_upper_triangular +#### Limiting size of elements in `cooccurrence_upper_triangular` + +Using only top-N frequently co-occurred item pairs allows you to reduce the size of `cooccurrence` table: ```sql create table cooccurrence_upper_triangular as @@ -207,7 +217,7 @@ select itemid, other, cnt from t2; ``` -### 2.2.3. Computing cooccurrence ratio (optional step) +### Computing cooccurrence ratio (optional step) You can optionally compute cooccurrence ratio as follows: @@ -236,7 +246,7 @@ group by `l.cnt / r.totalcnt` represents a cooccurrence ratio of range `[0,1]`. -### 2.3. creating a feature vector for each item +## Step 3: Creating a feature vector for each item ```sql INSERT OVERWRITE TABLE item_features @@ -252,14 +262,14 @@ GROUP BY ; ``` -## 3. 
Computing Item similarity scores +# Compute item similarity scores -Item-Item similarity computation is known to be computation complexity `O(n^2)` where `n` is the number of items. -Depending on your cluster size and your dataset, the optimal solution differs. +Item-item similarity computation is known to be computational complexity `O(n^2)` where `n` is the number of items. We have two options to compute the similarities, and, depending on your cluster size and your dataset, the optimal solution differs. -**Note:** _Better to use [3.1.1.](#311-similarity-computation-using-the-symmetric-property-of-item-similarity-matrix) scheme where dataset is large._ +> #### Note +> If your dataset is large enough, better to choose [modified version of option 1](#taking-advantage-of-the-symmetric-property-of-item-similarity-matrix), which utilizes the symmetric property of similarity matrix. -### 3.1. Shuffle heavy similarity computation +## Option 1: Parallel computation with computationally heavy shuffling This version involves 3-way joins w/ large data shuffle; However, this version works in parallel where a cluster has enough task slots. @@ -293,9 +303,9 @@ from topk; ``` -### 3.1.1. Similarity computation using the symmetric property of Item similarity matrix +### Taking advantage of the symmetric property of item similarity matrix -Note `item_similarity` is a similarity matrix. So, you can compute it from an upper triangular matrix as follows. +Notice that `item_similarity` is a symmetric matrix. So, you can compute it from an upper triangular matrix as follows. ```sql WITH cooccurrence_top100 as ( @@ -348,9 +358,9 @@ select * from ( ) t; ``` -### 3.2. Computation heavy similarity computation +## Option 2: Sequential computation -Alternatively, you can compute cosine similarity as follows. This version involves cross join and thus runs sequentially in a single task. However, it involves less shuffle when compared to 3.1. 
+Alternatively, you can compute cosine similarity as follows. This version involves cross join and thus runs sequentially in a single task. However, it involves less shuffle compared to option 1. ```sql WITH similarity as ( @@ -393,13 +403,14 @@ from | 31231 | 9833 | 0.953 | | ... | ... | ... | -## 4. Item-based Recommendation +# Item-based recommendation This section introduces item-based recommendation based on recently purchased items by each user. -**Caution:** _It would better to ignore recommending some of items that user already purchased (only 1 time) while items that are purchased twice or more would be okey to be included in the recommendation list (e.g., repeatedly purchased daily necessities). So, you would need an item property table showing that each item is repeatedly purchased items or not._ +> #### Caution +> It would better to ignore recommending some of items that user already purchased (only 1 time) while items that are purchased twice or more would be okey to be included in the recommendation list (e.g., repeatedly purchased daily necessities). So, you would need an item property table showing that each item is repeatedly purchased items or not. -### 4.1. Computes top-k recently purchaed items for each user +## Step 1: Computes top-k recently purchased items for each user First, prepare `recently_purchased_items` table as follows: @@ -422,7 +433,11 @@ from ( ) t; ``` -### 4.2. Recommend top-k items based on the cooccurrence for each user's recently purchased item +## Step 2: Recommend top-k items based on users' recently purchased items + +In order to generate a list of recommended items, you can use either cooccurrence count or similarity as a relevance score. + +### Cooccurrence-based ```sql WITH topk as ( @@ -461,7 +476,7 @@ group by ; ``` -### 4.3. Recommend top-k items based on the (cooccurrence) similarity for each user's recently purchased item +### Similarity-based ```sql WITH topk as ( @@ -500,7 +515,11 @@ group by ; ``` -## 5. 
Pseudo Jaccard Similarity computation using MinHash +# Efficient similarity computation + +Since naive similarity computation takes `O(n^2)` computational complexity, utilizing a certain approximation scheme is practically important to improve efficiency and feasibility. In particular, Hivemall enables you to use one of two sophisticated approximation schemes, [MinHash](https://en.wikipedia.org/wiki/MinHash#Jaccard_similarity_and_minimum_hash_values) and [DIMSUM](https://blog.twitter.com/engineering/en_us/a/2014/all-pairs-similarity-via-dimsum.html). + +## MinHash: Compute "pseudo" Jaccard similarity Refer [this article](https://en.wikipedia.org/wiki/MinHash#Jaccard_similarity_and_minimum_hash_values ) to get details about MinHash and Jarccard similarity. [This blog article](https://blog.treasuredata.com/blog/2016/02/16/minhash-in-hivemall/) also explains about Hivemall's minhash. @@ -547,11 +566,13 @@ from top100 ; ``` -_Caution: Note that there might be no similar item for certain items._ -### 5.1. Cosine similarity computation following minhash-based similarity items filtering +> #### Note +> There might be no similar item for certain items. + +### Compute approximated cosine similarity by using the MinHash-based Jaccard similarity -You can compute `top-k` similar items based on cosine similarity, following rough `top-N` similar items listing using minhash, where `k << N` (e.g., k=10 and N=100). +Once the MinHash-based approach found rough `top-N` similar items, you can efficiently find `top-k` similar items in terms of cosine similarity, where `k << N` (e.g., k=10 and N=100). ```sql WITH similarity as ( @@ -581,4 +602,116 @@ select itemid, other, similarity from topk; +``` + +## DIMSUM: Approximated all-pairs similarity computation + +> #### Note +> This feature is supported from Hivemall v0.5-rc.1 or later. + +DIMSUM is a technique to efficiently and approximately compute similarities for all-pairs of items. 
You can refer to [an article in Twitter's Engineering blog](https://blog.twitter.com/engineering/en_us/a/2014/all-pairs-similarity-via-dimsum.html) to learn how DIMSUM reduces running time. + +Here, let us begin with the `user_purchased` table. `item_similarity` table can be obtained as follows: + +```sql +create table item_similarity as +with item_magnitude as ( -- compute magnitude of each item vector + select + to_map(j, mag) as mags + from ( + select + itemid as j, + l2_norm(ln(purchase_count+1)) as mag -- use scaled value + from + user_purchased + group by + itemid + ) t0 +), +item_features as ( + select + userid as i, + collect_list( + feature(itemid, ln(purchase_count+1)) -- use scaled value + ) as feature_vector + from + user_purchased + group by + userid +), +partial_result as ( -- launch DIMSUM in a MapReduce fashion + select + dimsum_mapper(f.feature_vector, m.mags, '-threshold 0.5') + as (itemid, other, s) + from + item_features f + left outer join item_magnitude m +), +similarity as ( -- reduce (i.e., sum up) mappers' partial results + select + itemid, + other, + sum(s) as similarity + from + partial_result + group by + itemid, other +), +topk as ( + select + each_top_k( -- get top-10 items based on similarity score + 10, itemid, similarity, + itemid, other -- output items + ) as (rank, similarity, itemid, other) + from ( + select * from similarity + CLUSTER BY itemid + ) t +) +-- insert overwrite table item_similarity +select + itemid, other, similarity +from + topk +; +``` + +Ultimately, using `item_similarity` for [item-based recommendation](#item-based-recommendation) is straightforward in a similar way to what we explained above. + +In the above query, an important part is obviously `dimsum_mapper(f.feature_vector, m.mags, '-threshold 0.5')`. An option `-threshold` is a real value in `[0, 1)` range, and intuitively it illustrates *"similarities above this threshold are approximated by the DIMSUM algorithm"*. 
+ +### Create `item_similarity` from Upper Triangular Matrix + +Thanks to the symmetric property of similarity matrix, DIMSUM enables you to utilize space-efficient Upper-Triangular-Matrix-style output by just adding an option `-disable_symmetric_output`: + +```sql +create table item_similarity as +with item_magnitude as ( + ... +), +partial_result as ( + select + dimsum_mapper(f.feature_vector, m.mags, '-threshold 0.5 -disable_symmetric_output') + as (itemid, other, s) + from + item_features f + left outer join item_magnitude m +), +similarity_upper_triangular as ( -- if similarity of (i1, i2) pair is in this table, (i2, i1)'s similarity is omitted + select + itemid, + other, + sum(s) as similarity + from + partial_result + group by + itemid, other +), +similarity as ( -- copy (i1, i2)'s similarity as (i2, i1)'s one + select itemid, other, similarity from similarity_upper_triangular + union all + select other as itemid, itemid as other, similarity from similarity_upper_triangular +), +topk as ( + ... 
``` \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/resources/ddl/define-all-as-permanent.hive ---------------------------------------------------------------------- diff --git a/resources/ddl/define-all-as-permanent.hive b/resources/ddl/define-all-as-permanent.hive index 425d8ff..52f3fab 100644 --- a/resources/ddl/define-all-as-permanent.hive +++ b/resources/ddl/define-all-as-permanent.hive @@ -51,7 +51,7 @@ CREATE FUNCTION kpa_predict as 'hivemall.classifier.KPAPredictUDAF' USING JAR '$ -------------------------------- -- Multiclass classification -- --------------------------------- +-------------------------------- DROP FUNCTION IF EXISTS train_multiclass_perceptron; CREATE FUNCTION train_multiclass_perceptron as 'hivemall.classifier.multiclass.MulticlassPerceptronUDTF' USING JAR '${hivemall_jar}'; @@ -99,6 +99,9 @@ CREATE FUNCTION euclid_similarity as 'hivemall.knn.similarity.EuclidSimilarity' DROP FUNCTION IF EXISTS distance2similarity; CREATE FUNCTION distance2similarity as 'hivemall.knn.similarity.Distance2SimilarityUDF' USING JAR '${hivemall_jar}'; +DROP FUNCTION IF EXISTS dimsum_mapper; +CREATE FUNCTION dimsum_mapper as 'hivemall.knn.similarity.DIMSUMMapperUDTF' USING JAR '${hivemall_jar}'; + ------------------------ -- distance functions -- ------------------------ @@ -457,6 +460,9 @@ CREATE FUNCTION to_ordered_map as 'hivemall.tools.map.UDAFToOrderedMap' USING JA DROP FUNCTION IF EXISTS sigmoid; CREATE FUNCTION sigmoid as 'hivemall.tools.math.SigmoidGenericUDF' USING JAR '${hivemall_jar}'; +DROP FUNCTION IF EXISTS l2_norm; +CREATE FUNCTION l2_norm as 'hivemall.tools.math.L2NormUDAF' USING JAR '${hivemall_jar}'; + ---------------------- -- Matrix functions -- ---------------------- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/resources/ddl/define-all.hive ---------------------------------------------------------------------- diff --git a/resources/ddl/define-all.hive 
b/resources/ddl/define-all.hive index d283812..f752c19 100644 --- a/resources/ddl/define-all.hive +++ b/resources/ddl/define-all.hive @@ -47,7 +47,7 @@ create temporary function kpa_predict as 'hivemall.classifier.KPAPredictUDAF'; -------------------------------- -- Multiclass classification -- --------------------------------- +-------------------------------- drop temporary function if exists train_multiclass_perceptron; create temporary function train_multiclass_perceptron as 'hivemall.classifier.multiclass.MulticlassPerceptronUDTF'; @@ -95,6 +95,9 @@ create temporary function euclid_similarity as 'hivemall.knn.similarity.EuclidSi drop temporary function if exists distance2similarity; create temporary function distance2similarity as 'hivemall.knn.similarity.Distance2SimilarityUDF'; +drop temporary function if exists dimsum_mapper; +create temporary function dimsum_mapper as 'hivemall.knn.similarity.DIMSUMMapperUDTF'; + ------------------------ -- distance functions -- ------------------------ @@ -453,6 +456,9 @@ create temporary function to_ordered_map as 'hivemall.tools.map.UDAFToOrderedMap drop temporary function if exists sigmoid; create temporary function sigmoid as 'hivemall.tools.math.SigmoidGenericUDF'; +drop temporary function if exists l2_norm; +create temporary function l2_norm as 'hivemall.tools.math.L2NormUDAF'; + ---------------------- -- Matrix functions -- ---------------------- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/resources/ddl/define-all.spark ---------------------------------------------------------------------- diff --git a/resources/ddl/define-all.spark b/resources/ddl/define-all.spark index 1b90c9b..e4c4f1a 100644 --- a/resources/ddl/define-all.spark +++ b/resources/ddl/define-all.spark @@ -97,6 +97,9 @@ sqlContext.sql("CREATE TEMPORARY FUNCTION euclid_similarity AS 'hivemall.knn.sim sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS distance2similarity") sqlContext.sql("CREATE TEMPORARY FUNCTION 
distance2similarity AS 'hivemall.knn.similarity.Distance2SimilarityUDF'") +sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS dimsum_mapper") +sqlContext.sql("CREATE TEMPORARY FUNCTION dimsum_mapper AS 'hivemall.knn.similarity.DIMSUMMapperUDTF'") + /** * Distance functions */ @@ -452,6 +455,9 @@ sqlContext.sql("CREATE TEMPORARY FUNCTION to_ordered_map AS 'hivemall.tools.map. sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS sigmoid") sqlContext.sql("CREATE TEMPORARY FUNCTION sigmoid AS 'hivemall.tools.math.SigmoidGenericUDF'") +sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS l2_norm") +sqlContext.sql("CREATE TEMPORARY FUNCTION l2_norm AS 'hivemall.tools.math.L2NormUDAF'") + /** * Matrix functions */ http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/resources/ddl/define-udfs.td.hql ---------------------------------------------------------------------- diff --git a/resources/ddl/define-udfs.td.hql b/resources/ddl/define-udfs.td.hql index e549649..547bf84 100644 --- a/resources/ddl/define-udfs.td.hql +++ b/resources/ddl/define-udfs.td.hql @@ -164,6 +164,8 @@ create temporary function train_plsa as 'hivemall.topicmodel.PLSAUDTF'; create temporary function plsa_predict as 'hivemall.topicmodel.PLSAPredictUDAF'; create temporary function tile as 'hivemall.geospatial.TileUDF'; create temporary function map_url as 'hivemall.geospatial.MapURLUDF'; +create temporary function l2_norm as 'hivemall.tools.math.L2NormUDAF'; +create temporary function dimsum_mapper as 'hivemall.knn.similarity.DIMSUMMapperUDTF'; -- NLP features create temporary function tokenize_ja as 'hivemall.nlp.tokenizer.KuromojiUDF';
