Repository: incubator-hivemall
Updated Branches:
  refs/heads/master 492b5d8e7 -> 1db535876


Close #84: [HIVEMALL-19] Support DIMSUM for approx all-pairs similarity


Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/1fbf90a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/1fbf90a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/1fbf90a1

Branch: refs/heads/master
Commit: 1fbf90a1eca35cf38c1ae11bc641310f39fd9c8c
Parents: 492b5d8
Author: Takuya Kitazawa <[email protected]>
Authored: Wed Jun 7 16:07:05 2017 +0900
Committer: myui <[email protected]>
Committed: Wed Jun 7 16:07:05 2017 +0900

----------------------------------------------------------------------
 .../knn/similarity/DIMSUMMapperUDTF.java        | 353 +++++++++++++++++
 .../java/hivemall/tools/math/L2NormUDAF.java    |  99 +++++
 .../java/hivemall/utils/hadoop/HiveUtils.java   |  19 +
 .../java/hivemall/utils/lang/Primitives.java    |   7 +
 .../knn/similarity/DIMSUMMapperUDTFTest.java    | 374 +++++++++++++++++++
 .../hivemall/tools/math/L2NormUDAFTest.java     |  86 +++++
 docs/gitbook/recommend/item_based_cf.md         | 213 +++++++++--
 resources/ddl/define-all-as-permanent.hive      |   8 +-
 resources/ddl/define-all.hive                   |   8 +-
 resources/ddl/define-all.spark                  |   6 +
 resources/ddl/define-udfs.td.hql                |   2 +
 11 files changed, 1133 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java 
b/core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java
new file mode 100644
index 0000000..73e218f
--- /dev/null
+++ b/core/src/main/java/hivemall/knn/similarity/DIMSUMMapperUDTF.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.knn.similarity;
+
+import hivemall.UDTFWithOptions;
+import hivemall.fm.Feature;
+import hivemall.fm.IntFeature;
+import hivemall.fm.StringFeature;
+import hivemall.math.random.PRNG;
+import hivemall.math.random.RandomNumberGeneratorFactory;
+import hivemall.utils.hadoop.HiveUtils;
+import hivemall.utils.lang.Primitives;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+@Description(
+        name = "dimsum_mapper",
+        value = "_FUNC_(array<string> row, map<int col_id, double norm> 
colNorms [, const string options]) "
+                + "- Returns column-wise partial similarities")
+public class DIMSUMMapperUDTF extends UDTFWithOptions {
+    private static final Log logger = 
LogFactory.getLog(DIMSUMMapperUDTF.class);
+
+    protected ListObjectInspector rowOI;
+    protected MapObjectInspector colNormsOI;
+
+    @Nullable
+    protected Feature[] probes;
+
+    @Nonnull
+    protected PRNG rnd;
+
+    protected double threshold;
+    protected double sqrtGamma;
+    protected boolean symmetricOutput;
+    protected boolean parseFeatureAsInt;
+
+    protected Map<Object, Double> colNorms;
+    protected Map<Object, Double> colProbs;
+
+    @Override
+    protected Options getOptions() {
+        Options opts = new Options();
+        opts.addOption("th", "threshold", true,
+            "Theoretically, similarities above this threshold are estimated 
[default: 0.5]");
+        opts.addOption("g", "gamma", true,
+            "Oversampling parameter; if `gamma` is given, `threshold` will be 
ignored"
+                + " [default: 10 * log(numCols) / threshold]");
+        opts.addOption("disable_symmetric", "disable_symmetric_output", false,
+            "Output only contains (col j, col k) pair; symmetric (col k, col 
j) pair is omitted");
+        opts.addOption("int_feature", "feature_as_integer", false,
+            "Parse a feature (i.e. column ID) as integer");
+        return opts;
+    }
+
+    @Override
+    protected CommandLine processOptions(@Nonnull ObjectInspector[] argOIs)
+            throws UDFArgumentException {
+        double threshold = 0.5d;
+        double gamma = Double.POSITIVE_INFINITY;
+        boolean symmetricOutput = true;
+        boolean parseFeatureAsInt = false;
+
+        CommandLine cl = null;
+        if (argOIs.length >= 3) {
+            String rawArgs = HiveUtils.getConstString(argOIs[2]);
+            cl = parseOptions(rawArgs);
+            threshold = Primitives.parseDouble(cl.getOptionValue("threshold"), 
threshold);
+            if (threshold < 0.f || threshold >= 1.f) {
+                throw new UDFArgumentException("`threshold` MUST be in range 
[0,1): " + threshold);
+            }
+            gamma = Primitives.parseDouble(cl.getOptionValue("gamma"), gamma);
+            if (gamma <= 1.d) {
+                throw new UDFArgumentException("`gamma` MUST be greater than 
1: " + gamma);
+            }
+            symmetricOutput = !cl.hasOption("disable_symmetric_output");
+            parseFeatureAsInt = cl.hasOption("feature_as_integer");
+        }
+
+        this.threshold = threshold;
+        this.sqrtGamma = Math.sqrt(gamma);
+        this.symmetricOutput = symmetricOutput;
+        this.parseFeatureAsInt = parseFeatureAsInt;
+
+        return cl;
+    }
+
+    @Override
+    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws 
UDFArgumentException {
+        if (argOIs.length != 2 && argOIs.length != 3) {
+            throw new UDFArgumentException(
+                getClass().getSimpleName()
+                        + " takes 2 or 3 arguments: array<string> x, map<long, 
double> colNorms "
+                        + "[, CONSTANT STRING options]: "
+                        + Arrays.toString(argOIs));
+        }
+
+        this.rowOI = HiveUtils.asListOI(argOIs[0]);
+        HiveUtils.validateFeatureOI(rowOI.getListElementObjectInspector());
+
+        this.colNormsOI = HiveUtils.asMapOI(argOIs[1]);
+
+        processOptions(argOIs);
+
+        this.rnd = RandomNumberGeneratorFactory.createPRNG(1001);
+        this.colNorms = null;
+        this.colProbs = null;
+
+        ArrayList<String> fieldNames = new ArrayList<String>();
+        fieldNames.add("j");
+        fieldNames.add("k");
+        fieldNames.add("b_jk");
+
+        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+        if (parseFeatureAsInt) {
+            
fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+            
fieldOIs.add(PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+        } else {
+            
fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
+            
fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
+        }
+        
fieldOIs.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+
+        return 
ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
+    }
+
+    @Override
+    public void process(Object[] args) throws HiveException {
+        Feature[] row = parseFeatures(args[0]);
+        if (row == null) {
+            return;
+        }
+        this.probes = row;
+
+        // since the 2nd argument (column norms) is consistent,
+        // column-related values, `colNorms` and `colProbs`, should be cached
+        if (colNorms == null || colProbs == null) {
+            final int numCols = colNormsOI.getMapSize(args[1]);
+
+            if (sqrtGamma == Double.POSITIVE_INFINITY) { // set default value 
to `gamma` based on `threshold`
+                if (threshold > 0.d) { // if `threshold` = 0, `gamma` is 
INFINITY i.e. always accept <j, k> pairs
+                    this.sqrtGamma = Math.sqrt(10 * Math.log(numCols) / 
threshold);
+                }
+            }
+
+            this.colNorms = new HashMap<Object, Double>(numCols);
+            this.colProbs = new HashMap<Object, Double>(numCols);
+            final Map<Object, Object> m = (Map<Object, Object>) 
colNormsOI.getMap(args[1]);
+            for (Map.Entry<Object, Object> e : m.entrySet()) {
+                Object j = e.getKey();
+                if (parseFeatureAsInt) {
+                    j = HiveUtils.asJavaInt(j);
+                } else {
+                    j = j.toString();
+                }
+
+                double norm = HiveUtils.asJavaDouble(e.getValue());
+                if (norm == 0.d) { // avoid zero-division
+                    norm = 1.d;
+                }
+
+                colNorms.put(j, norm);
+
+                double p = Math.min(1.d, sqrtGamma / norm);
+                colProbs.put(j, p);
+            }
+        }
+
+        if (parseFeatureAsInt) {
+            forwardAsIntFeature(row);
+        } else {
+            forwardAsStringFeature(row);
+        }
+    }
+
+    private void forwardAsIntFeature(@Nonnull Feature[] row) throws 
HiveException {
+        final int length = row.length;
+
+        Feature[] rowScaled = new Feature[length];
+        for (int i = 0; i < length; i++) {
+            int j = row[i].getFeatureIndex();
+
+            double norm = Primitives.doubleValue(colNorms.get(j), 0.d);
+            if (norm == 0.d) { // avoid zero-division
+                norm = 1.d;
+            }
+            double scaled = row[i].getValue() / Math.min(sqrtGamma, norm);
+
+            rowScaled[i] = new IntFeature(j, scaled);
+        }
+
+        final IntWritable jWritable = new IntWritable();
+        final IntWritable kWritable = new IntWritable();
+        final DoubleWritable bWritable = new DoubleWritable();
+
+        final Object[] forwardObjs = new Object[3];
+        forwardObjs[0] = jWritable;
+        forwardObjs[1] = kWritable;
+        forwardObjs[2] = bWritable;
+
+        for (int ij = 0; ij < length; ij++) {
+            int j = rowScaled[ij].getFeatureIndex();
+            double jVal = rowScaled[ij].getValue();
+            double jProb = Primitives.doubleValue(colProbs.get(j), 0.d);
+
+            if (jVal != 0.d && rnd.nextDouble() < jProb) {
+                for (int ik = ij + 1; ik < length; ik++) {
+                    int k = rowScaled[ik].getFeatureIndex();
+                    double kVal = rowScaled[ik].getValue();
+                    double kProb = Primitives.doubleValue(colProbs.get(k), 
0.d);
+
+                    if (kVal != 0.d && rnd.nextDouble() < kProb) {
+                        // compute b_jk
+                        bWritable.set(jVal * kVal);
+
+                        if (symmetricOutput) {
+                            // (j, k); similarity matrix is symmetric
+                            jWritable.set(j);
+                            kWritable.set(k);
+                            forward(forwardObjs);
+
+                            // (k, j)
+                            jWritable.set(k);
+                            kWritable.set(j);
+                            forward(forwardObjs);
+                        } else {
+                            if (j < k) {
+                                jWritable.set(j);
+                                kWritable.set(k);
+                            } else {
+                                jWritable.set(k);
+                                kWritable.set(j);
+                            }
+                            forward(forwardObjs);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void forwardAsStringFeature(@Nonnull Feature[] row) throws 
HiveException {
+        final int length = row.length;
+
+        Feature[] rowScaled = new Feature[length];
+        for (int i = 0; i < length; i++) {
+            String j = row[i].getFeature();
+
+            double norm = Primitives.doubleValue(colNorms.get(j), 0.d);
+            if (norm == 0.d) { // avoid zero-division
+                norm = 1.d;
+            }
+            double scaled = row[i].getValue() / Math.min(sqrtGamma, norm);
+
+            rowScaled[i] = new StringFeature(j, scaled);
+        }
+
+        final Text jWritable = new Text();
+        final Text kWritable = new Text();
+        final DoubleWritable bWritable = new DoubleWritable();
+
+        final Object[] forwardObjs = new Object[3];
+        forwardObjs[0] = jWritable;
+        forwardObjs[1] = kWritable;
+        forwardObjs[2] = bWritable;
+
+        for (int ij = 0; ij < length; ij++) {
+            String j = rowScaled[ij].getFeature();
+            double jVal = rowScaled[ij].getValue();
+            double jProb = Primitives.doubleValue(colProbs.get(j), 0.d);
+
+            if (jVal != 0.d && rnd.nextDouble() < jProb) {
+                for (int ik = ij + 1; ik < length; ik++) {
+                    String k = rowScaled[ik].getFeature();
+                    double kVal = rowScaled[ik].getValue();
+                    double kProb = Primitives.doubleValue(colProbs.get(j), 
0.d);
+
+                    if (kVal != 0.d && rnd.nextDouble() < kProb) {
+                        // compute b_jk
+                        bWritable.set(jVal * kVal);
+
+                        if (symmetricOutput) {
+                            // (j, k); similarity matrix is symmetric
+                            jWritable.set(j);
+                            kWritable.set(k);
+                            forward(forwardObjs);
+
+                            // (k, j)
+                            jWritable.set(k);
+                            kWritable.set(j);
+                            forward(forwardObjs);
+                        } else {
+                            if (j.compareTo(k) < 0) {
+                                jWritable.set(j);
+                                kWritable.set(k);
+                            } else {
+                                jWritable.set(k);
+                                kWritable.set(j);
+                            }
+                            forward(forwardObjs);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    @Nullable
+    protected Feature[] parseFeatures(@Nonnull final Object arg) throws 
HiveException {
+        return Feature.parseFeatures(arg, rowOI, probes, parseFeatureAsInt);
+    }
+
+    @Override
+    public void close() throws HiveException {
+        this.probes = null;
+        this.colNorms = null;
+        this.colProbs = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/core/src/main/java/hivemall/tools/math/L2NormUDAF.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/tools/math/L2NormUDAF.java 
b/core/src/main/java/hivemall/tools/math/L2NormUDAF.java
new file mode 100644
index 0000000..921272a
--- /dev/null
+++ b/core/src/main/java/hivemall/tools/math/L2NormUDAF.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.tools.math;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+
+@SuppressWarnings("deprecation")
+@Description(name = "l2_norm",
+        value = "_FUNC_(double xi) - Return L2 norm of a vector which has the 
given values in each dimension")
+public final class L2NormUDAF extends UDAF {
+
+    public static class Evaluator implements UDAFEvaluator {
+
+        private PartialResult partial;
+
+        public Evaluator() {}
+
+        @Override
+        public void init() {
+            this.partial = null;
+        }
+
+        public boolean iterate(DoubleWritable xi) throws HiveException {
+            if (xi == null) {// skip
+                return true;
+            }
+            if (partial == null) {
+                this.partial = new PartialResult();
+            }
+            partial.iterate(xi.get());
+            return true;
+        }
+
+        public PartialResult terminatePartial() {
+            return partial;
+        }
+
+        public boolean merge(PartialResult other) throws HiveException {
+            if (other == null) {
+                return true;
+            }
+            if (partial == null) {
+                this.partial = new PartialResult();
+            }
+            partial.merge(other);
+            return true;
+        }
+
+        public double terminate() {
+            if (partial == null) {
+                return 0.d;
+            }
+            return partial.get();
+        }
+    }
+
+    public static class PartialResult {
+
+        double squaredSum;
+
+        PartialResult() {
+            this.squaredSum = 0.d;
+        }
+
+        void iterate(double xi) {
+            squaredSum += xi * xi;
+        }
+
+        void merge(PartialResult other) {
+            squaredSum += other.squaredSum;
+        }
+
+        double get() {
+            return Math.sqrt(squaredSum);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java 
b/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
index 99b300d..6c1b0d1 100644
--- a/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
+++ b/core/src/main/java/hivemall/utils/hadoop/HiveUtils.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazyDouble;
 import org.apache.hadoop.hive.serde2.lazy.LazyInteger;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazyString;
@@ -146,6 +147,24 @@ public final class HiveUtils {
         return Integer.parseInt(s);
     }
 
+    public static double asJavaDouble(@Nullable final Object o) {
+        if (o == null) {
+            throw new IllegalArgumentException();
+        }
+        if (o instanceof Double) {
+            return ((Double) o).doubleValue();
+        }
+        if (o instanceof LazyDouble) {
+            DoubleWritable d = ((LazyDouble) o).getWritableObject();
+            return d.get();
+        }
+        if (o instanceof DoubleWritable) {
+            return ((DoubleWritable) o).get();
+        }
+        String s = o.toString();
+        return Double.parseDouble(s);
+    }
+
     @Nullable
     public static List<String> asStringList(@Nonnull final DeferredObject arg,
             @Nonnull final ListObjectInspector listOI) throws HiveException {

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/core/src/main/java/hivemall/utils/lang/Primitives.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/hivemall/utils/lang/Primitives.java 
b/core/src/main/java/hivemall/utils/lang/Primitives.java
index 31cd8a8..2ec012c 100644
--- a/core/src/main/java/hivemall/utils/lang/Primitives.java
+++ b/core/src/main/java/hivemall/utils/lang/Primitives.java
@@ -76,6 +76,13 @@ public final class Primitives {
         return Boolean.parseBoolean(s);
     }
 
+    public static double doubleValue(final Double v, final double 
defaultValue) {
+        if (v == null) {
+            return defaultValue;
+        }
+        return v.doubleValue();
+    }
+
     public static int compare(final int x, final int y) {
         return (x < y) ? -1 : ((x == y) ? 0 : 1);
     }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/core/src/test/java/hivemall/knn/similarity/DIMSUMMapperUDTFTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/hivemall/knn/similarity/DIMSUMMapperUDTFTest.java 
b/core/src/test/java/hivemall/knn/similarity/DIMSUMMapperUDTFTest.java
new file mode 100644
index 0000000..3bf90ec
--- /dev/null
+++ b/core/src/test/java/hivemall/knn/similarity/DIMSUMMapperUDTFTest.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.knn.similarity;
+
+import hivemall.mf.BPRMatrixFactorizationUDTFTest;
+import hivemall.utils.hadoop.HiveUtils;
+import hivemall.utils.lang.StringUtils;
+import hivemall.utils.lang.mutable.MutableInt;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.Collector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+
+import javax.annotation.Nonnull;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DIMSUMMapperUDTFTest {
+    private static final boolean DEBUG = false;
+
+    DIMSUMMapperUDTF udtf;
+
+    double[][] R;
+    int numUsers, numItems;
+
+    @Before
+    public void setUp() throws HiveException {
+        this.udtf = new DIMSUMMapperUDTF();
+
+        this.R = new double[][] { {1, 2, 3}, {1, 2, 3}};
+        this.numUsers = R.length;
+        this.numItems = R[0].length;
+    }
+
    // Exercises the integer-feature path with symmetric output disabled;
    // with -threshold 0 the summed partial similarities equal exact cosine similarity.
    @Test
    public void testIntFeature() throws HiveException {
        // accumulates forwarded triples: sims[j][k] = sum of b_jk
        final Map<Integer, Map<Integer, Double>> sims = new HashMap<Integer, Map<Integer, Double>>();
        Collector collector = new Collector() {
            public void collect(Object input) throws HiveException {
                Object[] row = (Object[]) input;

                // each forwarded row is a (j, k, b_jk) triple
                Assert.assertTrue(row.length == 3);

                int j = HiveUtils.asJavaInt(row[0]);
                int k = HiveUtils.asJavaInt(row[1]);

                Map<Integer, Double> sims_j = sims.get(j);
                if (sims_j == null) {
                    sims_j = new HashMap<Integer, Double>();
                    sims.put(j, sims_j);
                }
                Double sims_jk = sims_j.get(k);
                if (sims_jk == null) {
                    sims_jk = 0.d;
                }
                sims_j.put(k, sims_jk + HiveUtils.asJavaDouble(row[2]));
            }
        };
        udtf.setCollector(collector);

        ObjectInspector[] argOIs = new ObjectInspector[] {
                ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaStringObjectInspector),
                ObjectInspectorFactory.getStandardMapObjectInspector(
                    PrimitiveObjectInspectorFactory.javaIntObjectInspector,
                    PrimitiveObjectInspectorFactory.javaDoubleObjectInspector),
                ObjectInspectorUtils.getConstantObjectInspector(
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                    "-threshold 0 -disable_symmetric_output -int_feature")};
        // if threshold = 0, output is exact cosine similarity

        udtf.initialize(argOIs);

        final Integer[] itemIDs = new Integer[] {1, 2, 3};

        // encode each user row of R as "itemID:rating" features
        final List<String> user1 = new ArrayList<String>();
        convertRowToFeatures(0, user1, itemIDs);

        final List<String> user2 = new ArrayList<String>();
        convertRowToFeatures(1, user2, itemIDs);

        final Map<Integer, Double> norms = new HashMap<Integer, Double>();
        computeColumnNorms(norms, itemIDs);

        udtf.process(new Object[] {user1, norms});
        udtf.process(new Object[] {user2, norms});

        udtf.close();

        int numSims = 0;

        for (Integer j : sims.keySet()) {
            Map<Integer, Double> e = sims.get(j);
            for (Integer k : e.keySet()) {
                double s = e.get(k).doubleValue();
                println("(" + j + ", " + k + ") = " + s);
                // identical user rows imply cosine similarity 1 for every pair
                Assert.assertEquals(1.d, s, 1e-6);
                numSims += 1;
            }
        }

        // Upper triangular: <1, 2>, <1, 3>, <2, 3>
        Assert.assertTrue(numSims == 3);
    }
+
    // Exercises the string-feature path with symmetric output (the default);
    // with -threshold 0 the summed partial similarities equal exact cosine similarity.
    @Test
    public void testStringFeature() throws HiveException {
        // accumulates forwarded triples: sims[j][k] = sum of b_jk
        final Map<String, Map<String, Double>> sims = new HashMap<String, Map<String, Double>>();
        Collector collector = new Collector() {
            public void collect(Object input) throws HiveException {
                Object[] row = (Object[]) input;

                // each forwarded row is a (j, k, b_jk) triple
                Assert.assertTrue(row.length == 3);

                String j = row[0].toString();
                String k = row[1].toString();

                Map<String, Double> sims_j = sims.get(j);
                if (sims_j == null) {
                    sims_j = new HashMap<String, Double>();
                    sims.put(j, sims_j);
                }
                Double sims_jk = sims_j.get(k);
                if (sims_jk == null) {
                    sims_jk = 0.d;
                }
                sims_j.put(k, sims_jk + HiveUtils.asJavaDouble(row[2]));
            }
        };
        udtf.setCollector(collector);

        ObjectInspector[] argOIs = new ObjectInspector[] {
                ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaStringObjectInspector),
                ObjectInspectorFactory.getStandardMapObjectInspector(
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                    PrimitiveObjectInspectorFactory.javaDoubleObjectInspector),
                ObjectInspectorUtils.getConstantObjectInspector(
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector, "-threshold 0")};
        // if threshold = 0, output is exact cosine similarity

        udtf.initialize(argOIs);

        final String[] itemIDs = new String[] {"i1", "i2", "i3"};

        // encode each user row of R as "itemID:rating" features
        final List<String> user1 = new ArrayList<String>();
        convertRowToFeatures(0, user1, itemIDs);

        final List<String> user2 = new ArrayList<String>();
        convertRowToFeatures(1, user2, itemIDs);

        final Map<String, Double> norms = new HashMap<String, Double>();
        computeColumnNorms(norms, itemIDs);

        udtf.process(new Object[] {user1, norms});
        udtf.process(new Object[] {user2, norms});

        udtf.close();

        int numSims = 0;

        for (String j : sims.keySet()) {
            Map<String, Double> e = sims.get(j);
            for (String k : e.keySet()) {
                double s = e.get(k).doubleValue();
                println("(" + j + ", " + k + ") = " + s);
                // identical user rows imply cosine similarity 1 for every pair
                Assert.assertEquals(1.d, s, 1e-6);
                numSims += 1;
            }
        }

        // Symmetric: <i1, i2>, <i2, i1>, <i1, i3>, <i3, i1>, <i2, i3>, <i3, i2>
        Assert.assertTrue(numSims == 6);
    }
+
+    private void convertRowToFeatures(int i, @Nonnull List<String> dst, 
@Nonnull Object[] itemIDs) {
+        for (int j = 0; j < numItems; j++) {
+            double r = R[i][j];
+            if (r != 0.d) {
+                dst.add(itemIDs[j] + ":" + r);
+            }
+        }
+    }
+
+    private <T> void computeColumnNorms(@Nonnull Map<T, Double> dst, @Nonnull 
T[] itemIDs) {
+        for (int j = 0; j < numItems; j++) {
+            double norm = 0.d;
+            for (int i = 0; i < numUsers; i++) {
+                norm += R[i][j] * R[i][j];
+            }
+            dst.put(itemIDs[j], Math.sqrt(norm));
+        }
+    }
+
+    @Test
+    public void testML100k() throws HiveException, IOException {
+        // rows of a user-item matrix; used by DIMSUM
+        final Map<String, List<String>> users = new HashMap<String, 
List<String>>();
+        // columns of a user-item matrix; compute exact item-item cosine 
similarity
+        final Map<String, List<String>> items = new HashMap<String, 
List<String>>();
+
+        final Map<String, Double> norms = new HashMap<String, Double>();
+
+        BufferedReader buf = readFile("ml1k.train.gz");
+        String line;
+        while ((line = buf.readLine()) != null) {
+            String[] cols = StringUtils.split(line, ' ');
+
+            String userID = cols[0];
+            String itemID = cols[1];
+            double rate = Double.valueOf(cols[2]).doubleValue();
+
+            // find this user's list of ratings
+            List<String> userRatings = users.get(userID);
+            if (userRatings == null) {
+                userRatings = new ArrayList<String>();
+                users.put(userID, userRatings);
+            }
+            // store observed item-rate pairs to the list
+            userRatings.add(itemID + ":" + rate);
+
+            // find this item's list of ratings
+            List<String> itemRatings = items.get(itemID);
+            if (itemRatings == null) {
+                itemRatings = new ArrayList<String>();
+                items.put(itemID, itemRatings);
+            }
+            // store observed user-rate pairs to the list
+            itemRatings.add(userID + ":" + rate);
+
+            // accumulate to compute L2 norm of each column
+            Double norm = norms.get(itemID);
+            if (norm == null) {
+                norm = 0.d;
+            }
+            norm += rate * rate;
+            norms.put(itemID, norm);
+        }
+
+        // compute L2 norm of each column
+        for (Map.Entry<String, Double> e : norms.entrySet()) {
+            norms.put(e.getKey(), Math.sqrt(e.getValue().doubleValue()));
+        }
+
+        final MutableInt emitCounter = new MutableInt(0);
+        final Map<String, Map<String, Double>> sims = new HashMap<String, 
Map<String, Double>>();
+        Collector collector = new Collector() {
+            public void collect(Object input) throws HiveException {
+                emitCounter.addValue(1);
+
+                Object[] row = (Object[]) input;
+
+                Assert.assertTrue(row.length == 3);
+
+                String j = row[0].toString();
+                String k = row[1].toString();
+
+                Map<String, Double> sims_j = sims.get(j);
+                if (sims_j == null) {
+                    sims_j = new HashMap<String, Double>();
+                    sims.put(j, sims_j);
+                }
+                Double sims_jk = sims_j.get(k);
+                if (sims_jk == null) {
+                    sims_jk = 0.d;
+                }
+                sims_j.put(k, sims_jk + HiveUtils.asJavaDouble(row[2]));
+            }
+        };
+        udtf.setCollector(collector);
+
+        // Case I: Set zero to `threshold`
+        // this computes exact cosine similarity
+        ObjectInspector[] argOIs = new ObjectInspector[] {
+                
ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaStringObjectInspector),
+                ObjectInspectorFactory.getStandardMapObjectInspector(
+                    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                    PrimitiveObjectInspectorFactory.javaDoubleObjectInspector),
+                ObjectInspectorUtils.getConstantObjectInspector(
+                    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                    "-threshold 0 -disable_symmetric_output")};
+
+        udtf.initialize(argOIs);
+        for (List<String> user : users.values()) {
+            udtf.process(new Object[] {user, norms});
+        }
+        udtf.close();
+
+        int numMaxEmits = emitCounter.getValue();
+
+        // compare DIMSUM's exact similarity (i.e. threshold = 0) with exact 
column-wise cosine similarity
+        for (String j : items.keySet()) {
+            final Map<String, Double> sims_j = sims.get(j);
+            if (sims_j != null) {
+                final List<String> item_j = items.get(j);
+                for (String k : items.keySet()) {
+                    final Double sims_jk = sims_j.get(k);
+                    if (sims_jk != null) {
+                        float simsExact_jk = 
CosineSimilarityUDF.cosineSimilarity(item_j, items.get(k));
+                        Assert.assertEquals(simsExact_jk, 
sims_jk.floatValue(), 1e-6);
+                    }
+                }
+            }
+        }
+
+        // reset counter and similarities
+        emitCounter.setValue(0);
+        sims.clear();
+
+        // Case II: Set (almost) max value to `threshold`
+        // this skips a bunch of operations with high probability
+        argOIs = new ObjectInspector[] {
+                
ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaStringObjectInspector),
+                ObjectInspectorFactory.getStandardMapObjectInspector(
+                    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                    PrimitiveObjectInspectorFactory.javaDoubleObjectInspector),
+                ObjectInspectorUtils.getConstantObjectInspector(
+                    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                    "-threshold 0.999999 -disable_symmetric_output")};
+
+        udtf.initialize(argOIs);
+        for (List<String> user : users.values()) {
+            udtf.process(new Object[] {user, norms});
+        }
+        udtf.close();
+
+        Assert.assertTrue("Approximated one MUST reduce the number of 
operations",
+            emitCounter.getValue() < numMaxEmits);
+    }
+
+    @Nonnull
+    private static BufferedReader readFile(@Nonnull String fileName) throws 
IOException {
+        // use MF's resource file
+        InputStream is = 
BPRMatrixFactorizationUDTFTest.class.getResourceAsStream(fileName);
+        if (fileName.endsWith(".gz")) {
+            is = new GZIPInputStream(is);
+        }
+        return new BufferedReader(new InputStreamReader(is));
+    }
+
+    private static void println(String msg) {
+        if (DEBUG) {
+            System.out.println(msg);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/core/src/test/java/hivemall/tools/math/L2NormUDAFTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/hivemall/tools/math/L2NormUDAFTest.java 
b/core/src/test/java/hivemall/tools/math/L2NormUDAFTest.java
new file mode 100644
index 0000000..937bfcd
--- /dev/null
+++ b/core/src/test/java/hivemall/tools/math/L2NormUDAFTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package hivemall.tools.math;
+
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class L2NormUDAFTest {
+
+    L2NormUDAF.Evaluator evaluator;
+    double[] x;
+    double expected;
+
+    @Before
+    public void setUp() throws Exception {
+        this.evaluator = new L2NormUDAF.Evaluator();
+        this.x = new double[] {1.d, 2.d, 3.d, 4.d, 5.d, 6.d};
+        this.expected = 9.5393920141694561;
+    }
+
+    @Test
+    public void test() throws Exception {
+        evaluator.init();
+
+        for (double xi : x) {
+            evaluator.iterate(new DoubleWritable(xi));
+        }
+
+        Assert.assertEquals(expected, evaluator.terminate(), 1e-5d);
+    }
+
+    @Test
+    public void testMerge() throws Exception {
+        L2NormUDAF.PartialResult[] partials = new L2NormUDAF.PartialResult[3];
+
+        // bin #1
+        evaluator.init();
+        evaluator.iterate(new DoubleWritable(x[0]));
+        evaluator.iterate(new DoubleWritable(x[1]));
+        partials[0] = evaluator.terminatePartial();
+
+        // bin #2
+        evaluator.init();
+        evaluator.iterate(new DoubleWritable(x[2]));
+        evaluator.iterate(new DoubleWritable(x[3]));
+        partials[1] = evaluator.terminatePartial();
+
+        // bin #3
+        evaluator.init();
+        evaluator.iterate(new DoubleWritable(x[4]));
+        evaluator.iterate(new DoubleWritable(x[5]));
+        partials[2] = evaluator.terminatePartial();
+
+        // merge in a different order; e.g., <bin0, bin1>, <bin1, bin0> should 
return same value
+        final int[][] orders = new int[][] { {0, 1, 2}, {0, 2, 1}, {1, 0, 2}, 
{1, 2, 0}, {2, 1, 0},
+                {2, 0, 1}};
+        for (int i = 0; i < orders.length; i++) {
+            evaluator.init();
+
+            evaluator.merge(partials[orders[i][0]]);
+            evaluator.merge(partials[orders[i][1]]);
+            evaluator.merge(partials[orders[i][2]]);
+
+            Assert.assertEquals(expected, evaluator.terminate(), 1e-5d);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/docs/gitbook/recommend/item_based_cf.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/recommend/item_based_cf.md 
b/docs/gitbook/recommend/item_based_cf.md
index a674f70..2b9097e 100644
--- a/docs/gitbook/recommend/item_based_cf.md
+++ b/docs/gitbook/recommend/item_based_cf.md
@@ -17,13 +17,16 @@
   under the License.
 -->
         
-This document describe how to do Item-based Collaborative Filtering using 
Hivemall.
+This document describes how to do Item-based Collaborative Filtering using 
Hivemall.
 
-_Caution: naive similarity computation is `O(n^2)` to compute all item-item 
pair similarity. 
[MinHash](https://en.wikipedia.org/wiki/MinHash#Jaccard_similarity_and_minimum_hash_values)
 is an efficient scheme for computing jaccard similarity. Section 6 show how to 
use MinHash in Hivemall._
+<!-- toc -->
 
-## 1. Prepare transaction table
+> #### Caution
+> Naive similarity computation is `O(n^2)` to compute all item-item pair 
similarity. In order to accelerate the procedure, Hivemall has an efficient 
scheme for computing Jaccard and/or cosine similarity [as mentioned 
later](#efficient-similarity-computation).
 
-Prepare following transaction table. We are generating `feature_vector` for 
each `item_id` based on cooccurrence of purchased items, a sort of bucket 
analysis.
+# Prepare `transaction` table
+
+Prepare following `transaction` table. We will generate `feature_vector` for 
each `itemid` based on cooccurrence of purchased items, a sort of bucket 
analysis.
 
 | userid | itemid | purchase_at `timestamp` |
 |:-:|:-:|:-:| 
@@ -33,7 +36,7 @@ Prepare following transaction table. We are generating 
`feature_vector` for each
 | 3 | 2313 | 2016-06-04 19:29:02 |
 | .. | .. | .. |
 
-## 2. Create item_features table
+# Create `item_features` table
 
 What we want for creating a feature vector for each item is the following 
`cooccurrence` relation.
 
@@ -49,7 +52,7 @@ What we want for creating a feature vector for each item is 
the following `coocc
 Feature vectors of each item will be as follows:
 
 | itemid | feature_vector `array<string>` |
-|:-:|:-:|
+|:-:|:-|
 | 583266 | 621056:9999, 583266:18 |
 | 31231 | 13212:129, 31231:3, 9833:953 |
 | ... | ... |
@@ -57,16 +60,16 @@ Feature vectors of each item will be as follows:
 Note that value of feature vector should be scaled for k-NN similarity 
computation e.g., as follows:
 
 | itemid | feature_vector `array<string>` |
-|:-:|:-:|
+|:-:|:-|
 | 583266 | 621056:`ln(9999+1)`, 583266:`ln(18+1)` |
 | 31231 | 13212:`ln(129+1)`, 31231:`ln(3+1)`, 9833:`ln(953+1)` |
 | ... | ... |
 
 The following queries results in creating the above table.
 
-### 2.1. Creating Item purchased table
+## Step 1: Creating `user_purchased` table
 
-The following query creates a table that contains userid, itemid, and 
purchased_at. The table represents the last user-item contact (purchase) while 
the `transaction` table holds all contacts.
+The following query creates a table that contains `userid`, `itemid`, and 
`purchased_at`. The table represents the last user-item contact (purchase) 
while the `transaction` table holds all contacts.
 
 ```sql
 CREATE TABLE user_purchased as
@@ -84,15 +87,15 @@ group by
 ;
 ```
 
-**Note:** _Better to avoid too old transactions because those information 
would be outdated though an enough number of transactions is required for 
recommendation._
-
-### 2.2. Creating cooccurrence table
+> #### Note
+> It is better to avoid too-old transactions because that information would be 
outdated, though a sufficient number of transactions is required for recommendation.
 
-**Caution:** _Item-Item cooccurrence matrix is a symmetric matrix that has the 
number of total occurrence for each diagonal element . If the size of items are 
`k`, then the size of expected matrix is `k * (k - 1) / 2`, usually a very 
large one._
+## Step 2: Creating `cooccurrence` table
 
-_Better to use 
[2.2.2.](#222-create-cooccurrence-table-from-upper-triangular-matrix-of-cooccurrence)
 instead of [2.2.1.](#221-create-cooccurrence-table-directly) for creating a 
`cooccurrence` table where dataset is large._
+> #### Caution
+> Item-item cooccurrence matrix is a symmetric matrix that has the number of 
total occurrence for each diagonal element. If the size of items is `k`, then 
the size of expected matrix is `k * (k - 1) / 2`, usually a very large one. 
Hence, it is better to use [step 
2-2](#step-2-2-create-cooccurrence-table-from-upper-triangular-matrix-of-cooccurrence)
 instead of [step 2-1](#step-2-1-create-cooccurrence-table-directly) for 
creating a `cooccurrence` table where dataset is large.
 
-### 2.2.1. Create cooccurrence table directly
+### Step 2-1: Create `cooccurrence` table directly
 
 ```sql
 create table cooccurrence as 
@@ -114,11 +117,15 @@ group by
 ;
 ```
 
-**Caution:** Note that specifying `having cnt >= 2` has a drawback that item 
cooccurrence is not calculated where `cnt` is less than 2. It could result no 
recommended items for certain items. Please ignore `having cnt >= 2` if the 
following computations finish in an acceptable/reasonable time.
+> #### Note 
+> Note that specifying `having cnt >= 2` has a drawback that item cooccurrence 
is not calculated where `cnt` is less than 2. It could result in no recommended 
items for certain items. Please ignore `having cnt >= 2` if the following 
computations finish in an acceptable/reasonable time.
+
+<br/>
 
-**Caution:** _We ignore a purchase order in the following example. It means 
that the occurrence counts of `ItemA -> ItemB` and `ItemB -> ItemA` are assumed 
to be same. It is sometimes not a good idea e.g., for `Camera -> SD card` and 
`SD card -> Camera`._
+> #### Caution
+> We ignore a purchase order in the following example. It means that the 
occurrence counts of `ItemA -> ItemB` and `ItemB -> ItemA` are assumed to be 
same. It is sometimes not a good idea in terms of reasoning; for example, 
`Camera -> SD card` and `SD card -> Camera` need to be considered separately.
 
-### 2.2.2. Create cooccurrence table from Upper Triangular Matrix of 
cooccurrence
+### Step 2-2: Create `cooccurrence` table from Upper Triangular Matrix of 
cooccurrence
 
 Better to create [Upper Triangular 
Matrix](https://en.wikipedia.org/wiki/Triangular_matrix#Description) that has 
`itemid > other` if resulting table is very large. No need to create Upper 
Triangular Matrix if your Hadoop cluster can handle the following instructions 
without considering it.
 
@@ -149,9 +156,12 @@ select * from (
 ) t; 
 ```
 
-_Note: `UNION ALL` [required to be 
embedded](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Union#LanguageManualUnion-UNIONwithinaFROMClause)
 in Hive._
+> #### Note
+> `UNION ALL` is [required to be 
embedded](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Union#LanguageManualUnion-UNIONwithinaFROMClause)
 in Hive.
 
-### Limiting size of elements in cooccurrence_upper_triangular
+#### Limiting size of elements in `cooccurrence_upper_triangular`
+
+Using only top-N frequently co-occurred item pairs allows you to reduce the 
size of `cooccurrence` table:
 
 ```sql
 create table cooccurrence_upper_triangular as
@@ -207,7 +217,7 @@ select itemid, other, cnt
 from t2;
 ```
 
-### 2.2.3. Computing cooccurrence ratio (optional step)
+### Computing cooccurrence ratio (optional step)
 
 You can optionally compute cooccurrence ratio as follows:
 
@@ -236,7 +246,7 @@ group by
 
 `l.cnt / r.totalcnt` represents a cooccurrence ratio of range `[0,1]`.
 
-### 2.3. creating a feature vector for each item
+## Step 3: Creating a feature vector for each item
 
 ```sql
 INSERT OVERWRITE TABLE item_features
@@ -252,14 +262,14 @@ GROUP BY
 ;
 ```
 
-## 3. Computing Item similarity scores
+# Compute item similarity scores
 
-Item-Item similarity computation is known to be computation complexity 
`O(n^2)` where `n` is the number of items.
-Depending on your cluster size and your dataset, the optimal solution differs.
+Item-item similarity computation is known to have computational complexity 
`O(n^2)` where `n` is the number of items. We have two options to compute the 
similarities, and, depending on your cluster size and your dataset, the optimal 
solution differs. 
 
-**Note:** _Better to use 
[3.1.1.](#311-similarity-computation-using-the-symmetric-property-of-item-similarity-matrix)
 scheme where dataset is large._
+> #### Note 
+> If your dataset is large enough, better to choose [modified version of 
option 
1](#taking-advantage-of-the-symmetric-property-of-item-similarity-matrix), 
which utilizes the symmetric property of similarity matrix.
 
-### 3.1. Shuffle heavy similarity computation
+## Option 1: Parallel computation with computationally heavy shuffling
 
 This version involves 3-way joins w/ large data shuffle; However, this version 
works in parallel where a cluster has enough task slots.
 
@@ -293,9 +303,9 @@ from
   topk;
 ```
 
-### 3.1.1. Similarity computation using the symmetric property of Item 
similarity matrix
+### Taking advantage of the symmetric property of item similarity matrix
 
-Note `item_similarity` is a similarity matrix. So, you can compute it from an 
upper triangular matrix as follows.
+Notice that `item_similarity` is a symmetric matrix. So, you can compute it 
from an upper triangular matrix as follows.
 
 ```sql
 WITH cooccurrence_top100 as (
@@ -348,9 +358,9 @@ select * from (
 ) t;
 ```
 
-### 3.2. Computation heavy similarity computation 
+## Option 2: Sequential computation
 
-Alternatively, you can compute cosine similarity as follows. This version 
involves cross join and thus runs sequentially in a single task. However, it 
involves less shuffle when compared to 3.1.
+Alternatively, you can compute cosine similarity as follows. This version 
involves cross join and thus runs sequentially in a single task. However, it 
involves less shuffle compared to option 1.
 
 ```sql
 WITH similarity as (
@@ -393,13 +403,14 @@ from
 | 31231        | 9833 | 0.953 |
 | ... | ... | ... |
 
-## 4. Item-based Recommendation
+# Item-based recommendation
 
 This section introduces item-based recommendation based on recently purchased 
items by each user.
 
-**Caution:** _It would better to ignore recommending some of items that user 
already purchased (only 1 time) while items that are purchased twice or more 
would be okey to be included in the recommendation list (e.g., repeatedly 
purchased daily necessities). So, you would need an item property table showing 
that each item is repeatedly purchased items or not._
+> #### Caution
+> It would be better to avoid recommending items that a user has purchased 
only once, while items that are purchased twice or more may be okay to include 
in the recommendation list (e.g., repeatedly purchased daily necessities). So, 
you would need an item property table showing whether each item is a repeatedly 
purchased item or not.
 
-### 4.1. Computes top-k recently purchaed items for each user
+## Step 1: Computes top-k recently purchased items for each user
 
 First, prepare `recently_purchased_items` table as follows:
 
@@ -422,7 +433,11 @@ from (
 ) t;
 ```
 
-### 4.2. Recommend top-k items based on the cooccurrence for each user's 
recently purchased item
+## Step 2: Recommend top-k items based on users' recently purchased items
+
+In order to generate a list of recommended items, you can use either 
cooccurrence count or similarity as a relevance score.
+
+### Cooccurrence-based
 
 ```sql
 WITH topk as (
@@ -461,7 +476,7 @@ group by
 ;
 ```
 
-### 4.3. Recommend top-k items based on the (cooccurrence) similarity for each 
user's recently purchased item
+### Similarity-based
 
 ```sql
 WITH topk as (
@@ -500,7 +515,11 @@ group by
 ;
 ```
 
-## 5. Pseudo Jaccard Similarity computation using MinHash
+# Efficient similarity computation
+
+Since naive similarity computation takes `O(n^2)` computational complexity, 
utilizing a certain approximation scheme is practically important to improve 
efficiency and feasibility. In particular, Hivemall enables you to use one of 
two sophisticated approximation schemes, 
[MinHash](https://en.wikipedia.org/wiki/MinHash#Jaccard_similarity_and_minimum_hash_values)
 and 
[DIMSUM](https://blog.twitter.com/engineering/en_us/a/2014/all-pairs-similarity-via-dimsum.html).
+
+## MinHash: Compute "pseudo" Jaccard similarity
 
 Refer [this 
article](https://en.wikipedia.org/wiki/MinHash#Jaccard_similarity_and_minimum_hash_values
 ) to get details about MinHash and Jaccard similarity. [This blog 
article](https://blog.treasuredata.com/blog/2016/02/16/minhash-in-hivemall/) 
also explains about Hivemall's minhash.
@@ -547,11 +566,13 @@ from
   top100
 ;
 ```
-_Caution: Note that there might be no similar item for certain items._
 
-### 5.1. Cosine similarity computation following minhash-based similarity 
items filtering
+> #### Note
+> There might be no similar item for certain items.
+
+### Compute approximated cosine similarity by using the MinHash-based Jaccard 
similarity
 
-You can compute `top-k` similar items based on cosine similarity, following 
rough `top-N` similar items listing using minhash, where `k << N` (e.g., k=10 
and N=100).
+Once the MinHash-based approach has found rough `top-N` similar items, you can 
efficiently find `top-k` similar items in terms of cosine similarity, where `k 
<< N` (e.g., k=10 and N=100).
 
 ```sql
 WITH similarity as (
@@ -581,4 +602,116 @@ select
   itemid, other, similarity
 from 
   topk;
+```
+
+## DIMSUM: Approximated all-pairs similarity computation
+
+> #### Note
+> This feature is supported from Hivemall v0.5-rc.1 or later.
+
+DIMSUM is a technique to efficiently and approximately compute similarities 
for all-pairs of items. You can refer to [an article in Twitter's Engineering 
blog](https://blog.twitter.com/engineering/en_us/a/2014/all-pairs-similarity-via-dimsum.html)
 to learn how DIMSUM reduces running time.
+
+Here, let us begin with the `user_purchased` table. `item_similarity` table 
can be obtained as follows:
+
+```sql
+create table item_similarity as
+with item_magnitude as ( -- compute magnitude of each item vector
+  select
+    to_map(j, mag) as mags
+  from (
+    select 
+      itemid as j,
+      l2_norm(ln(purchase_count+1)) as mag -- use scaled value
+    from 
+      user_purchased
+    group by
+      itemid
+  ) t0
+),
+item_features as (
+  select
+    userid as i,
+    collect_list(
+      feature(itemid, ln(purchase_count+1)) -- use scaled value
+    ) as feature_vector
+  from
+    user_purchased
+  group by
+    userid
+),
+partial_result as ( -- launch DIMSUM in a MapReduce fashion
+  select
+    dimsum_mapper(f.feature_vector, m.mags, '-threshold 0.5')
+      as (itemid, other, s)
+  from
+    item_features f
+  left outer join item_magnitude m
+),
+similarity as ( -- reduce (i.e., sum up) mappers' partial results
+  select
+    itemid, 
+    other,
+    sum(s) as similarity
+  from 
+    partial_result
+  group by
+    itemid, other
+),
+topk as (
+  select
+    each_top_k( -- get top-10 items based on similarity score
+      10, itemid, similarity,
+      itemid, other -- output items
+    ) as (rank, similarity, itemid, other)
+  from (
+    select * from similarity
+    CLUSTER BY itemid
+  ) t
+)
+-- insert overwrite table item_similarity
+select 
+  itemid, other, similarity
+from 
+  topk
+;
+```
+
+Ultimately, using `item_similarity` for [item-based 
recommendation](#item-based-recommendation) is straightforward in a similar way 
to what we explained above.
+
+In the above query, an important part is obviously 
`dimsum_mapper(f.feature_vector, m.mags, '-threshold 0.5')`. The option 
`-threshold` is a real value in the `[0, 1)` range, and intuitively it indicates 
*"similarities above this threshold are approximated by the DIMSUM algorithm"*.
+
+### Create `item_similarity` from Upper Triangular Matrix
+
+Thanks to the symmetric property of similarity matrix, DIMSUM enables you to 
utilize space-efficient Upper-Triangular-Matrix-style output by just adding an 
option `-disable_symmetric_output`:
+
+```sql
+create table item_similarity as
+with item_magnitude as (
+  ...
+),
+partial_result as (
+  select
+    dimsum_mapper(f.feature_vector, m.mags, '-threshold 0.5 
-disable_symmetric_output')
+      as (itemid, other, s)
+  from
+    item_features f
+  left outer join item_magnitude m
+),
+similarity_upper_triangular as ( -- if similarity of (i1, i2) pair is in this 
table, (i2, i1)'s similarity is omitted
+  select
+    itemid, 
+    other,
+    sum(s) as similarity
+  from 
+    partial_result
+  group by
+    itemid, other
+),
+similarity as ( -- copy (i1, i2)'s similarity as (i2, i1)'s one
+  select itemid, other, similarity from similarity_upper_triangular
+  union all
+  select other as itemid, itemid as other, similarity from 
similarity_upper_triangular
+),
+topk as (
+  ...
 ```
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/resources/ddl/define-all-as-permanent.hive
----------------------------------------------------------------------
diff --git a/resources/ddl/define-all-as-permanent.hive 
b/resources/ddl/define-all-as-permanent.hive
index 425d8ff..52f3fab 100644
--- a/resources/ddl/define-all-as-permanent.hive
+++ b/resources/ddl/define-all-as-permanent.hive
@@ -51,7 +51,7 @@ CREATE FUNCTION kpa_predict as 
'hivemall.classifier.KPAPredictUDAF' USING JAR '$
 
 --------------------------------
 --  Multiclass classification --
--------------------------------- 
+--------------------------------
 
 DROP FUNCTION IF EXISTS train_multiclass_perceptron;
 CREATE FUNCTION train_multiclass_perceptron as 
'hivemall.classifier.multiclass.MulticlassPerceptronUDTF' USING JAR 
'${hivemall_jar}';
@@ -99,6 +99,9 @@ CREATE FUNCTION euclid_similarity as 
'hivemall.knn.similarity.EuclidSimilarity'
 DROP FUNCTION IF EXISTS distance2similarity;
 CREATE FUNCTION distance2similarity as 
'hivemall.knn.similarity.Distance2SimilarityUDF' USING JAR '${hivemall_jar}';
 
+DROP FUNCTION IF EXISTS dimsum_mapper;
+CREATE FUNCTION dimsum_mapper as 'hivemall.knn.similarity.DIMSUMMapperUDTF' 
USING JAR '${hivemall_jar}';
+
 ------------------------
 -- distance functions --
 ------------------------
@@ -457,6 +460,9 @@ CREATE FUNCTION to_ordered_map as 
'hivemall.tools.map.UDAFToOrderedMap' USING JA
 DROP FUNCTION IF EXISTS sigmoid;
 CREATE FUNCTION sigmoid as 'hivemall.tools.math.SigmoidGenericUDF' USING JAR 
'${hivemall_jar}';
 
+DROP FUNCTION IF EXISTS l2_norm;
+CREATE FUNCTION l2_norm as 'hivemall.tools.math.L2NormUDAF' USING JAR 
'${hivemall_jar}';
+
 ----------------------
 -- Matrix functions --
 ----------------------

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/resources/ddl/define-all.hive
----------------------------------------------------------------------
diff --git a/resources/ddl/define-all.hive b/resources/ddl/define-all.hive
index d283812..f752c19 100644
--- a/resources/ddl/define-all.hive
+++ b/resources/ddl/define-all.hive
@@ -47,7 +47,7 @@ create temporary function kpa_predict as 
'hivemall.classifier.KPAPredictUDAF';
 
 --------------------------------
 --  Multiclass classification --
--------------------------------- 
+--------------------------------
 
 drop temporary function if exists train_multiclass_perceptron;
 create temporary function train_multiclass_perceptron as 
'hivemall.classifier.multiclass.MulticlassPerceptronUDTF';
@@ -95,6 +95,9 @@ create temporary function euclid_similarity as 
'hivemall.knn.similarity.EuclidSi
 drop temporary function if exists distance2similarity;
 create temporary function distance2similarity as 
'hivemall.knn.similarity.Distance2SimilarityUDF';
 
+drop temporary function if exists dimsum_mapper;
+create temporary function dimsum_mapper as 
'hivemall.knn.similarity.DIMSUMMapperUDTF';
+
 ------------------------
 -- distance functions --
 ------------------------
@@ -453,6 +456,9 @@ create temporary function to_ordered_map as 
'hivemall.tools.map.UDAFToOrderedMap
 drop temporary function if exists sigmoid;
 create temporary function sigmoid as 'hivemall.tools.math.SigmoidGenericUDF';
 
+drop temporary function if exists l2_norm;
+create temporary function l2_norm as 'hivemall.tools.math.L2NormUDAF';
+
 ----------------------
 -- Matrix functions --
 ----------------------

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/resources/ddl/define-all.spark
----------------------------------------------------------------------
diff --git a/resources/ddl/define-all.spark b/resources/ddl/define-all.spark
index 1b90c9b..e4c4f1a 100644
--- a/resources/ddl/define-all.spark
+++ b/resources/ddl/define-all.spark
@@ -97,6 +97,9 @@ sqlContext.sql("CREATE TEMPORARY FUNCTION euclid_similarity 
AS 'hivemall.knn.sim
 sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS distance2similarity")
 sqlContext.sql("CREATE TEMPORARY FUNCTION distance2similarity AS 
'hivemall.knn.similarity.Distance2SimilarityUDF'")
 
+sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS dimsum_mapper")
+sqlContext.sql("CREATE TEMPORARY FUNCTION dimsum_mapper AS 
'hivemall.knn.similarity.DIMSUMMapperUDTF'")
+
 /**
  * Distance functions
  */
@@ -452,6 +455,9 @@ sqlContext.sql("CREATE TEMPORARY FUNCTION to_ordered_map AS 
'hivemall.tools.map.
 sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS sigmoid")
 sqlContext.sql("CREATE TEMPORARY FUNCTION sigmoid AS 
'hivemall.tools.math.SigmoidGenericUDF'")
 
+sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS l2_norm")
+sqlContext.sql("CREATE TEMPORARY FUNCTION l2_norm AS 
'hivemall.tools.math.L2NormUDAF'")
+
 /**
  * Matrix functions
  */

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1fbf90a1/resources/ddl/define-udfs.td.hql
----------------------------------------------------------------------
diff --git a/resources/ddl/define-udfs.td.hql b/resources/ddl/define-udfs.td.hql
index e549649..547bf84 100644
--- a/resources/ddl/define-udfs.td.hql
+++ b/resources/ddl/define-udfs.td.hql
@@ -164,6 +164,8 @@ create temporary function train_plsa as 
'hivemall.topicmodel.PLSAUDTF';
 create temporary function plsa_predict as 
'hivemall.topicmodel.PLSAPredictUDAF';
 create temporary function tile as 'hivemall.geospatial.TileUDF';
 create temporary function map_url as 'hivemall.geospatial.MapURLUDF';
+create temporary function l2_norm as 'hivemall.tools.math.L2NormUDAF';
+create temporary function dimsum_mapper as 
'hivemall.knn.similarity.DIMSUMMapperUDTF';
 
 -- NLP features
 create temporary function tokenize_ja as 'hivemall.nlp.tokenizer.KuromojiUDF';

Reply via email to