http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/benchmark/PlusBenchmark.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/benchmark/PlusBenchmark.java b/integration/src/main/java/org/apache/mahout/benchmark/PlusBenchmark.java
deleted file mode 100644
index bd76e94..0000000
--- a/integration/src/main/java/org/apache/mahout/benchmark/PlusBenchmark.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.benchmark;
-
-import static org.apache.mahout.benchmark.VectorBenchmarks.DENSE_FN_RAND;
-import static org.apache.mahout.benchmark.VectorBenchmarks.DENSE_FN_SEQ;
-import static org.apache.mahout.benchmark.VectorBenchmarks.DENSE_VECTOR;
-import static org.apache.mahout.benchmark.VectorBenchmarks.RAND_FN_DENSE;
-import static org.apache.mahout.benchmark.VectorBenchmarks.RAND_FN_SEQ;
-import static org.apache.mahout.benchmark.VectorBenchmarks.RAND_SPARSE_VECTOR;
-import static org.apache.mahout.benchmark.VectorBenchmarks.SEQ_FN_DENSE;
-import static org.apache.mahout.benchmark.VectorBenchmarks.SEQ_FN_RAND;
-import static org.apache.mahout.benchmark.VectorBenchmarks.SEQ_SPARSE_VECTOR;
-
-import org.apache.mahout.benchmark.BenchmarkRunner.BenchmarkFn;
-import org.apache.mahout.math.Vector;
-
-public class PlusBenchmark {
-
-  private static final String PLUS = "Plus";
-  private final VectorBenchmarks mark;
-
-  public PlusBenchmark(VectorBenchmarks mark) {
-    this.mark = mark;
-  }
-
-  public void benchmark() {
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[0][mark.vIndex(i)].plus(mark.vectors[0][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), PLUS, DENSE_VECTOR);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[1][mark.vIndex(i)].plus(mark.vectors[1][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), PLUS, RAND_SPARSE_VECTOR);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[2][mark.vIndex(i)].plus(mark.vectors[2][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), PLUS, SEQ_SPARSE_VECTOR);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[0][mark.vIndex(i)].plus(mark.vectors[1][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), PLUS, DENSE_FN_RAND);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[0][mark.vIndex(i)].plus(mark.vectors[2][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), PLUS, DENSE_FN_SEQ);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[1][mark.vIndex(i)].plus(mark.vectors[0][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), PLUS, RAND_FN_DENSE);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[1][mark.vIndex(i)].plus(mark.vectors[2][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), PLUS, RAND_FN_SEQ);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[2][mark.vIndex(i)].plus(mark.vectors[0][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), PLUS, SEQ_FN_DENSE);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[2][mark.vIndex(i)].plus(mark.vectors[1][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), PLUS, SEQ_FN_RAND);
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/benchmark/SerializationBenchmark.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/benchmark/SerializationBenchmark.java b/integration/src/main/java/org/apache/mahout/benchmark/SerializationBenchmark.java
deleted file mode 100644
index cd403c2..0000000
--- a/integration/src/main/java/org/apache/mahout/benchmark/SerializationBenchmark.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.benchmark;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.mahout.common.TimingStatistics;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
-import org.apache.mahout.math.VectorWritable;
-
-import java.io.IOException;
-
-import static org.apache.mahout.benchmark.VectorBenchmarks.DENSE_VECTOR;
-import static org.apache.mahout.benchmark.VectorBenchmarks.RAND_SPARSE_VECTOR;
-import static org.apache.mahout.benchmark.VectorBenchmarks.SEQ_SPARSE_VECTOR;
-
-public class SerializationBenchmark {
-  public static final String SERIALIZE = "Serialize";
-  public static final String DESERIALIZE = "Deserialize";
-  private final VectorBenchmarks mark;
-
-  public SerializationBenchmark(VectorBenchmarks mark) {
-    this.mark = mark;
-  }
-
-  public void benchmark() throws IOException {
-    serializeBenchmark();
-    deserializeBenchmark();
-  }
-
-  public void serializeBenchmark() throws IOException {
-    Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(conf);
-
-    Writable one = new IntWritable(0);
-    VectorWritable vec = new VectorWritable();
-    TimingStatistics stats = new TimingStatistics();
-
-    try (SequenceFile.Writer writer =
-             new SequenceFile.Writer(fs, conf, new Path("/tmp/dense-vector"),
-                 IntWritable.class, VectorWritable.class)){
-      for (int i = 0; i < mark.loop; i++) {
-        TimingStatistics.Call call = stats.newCall(mark.leadTimeUsec);
-        vec.set(mark.vectors[0][mark.vIndex(i)]);
-        writer.append(one, vec);
-        if (call.end(mark.maxTimeUsec)) {
-          break;
-        }
-      }
-    }
-    mark.printStats(stats, SERIALIZE, DENSE_VECTOR);
-
-    stats = new TimingStatistics();
-    try (SequenceFile.Writer writer =
-             new SequenceFile.Writer(fs, conf,
-                 new Path("/tmp/randsparse-vector"), IntWritable.class, VectorWritable.class)){
-      for (int i = 0; i < mark.loop; i++) {
-        TimingStatistics.Call call = stats.newCall(mark.leadTimeUsec);
-        vec.set(mark.vectors[1][mark.vIndex(i)]);
-        writer.append(one, vec);
-        if (call.end(mark.maxTimeUsec)) {
-          break;
-        }
-      }
-    }
-    mark.printStats(stats, SERIALIZE, RAND_SPARSE_VECTOR);
-
-    stats = new TimingStatistics();
-    try (SequenceFile.Writer writer =
-             new SequenceFile.Writer(fs, conf,
-                 new Path("/tmp/seqsparse-vector"), IntWritable.class, VectorWritable.class)) {
-      for (int i = 0; i < mark.loop; i++) {
-        TimingStatistics.Call call = stats.newCall(mark.leadTimeUsec);
-        vec.set(mark.vectors[2][mark.vIndex(i)]);
-        writer.append(one, vec);
-        if (call.end(mark.maxTimeUsec)) {
-          break;
-        }
-      }
-    }
-    mark.printStats(stats, SERIALIZE, SEQ_SPARSE_VECTOR);
-
-  }
-
-  public void deserializeBenchmark() throws IOException {
-    doDeserializeBenchmark(DENSE_VECTOR, "/tmp/dense-vector");
-    doDeserializeBenchmark(RAND_SPARSE_VECTOR, "/tmp/randsparse-vector");
-    doDeserializeBenchmark(SEQ_SPARSE_VECTOR, "/tmp/seqsparse-vector");
-  }
-
-  private void doDeserializeBenchmark(String name, String pathString) throws IOException {
-    TimingStatistics stats = new TimingStatistics();
-    TimingStatistics.Call call = stats.newCall(mark.leadTimeUsec);
-    SequenceFileValueIterator<Writable> iterator = new SequenceFileValueIterator<>(new Path(pathString), true,
-        new Configuration());
-    while (iterator.hasNext()) {
-      iterator.next();
-      call.end();
-      call = stats.newCall(mark.leadTimeUsec);
-    }
-    iterator.close();
-    mark.printStats(stats, DESERIALIZE, name);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/benchmark/TimesBenchmark.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/benchmark/TimesBenchmark.java b/integration/src/main/java/org/apache/mahout/benchmark/TimesBenchmark.java
deleted file mode 100644
index bf81228..0000000
--- a/integration/src/main/java/org/apache/mahout/benchmark/TimesBenchmark.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.benchmark;
-
-import static org.apache.mahout.benchmark.VectorBenchmarks.DENSE_FN_RAND;
-import static org.apache.mahout.benchmark.VectorBenchmarks.DENSE_FN_SEQ;
-import static org.apache.mahout.benchmark.VectorBenchmarks.DENSE_VECTOR;
-import static org.apache.mahout.benchmark.VectorBenchmarks.RAND_FN_DENSE;
-import static org.apache.mahout.benchmark.VectorBenchmarks.RAND_FN_SEQ;
-import static org.apache.mahout.benchmark.VectorBenchmarks.RAND_SPARSE_VECTOR;
-import static org.apache.mahout.benchmark.VectorBenchmarks.SEQ_FN_DENSE;
-import static org.apache.mahout.benchmark.VectorBenchmarks.SEQ_FN_RAND;
-import static org.apache.mahout.benchmark.VectorBenchmarks.SEQ_SPARSE_VECTOR;
-
-import org.apache.mahout.benchmark.BenchmarkRunner.BenchmarkFn;
-import org.apache.mahout.math.Vector;
-
-public class TimesBenchmark {
-
-  private static final String TIMES = "Times";
-  private final VectorBenchmarks mark;
-
-  public TimesBenchmark(VectorBenchmarks mark) {
-    this.mark = mark;
-  }
-
-  public void benchmark() {
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[0][mark.vIndex(i)].times(mark.vectors[0][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), TIMES, DENSE_VECTOR);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[1][mark.vIndex(i)].times(mark.vectors[1][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), TIMES, RAND_SPARSE_VECTOR);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[2][mark.vIndex(i)].times(mark.vectors[2][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), TIMES, SEQ_SPARSE_VECTOR);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[0][mark.vIndex(i)].times(mark.vectors[1][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), TIMES, DENSE_FN_RAND);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[0][mark.vIndex(i)].times(mark.vectors[2][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), TIMES, DENSE_FN_SEQ);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[1][mark.vIndex(i)].times(mark.vectors[0][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), TIMES, RAND_FN_DENSE);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[1][mark.vIndex(i)].times(mark.vectors[2][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), TIMES, RAND_FN_SEQ);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[2][mark.vIndex(i)].times(mark.vectors[0][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), TIMES, SEQ_FN_DENSE);
-
-    mark.printStats(mark.getRunner().benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        Vector v = mark.vectors[2][mark.vIndex(i)].times(mark.vectors[1][mark.vIndex(randIndex())]);
-        return depends(v);
-      }
-    }), TIMES, SEQ_FN_RAND);
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/benchmark/VectorBenchmarks.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/benchmark/VectorBenchmarks.java b/integration/src/main/java/org/apache/mahout/benchmark/VectorBenchmarks.java
deleted file mode 100644
index a076322..0000000
--- a/integration/src/main/java/org/apache/mahout/benchmark/VectorBenchmarks.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.benchmark;
-
-import org.apache.commons.cli2.CommandLine;
-import org.apache.commons.cli2.Group;
-import org.apache.commons.cli2.Option;
-import org.apache.commons.cli2.OptionException;
-import org.apache.commons.cli2.builder.ArgumentBuilder;
-import org.apache.commons.cli2.builder.DefaultOptionBuilder;
-import org.apache.commons.cli2.builder.GroupBuilder;
-import org.apache.commons.cli2.commandline.Parser;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.mahout.benchmark.BenchmarkRunner.BenchmarkFn;
-import org.apache.mahout.common.CommandLineUtil;
-import org.apache.mahout.common.RandomUtils;
-import org.apache.mahout.common.TimingStatistics;
-import org.apache.mahout.common.commandline.DefaultOptionCreator;
-import org.apache.mahout.common.distance.ChebyshevDistanceMeasure;
-import org.apache.mahout.common.distance.CosineDistanceMeasure;
-import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
-import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
-import org.apache.mahout.common.distance.MinkowskiDistanceMeasure;
-import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
-import org.apache.mahout.common.distance.TanimotoDistanceMeasure;
-import org.apache.mahout.math.DenseVector;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.SequentialAccessSparseVector;
-import org.apache.mahout.math.Vector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-
-public class VectorBenchmarks {
-  private static final int MAX_TIME_MS = 5000;
-  private static final int LEAD_TIME_MS = 15000;
-  public static final String CLUSTERS = "Clusters";
-  public static final String CREATE_INCREMENTALLY = "Create (incrementally)";
-  public static final String CREATE_COPY = "Create (copy)";
-
-  public static final String DENSE_FN_SEQ = "Dense.fn(Seq)";
-  public static final String RAND_FN_DENSE = "Rand.fn(Dense)";
-  public static final String SEQ_FN_RAND = "Seq.fn(Rand)";
-  public static final String RAND_FN_SEQ = "Rand.fn(Seq)";
-  public static final String SEQ_FN_DENSE = "Seq.fn(Dense)";
-  public static final String DENSE_FN_RAND = "Dense.fn(Rand)";
-  public static final String SEQ_SPARSE_VECTOR = "SeqSparseVector";
-  public static final String RAND_SPARSE_VECTOR = "RandSparseVector";
-  public static final String DENSE_VECTOR = "DenseVector";
-
-  private static final Logger log = LoggerFactory.getLogger(VectorBenchmarks.class);
-  private static final Pattern TAB_NEWLINE_PATTERN = Pattern.compile("[\n\t]");
-  private static final String[] EMPTY = new String[0];
-  private static final DecimalFormat DF = new DecimalFormat("#.##");
-
-  /* package private */
-  final Vector[][] vectors;
-  final Vector[] clusters;
-  final int cardinality;
-  final int numNonZeros;
-  final int numVectors;
-  final int numClusters;
-  final int loop = Integer.MAX_VALUE;
-  final int opsPerUnit;
-  final long maxTimeUsec;
-  final long leadTimeUsec;
-
-  private final List<Vector> randomVectors = new ArrayList<>();
-  private final List<int[]> randomVectorIndices = new ArrayList<>();
-  private final List<double[]> randomVectorValues = new ArrayList<>();
-  private final Map<String, Integer> implType = new HashMap<>();
-  private final Map<String, List<String[]>> statsMap = new HashMap<>();
-  private final BenchmarkRunner runner;
-  private final Random r = RandomUtils.getRandom();
-
-  public VectorBenchmarks(int cardinality, int numNonZeros, int numVectors, int numClusters,
-      int opsPerUnit) {
-    runner = new BenchmarkRunner(LEAD_TIME_MS, MAX_TIME_MS);
-    maxTimeUsec = TimeUnit.MILLISECONDS.toNanos(MAX_TIME_MS);
-    leadTimeUsec = TimeUnit.MILLISECONDS.toNanos(LEAD_TIME_MS);
-
-    this.cardinality = cardinality;
-    this.numNonZeros = numNonZeros;
-    this.numVectors = numVectors;
-    this.numClusters = numClusters;
-    this.opsPerUnit = opsPerUnit;
-
-    setUpVectors(cardinality, numNonZeros, numVectors);
-
-    vectors = new Vector[3][numVectors];
-    clusters = new Vector[numClusters];
-  }
-
-  private void setUpVectors(int cardinality, int numNonZeros, int numVectors) {
-    for (int i = 0; i < numVectors; i++) {
-      Vector v = new SequentialAccessSparseVector(cardinality, numNonZeros); // sparsity!
-      BitSet featureSpace = new BitSet(cardinality);
-      int[] indexes = new int[numNonZeros];
-      double[] values = new double[numNonZeros];
-      int j = 0;
-      while (j < numNonZeros) {
-        double value = r.nextGaussian();
-        int index = r.nextInt(cardinality);
-        if (!featureSpace.get(index) && value != 0) {
-          featureSpace.set(index);
-          indexes[j] = index;
-          values[j++] = value;
-          v.set(index, value);
-        }
-      }
-      randomVectorIndices.add(indexes);
-      randomVectorValues.add(values);
-      randomVectors.add(v);
-    }
-  }
-
-  void printStats(TimingStatistics stats, String benchmarkName, String implName, String content) {
-    printStats(stats, benchmarkName, implName, content, 1);
-  }
-
-  void printStats(TimingStatistics stats, String benchmarkName, String implName) {
-    printStats(stats, benchmarkName, implName, "", 1);
-  }
-
-  private void printStats(TimingStatistics stats, String benchmarkName, String implName,
-      String content, int multiplier) {
-    float speed = multiplier * stats.getNCalls() * (numNonZeros * 1000.0f * 12 / stats.getSumTime());
-    float opsPerSec = stats.getNCalls() * 1000000000.0f / stats.getSumTime();
-    log.info("{} {} \n{} {} \nOps    = {} Units/sec\nIOps   = {} MBytes/sec", benchmarkName,
-        implName, content, stats.toString(), DF.format(opsPerSec), DF.format(speed));
-
-    if (!implType.containsKey(implName)) {
-      implType.put(implName, implType.size());
-    }
-    int implId = implType.get(implName);
-    if (!statsMap.containsKey(benchmarkName)) {
-      statsMap.put(benchmarkName, new ArrayList<String[]>());
-    }
-    List<String[]> implStats = statsMap.get(benchmarkName);
-    while (implStats.size() < implId + 1) {
-      implStats.add(EMPTY);
-    }
-    implStats.set(
-        implId,
-        TAB_NEWLINE_PATTERN.split(stats + "\tSpeed  = " + DF.format(opsPerSec) + " /sec\tRate   = "
-            + DF.format(speed) + " MB/s"));
-  }
-
-  public void createData() {
-    for (int i = 0; i < Math.max(numVectors, numClusters); ++i) {
-      vectors[0][vIndex(i)] = new DenseVector(randomVectors.get(vIndex(i)));
-      vectors[1][vIndex(i)] = new RandomAccessSparseVector(randomVectors.get(vIndex(i)));
-      vectors[2][vIndex(i)] = new SequentialAccessSparseVector(randomVectors.get(vIndex(i)));
-      if (numClusters > 0) {
-        clusters[cIndex(i)] = new RandomAccessSparseVector(randomVectors.get(vIndex(i)));
-      }
-    }
-  }
-
-  public void createBenchmark() {
-    printStats(runner.benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        vectors[0][vIndex(i)] = new DenseVector(randomVectors.get(vIndex(i)));
-        return depends(vectors[0][vIndex(i)]);
-      }
-    }), CREATE_COPY, DENSE_VECTOR);
-
-    printStats(runner.benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        vectors[1][vIndex(i)] = new RandomAccessSparseVector(randomVectors.get(vIndex(i)));
-        return depends(vectors[1][vIndex(i)]);
-      }
-    }), CREATE_COPY, RAND_SPARSE_VECTOR);
-
-    printStats(runner.benchmark(new BenchmarkFn() {
-      @Override
-      public Boolean apply(Integer i) {
-        vectors[2][vIndex(i)] = new SequentialAccessSparseVector(randomVectors.get(vIndex(i)));
-        return depends(vectors[2][vIndex(i)]);
-      }
-    }), CREATE_COPY, SEQ_SPARSE_VECTOR);
-
-    if (numClusters > 0) {
-      printStats(runner.benchmark(new BenchmarkFn() {
-        @Override
-        public Boolean apply(Integer i) {
-          clusters[cIndex(i)] = new RandomAccessSparseVector(randomVectors.get(vIndex(i)));
-          return depends(clusters[cIndex(i)]);
-        }
-      }), CREATE_COPY, CLUSTERS);
-    }
-  }
-
-  private boolean buildVectorIncrementally(TimingStatistics stats, int randomIndex, Vector v, boolean useSetQuick) {
-    int[] indexes = randomVectorIndices.get(randomIndex);
-    double[] values = randomVectorValues.get(randomIndex);
-    List<Integer> randomOrder = new ArrayList<>();
-    for (int i = 0; i < indexes.length; i++) {
-      randomOrder.add(i);
-    }
-    Collections.shuffle(randomOrder);
-    int[] permutation = new int[randomOrder.size()];
-    for (int i = 0; i < randomOrder.size(); i++) {
-      permutation[i] = randomOrder.get(i);
-    }
-
-    TimingStatistics.Call call = stats.newCall(leadTimeUsec);
-    if (useSetQuick) {
-      for (int i : permutation) {
-        v.setQuick(indexes[i], values[i]);
-      }
-    } else {
-      for (int i : permutation) {
-        v.set(indexes[i], values[i]);
-      }
-    }
-    return call.end(maxTimeUsec);
-  }
-
-  public void incrementalCreateBenchmark() {
-    TimingStatistics stats = new TimingStatistics();
-    for (int i = 0; i < loop; i++) {
-      vectors[0][vIndex(i)] = new DenseVector(cardinality);
-      if (buildVectorIncrementally(stats, vIndex(i), vectors[0][vIndex(i)], false)) {
-        break;
-      }
-    }
-    printStats(stats, CREATE_INCREMENTALLY, DENSE_VECTOR);
-
-    stats = new TimingStatistics();
-    for (int i = 0; i < loop; i++) {
-      vectors[1][vIndex(i)] = new RandomAccessSparseVector(cardinality);
-      if (buildVectorIncrementally(stats, vIndex(i), vectors[1][vIndex(i)], false)) {
-        break;
-      }
-    }
-    printStats(stats, CREATE_INCREMENTALLY, RAND_SPARSE_VECTOR);
-
-    stats = new TimingStatistics();
-    for (int i = 0; i < loop; i++) {
-      vectors[2][vIndex(i)] = new SequentialAccessSparseVector(cardinality);
-      if (buildVectorIncrementally(stats, vIndex(i), vectors[2][vIndex(i)], false)) {
-        break;
-      }
-    }
-    printStats(stats, CREATE_INCREMENTALLY, SEQ_SPARSE_VECTOR);
-
-    if (numClusters > 0) {
-      stats = new TimingStatistics();
-      for (int i = 0; i < loop; i++) {
-        clusters[cIndex(i)] = new RandomAccessSparseVector(cardinality);
-        if (buildVectorIncrementally(stats, vIndex(i), clusters[cIndex(i)], false)) {
-          break;
-        }
-      }
-      printStats(stats, CREATE_INCREMENTALLY, CLUSTERS);
-    }
-  }
-
-  public int vIndex(int i) {
-    return i % numVectors;
-  }
-
-  public int cIndex(int i) {
-    return i % numClusters;
-  }
-
-  public static void main(String[] args) throws IOException {
-    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
-    ArgumentBuilder abuilder = new ArgumentBuilder();
-    GroupBuilder gbuilder = new GroupBuilder();
-
-    Option vectorSizeOpt = obuilder
-        .withLongName("vectorSize")
-        .withRequired(false)
-        .withArgument(abuilder.withName("vs").withDefault(1000000).create())
-        .withDescription("Cardinality of the vector. Default: 1000000").withShortName("vs").create();
-    Option numNonZeroOpt = obuilder
-        .withLongName("numNonZero")
-        .withRequired(false)
-        .withArgument(abuilder.withName("nz").withDefault(1000).create())
-        .withDescription("Size of the vector. Default: 1000").withShortName("nz").create();
-    Option numVectorsOpt = obuilder
-        .withLongName("numVectors")
-        .withRequired(false)
-        .withArgument(abuilder.withName("nv").withDefault(25).create())
-        .withDescription("Number of Vectors to create. Default: 25").withShortName("nv").create();
-    Option numClustersOpt = obuilder
-        .withLongName("numClusters")
-        .withRequired(false)
-        .withArgument(abuilder.withName("nc").withDefault(0).create())
-        .withDescription("Number of clusters to create. Set to non zero to run cluster benchmark. Default: 0")
-        .withShortName("nc").create();
-    Option numOpsOpt = obuilder
-        .withLongName("numOps")
-        .withRequired(false)
-        .withArgument(abuilder.withName("numOps").withDefault(10).create())
-        .withDescription(
-            "Number of operations to do per timer. "
-                + "E.g In distance measure, the distance is calculated numOps times"
-                + " and the total time is measured. Default: 10").withShortName("no").create();
-
-    Option helpOpt = DefaultOptionCreator.helpOption();
-
-    Group group = gbuilder.withName("Options").withOption(vectorSizeOpt).withOption(numNonZeroOpt)
-        .withOption(numVectorsOpt).withOption(numOpsOpt).withOption(numClustersOpt).withOption(helpOpt).create();
-
-    try {
-      Parser parser = new Parser();
-      parser.setGroup(group);
-      CommandLine cmdLine = parser.parse(args);
-
-      if (cmdLine.hasOption(helpOpt)) {
-        CommandLineUtil.printHelpWithGenericOptions(group);
-        return;
-      }
-
-      int cardinality = 1000000;
-      if (cmdLine.hasOption(vectorSizeOpt)) {
-        cardinality = Integer.parseInt((String) cmdLine.getValue(vectorSizeOpt));
-
-      }
-
-      int numClusters = 0;
-      if (cmdLine.hasOption(numClustersOpt)) {
-        numClusters = Integer.parseInt((String) cmdLine.getValue(numClustersOpt));
-      }
-
-      int numNonZero = 1000;
-      if (cmdLine.hasOption(numNonZeroOpt)) {
-        numNonZero = Integer.parseInt((String) cmdLine.getValue(numNonZeroOpt));
-      }
-
-      int numVectors = 25;
-      if (cmdLine.hasOption(numVectorsOpt)) {
-        numVectors = Integer.parseInt((String) cmdLine.getValue(numVectorsOpt));
-
-      }
-
-      int numOps = 10;
-      if (cmdLine.hasOption(numOpsOpt)) {
-        numOps = Integer.parseInt((String) cmdLine.getValue(numOpsOpt));
-
-      }
-      VectorBenchmarks mark = new VectorBenchmarks(cardinality, numNonZero, numVectors, numClusters, numOps);
-      runBenchmark(mark);
-
-      // log.info("\n{}", mark);
-      log.info("\n{}", mark.asCsvString());
-    } catch (OptionException e) {
-      CommandLineUtil.printHelp(group);
-    }
-  }
-
-  /**
-   * Runs the whole benchmark suite against the given harness, in a fixed order: data set-up,
-   * creation benchmarks, the vector operation benchmarks, the distance measure benchmarks and,
-   * when clusters were requested, the closest-centroid benchmarks.
-   *
-   * @param mark harness that holds the benchmark configuration and is exercised by every step
-   * @throws IOException declared for the steps invoked below (which of them throws is not visible here)
-   */
-  private static void runBenchmark(VectorBenchmarks mark) throws IOException {
-    // Required to set up data.
-    mark.createData();
-
-    mark.createBenchmark();
-    if (mark.cardinality < 200000) {
-      // Too slow.
-      // (i.e. the incremental variant is only run for cardinalities below 200000)
-      mark.incrementalCreateBenchmark();
-    }
-
-    // One benchmark object per vector operation, each run once.
-    new CloneBenchmark(mark).benchmark();
-    new DotBenchmark(mark).benchmark();
-    new PlusBenchmark(mark).benchmark();
-    new MinusBenchmark(mark).benchmark();
-    new TimesBenchmark(mark).benchmark();
-    new SerializationBenchmark(mark).benchmark();
-
-    // The same DistanceBenchmark instance is reused for every distance measure.
-    DistanceBenchmark distanceBenchmark = new DistanceBenchmark(mark);
-    distanceBenchmark.benchmark(new CosineDistanceMeasure());
-    distanceBenchmark.benchmark(new SquaredEuclideanDistanceMeasure());
-    distanceBenchmark.benchmark(new EuclideanDistanceMeasure());
-    distanceBenchmark.benchmark(new ManhattanDistanceMeasure());
-    distanceBenchmark.benchmark(new TanimotoDistanceMeasure());
-    distanceBenchmark.benchmark(new ChebyshevDistanceMeasure());
-    distanceBenchmark.benchmark(new MinkowskiDistanceMeasure());
-
-    // Closest-centroid benchmarks are skipped unless a positive number of clusters was configured.
-    if (mark.numClusters > 0) {
-      ClosestCentroidBenchmark centroidBenchmark = new 
ClosestCentroidBenchmark(mark);
-      centroidBenchmark.benchmark(new CosineDistanceMeasure());
-      centroidBenchmark.benchmark(new SquaredEuclideanDistanceMeasure());
-      centroidBenchmark.benchmark(new EuclideanDistanceMeasure());
-      centroidBenchmark.benchmark(new ManhattanDistanceMeasure());
-      centroidBenchmark.benchmark(new TanimotoDistanceMeasure());
-      centroidBenchmark.benchmark(new ChebyshevDistanceMeasure());
-      centroidBenchmark.benchmark(new MinkowskiDistanceMeasure());
-    }
-  }
-
-  /**
-   * Renders the collected statistics as comma-separated text, one line per benchmark and
-   * implementation: {@code benchmarkName,implementationName,value}.
-   *
-   * <p>Benchmarks appear in sorted name order. Entries with fewer than eight statistics fields are
-   * left out. The value is cut out of the eighth field: it is the text between the first
-   * {@code '='} or {@code '/'} and the following separator, trimmed.</p>
-   *
-   * @return the CSV text, followed by one extra empty line
-   */
-  private String asCsvString() {
-    List<String> keys = new ArrayList<>(statsMap.keySet());
-    Collections.sort(keys);
-    // Invert implType (name -> id) so an id can be turned back into an implementation name.
-    Map<Integer,String> implMap = new HashMap<>();
-    for (Entry<String,Integer> e : implType.entrySet()) {
-      implMap.put(e.getValue(), e.getKey());
-    }
-
-    StringBuilder sb = new StringBuilder(1000);
-    for (String benchmarkName : keys) {
-      List<String[]> implStats = statsMap.get(benchmarkName);
-      // The position in the list is the implementation id (toString() lays out its columns the
-      // same way), so the index has to advance for skipped entries as well. Counting only the
-      // printed entries would report every implementation after a skipped one under the wrong name.
-      for (int implId = 0; implId < implStats.size(); implId++) {
-        String[] stats = implStats.get(implId);
-        if (stats.length < 8) {
-          continue;
-        }
-        sb.append(benchmarkName).append(',');
-        sb.append(implMap.get(implId)).append(',');
-        sb.append(stats[7].trim().split("=|/")[1].trim());
-        sb.append('\n');
-      }
-    }
-    sb.append('\n');
-    return sb.toString();
-  }
-
-  /**
-   * Formats the collected statistics as a fixed-width text table: a header row with the
-   * implementation names in id order, then one group of rows per benchmark (sorted by name),
-   * each group followed by an empty line.
-   */
-  @Override
-  public String toString() {
-    final int width = 24;
-    StringBuilder table = new StringBuilder(1000);
-
-    // Header: one column per implementation id, each name padded or cut to the column width.
-    table.append(StringUtils.rightPad("BenchMarks", width));
-    int implCount = implType.size();
-    for (int id = 0; id < implCount; id++) {
-      for (Entry<String,Integer> impl : implType.entrySet()) {
-        if (impl.getValue() != id) {
-          continue;
-        }
-        table.append(StringUtils.rightPad(impl.getKey(), width).substring(0, width));
-        break;
-      }
-    }
-    table.append('\n');
-
-    List<String> names = new ArrayList<>(statsMap.keySet());
-    Collections.sort(names);
-    for (String benchmarkName : names) {
-      List<String[]> columns = statsMap.get(benchmarkName);
-
-      // The longest statistics array decides how many rows this benchmark needs.
-      int rowCount = 0;
-      for (String[] column : columns) {
-        if (column.length > rowCount) {
-          rowCount = column.length;
-        }
-      }
-
-      for (int row = 0; row < rowCount; row++) {
-        // Only the first row of a group shows the benchmark name; the others get a blank label.
-        // rowCount > 0 means there is at least one column, so a label is never printed alone.
-        table.append(StringUtils.rightPad(row == 0 ? benchmarkName : "", width));
-        for (String[] column : columns) {
-          String cell = row < column.length ? column[row] : "";
-          table.append(StringUtils.rightPad(cell, width));
-        }
-        table.append('\n');
-      }
-      table.append('\n');
-    }
-    return table.toString();
-  }
-
-  /** @return the {@link BenchmarkRunner} held by this object */
-  public BenchmarkRunner getRunner() {
-    return this.runner;
-  }
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/cassandra/CassandraDataModel.java
----------------------------------------------------------------------
diff --git 
a/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/cassandra/CassandraDataModel.java
 
b/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/cassandra/CassandraDataModel.java
deleted file mode 100644
index b220993..0000000
--- 
a/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/cassandra/CassandraDataModel.java
+++ /dev/null
@@ -1,465 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.cf.taste.impl.model.cassandra;
-
-import com.google.common.base.Preconditions;
-import me.prettyprint.cassandra.model.HColumnImpl;
-import me.prettyprint.cassandra.serializers.BytesArraySerializer;
-import me.prettyprint.cassandra.serializers.FloatSerializer;
-import me.prettyprint.cassandra.serializers.LongSerializer;
-import me.prettyprint.cassandra.service.OperationType;
-import me.prettyprint.hector.api.Cluster;
-import me.prettyprint.hector.api.ConsistencyLevelPolicy;
-import me.prettyprint.hector.api.HConsistencyLevel;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.ColumnSlice;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.query.ColumnQuery;
-import me.prettyprint.hector.api.query.CountQuery;
-import me.prettyprint.hector.api.query.SliceQuery;
-import org.apache.mahout.cf.taste.common.NoSuchItemException;
-import org.apache.mahout.cf.taste.common.NoSuchUserException;
-import org.apache.mahout.cf.taste.common.Refreshable;
-import org.apache.mahout.cf.taste.common.TasteException;
-import org.apache.mahout.cf.taste.impl.common.Cache;
-import org.apache.mahout.cf.taste.impl.common.FastIDSet;
-import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
-import org.apache.mahout.cf.taste.impl.common.Retriever;
-import org.apache.mahout.cf.taste.impl.model.GenericItemPreferenceArray;
-import org.apache.mahout.cf.taste.impl.model.GenericUserPreferenceArray;
-import org.apache.mahout.cf.taste.model.DataModel;
-import org.apache.mahout.cf.taste.model.PreferenceArray;
-
-import java.io.Closeable;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * <p>A {@link DataModel} based on a Cassandra keyspace. By default it uses keyspace "recommender" but this
- * can be configured. Create the keyspace before using this class; this can be done on the Cassandra command
- * line with a command like {@code create keyspace recommender;}.</p>
- *
- * <p>Within the keyspace, this model uses four column families:</p>
- *
- * <p>First, it uses a column family called "users". This is keyed by the user ID as an 8-byte long.
- * It contains a column for every preference the user expresses. The column name is item ID, again as
- * an 8-byte long, and value is a floating point value represented as an IEEE 32-bit floating point value.</p>
- *
- * <p>It uses an analogous column family called "items" for the same data, but keyed by item ID rather
- * than user ID. In this column family, column names are user IDs instead.</p>
- *
- * <p>It uses a column family called "userIDs" as well, with an identical schema. It has one row under key
- * 0. It contains a column for every user ID in the model. It has no values.</p>
- *
- * <p>Finally it also uses an analogous column family "itemIDs" containing item IDs.</p>
- *
- * <p>Each of these four column families needs to be created ahead of time. Again the
- * Cassandra CLI can be used to do so, with commands like {@code create column family users;}.</p>
- *
- * <p>Note that this class uses a long-lived Cassandra client which will run until terminated. You
- * must {@link #close()} this implementation when done or the JVM will not terminate.</p>
- *
- * <p>This implementation still relies heavily on reading data into memory and caching,
- * as it remains too data-intensive to be effective even against Cassandra. It will take some time to
- * "warm up" as the first few requests will block loading user and item data into caches. This is still going
- * to send a great deal of query traffic to Cassandra. It would be advisable to employ caching wrapper
- * classes in your implementation, like {@link org.apache.mahout.cf.taste.impl.recommender.CachingRecommender}
- * or {@link org.apache.mahout.cf.taste.impl.similarity.CachingItemSimilarity}.</p>
- */
-public final class CassandraDataModel implements DataModel, Closeable {
-
-  /** Default Cassandra host. Default: localhost */
-  private static final String DEFAULT_HOST = "localhost";
-
-  /** Default Cassandra port. Default: 9160 */
-  private static final int DEFAULT_PORT = 9160;
-
-  /** Default Cassandra keyspace. Default: recommender */
-  private static final String DEFAULT_KEYSPACE = "recommender";
-
-  // Names of the four column families described in the class comment.
-  static final String USERS_CF = "users";
-  static final String ITEMS_CF = "items";
-  static final String USER_IDS_CF = "userIDs";
-  static final String ITEM_IDS_CF = "itemIDs";
-  // Key of the single row that holds all IDs in the "userIDs" and "itemIDs" column families.
-  private static final long ID_ROW_KEY = 0L;
-  // Zero-length value written for the ID columns, which carry information in their names only.
-  private static final byte[] EMPTY = new byte[0];
-
-  private final Cluster cluster;
-  private final Keyspace keyspace;
-  // Caches in front of Cassandra; each one is filled through the matching Retriever class below.
-  private final Cache<Long,PreferenceArray> userCache;
-  private final Cache<Long,PreferenceArray> itemCache;
-  private final Cache<Long,FastIDSet> itemIDsFromUserCache;
-  private final Cache<Long,FastIDSet> userIDsFromItemCache;
-  // Remembered counts; null means "not counted yet" (see getNumUsers() / getNumItems()).
-  private final AtomicReference<Integer> userCountCache;
-  private final AtomicReference<Integer> itemCountCache;
-
-  /**
-   * Uses the standard Cassandra host and port (localhost:9160), and keyspace name ("recommender").
-   */
-  public CassandraDataModel() {
-    this(DEFAULT_HOST, DEFAULT_PORT, DEFAULT_KEYSPACE);
-  }
-
-  /**
-   * @param host Cassandra server host name
-   * @param port Cassandra server port
-   * @param keyspaceName name of Cassandra keyspace to use
-   */
-  public CassandraDataModel(String host, int port, String keyspaceName) {
-    
-    Preconditions.checkNotNull(host);
-    Preconditions.checkArgument(port > 0, "port must be greater then 0!");
-    Preconditions.checkNotNull(keyspaceName);
-
-    // The cluster is registered under this class's simple name and addressed as "host:port".
-    cluster = 
HFactory.getOrCreateCluster(CassandraDataModel.class.getSimpleName(), host + 
':' + port);
-    keyspace = HFactory.createKeyspace(keyspaceName, cluster);
-    // Every operation on this keyspace uses consistency level ONE (see OneConsistencyLevelPolicy).
-    keyspace.setConsistencyLevelPolicy(new OneConsistencyLevelPolicy());
-
-    // NOTE(review): 1 << 20 is presumably the maximum number of entries per cache -- confirm against Cache.
-    userCache = new Cache<>(new UserPrefArrayRetriever(), 1 << 20);
-    itemCache = new Cache<>(new ItemPrefArrayRetriever(), 1 << 20);
-    itemIDsFromUserCache = new Cache<>(new ItemIDsFromUserRetriever(), 1 << 
20);
-    userIDsFromItemCache = new Cache<>(new UserIDsFromItemRetriever(), 1 << 
20);
-    userCountCache = new AtomicReference<>(null);
-    itemCountCache = new AtomicReference<>(null);
-  }
-
-  /**
-   * Queries the ID row of the "userIDs" column family on every call and collects the column names.
-   *
-   * @return iterator over all user IDs found
-   */
-  @Override
-  public LongPrimitiveIterator getUserIDs() {
-    SliceQuery<Long,Long,?> query = buildNoValueSliceQuery(USER_IDS_CF);
-    query.setKey(ID_ROW_KEY);
-    FastIDSet userIDs = new FastIDSet();
-    for (HColumn<Long,?> userIDColumn : query.execute().get().getColumns()) {
-      userIDs.add(userIDColumn.getName());
-    }
-    return userIDs.iterator();
-  }
-
-  /**
-   * @return the user's preferences, served through the user cache
-   */
-  @Override
-  public PreferenceArray getPreferencesFromUser(long userID) throws 
TasteException {
-    return userCache.get(userID);
-  }
-
-  /**
-   * @return IDs of the items the user has a preference for, served through a cache
-   */
-  @Override
-  public FastIDSet getItemIDsFromUser(long userID) throws TasteException {
-    return itemIDsFromUserCache.get(userID);
-  }
-
-  /**
-   * Queries the ID row of the "itemIDs" column family on every call and collects the column names.
-   *
-   * @return iterator over all item IDs found
-   */
-  @Override
-  public LongPrimitiveIterator getItemIDs() {
-    SliceQuery<Long,Long,?> query = buildNoValueSliceQuery(ITEM_IDS_CF);
-    query.setKey(ID_ROW_KEY);
-    FastIDSet itemIDs = new FastIDSet();
-    for (HColumn<Long,?> itemIDColumn : query.execute().get().getColumns()) {
-      itemIDs.add(itemIDColumn.getName());
-    }
-    return itemIDs.iterator();
-  }
-
-  /**
-   * @return the preferences expressed for the item, served through the item cache
-   */
-  @Override
-  public PreferenceArray getPreferencesForItem(long itemID) throws 
TasteException {
-    return itemCache.get(itemID);
-  }
-
-  /**
-   * Reads a single column (named by the item ID) from the user's row in the "users" column family.
-   * This lookup does not go through the caches.
-   *
-   * @return the stored value, or {@code null} if the query yields no column
-   */
-  @Override
-  public Float getPreferenceValue(long userID, long itemID) {
-    ColumnQuery<Long,Long,Float> query =
-        HFactory.createColumnQuery(keyspace, LongSerializer.get(), 
LongSerializer.get(), FloatSerializer.get());
-    query.setColumnFamily(USERS_CF);
-    query.setKey(userID);
-    query.setName(itemID);
-    HColumn<Long,Float> column = query.execute().get();
-    return column == null ? null : column.getValue();
-  }
-
-  /**
-   * Reads the same column as {@link #getPreferenceValue(long, long)} and reports its clock, which
-   * {@link #setPreference(long, long, float)} sets to the wall-clock time of the write.
-   *
-   * @return the column's clock value, or {@code null} if the query yields no column
-   */
-  @Override
-  public Long getPreferenceTime(long userID, long itemID) {
-    ColumnQuery<Long,Long,?> query =
-        HFactory.createColumnQuery(keyspace, LongSerializer.get(), 
LongSerializer.get(), BytesArraySerializer.get());
-    query.setColumnFamily(USERS_CF);
-    query.setKey(userID);
-    query.setName(itemID);
-    HColumn<Long,?> result = query.execute().get();
-    return result == null ? null : result.getClock();
-  }
-
-  /**
-   * Counts the columns of the ID row in "itemIDs" on first use and remembers the result until
-   * {@link #refresh(Collection)} resets it.
-   */
-  @Override
-  public int getNumItems() {
-    Integer itemCount = itemCountCache.get();
-    if (itemCount == null) {
-      CountQuery<Long,Long> countQuery =
-          HFactory.createCountQuery(keyspace, LongSerializer.get(), 
LongSerializer.get());
-      countQuery.setKey(ID_ROW_KEY);
-      countQuery.setColumnFamily(ITEM_IDS_CF);
-      countQuery.setRange(null, null, Integer.MAX_VALUE);
-      itemCount = countQuery.execute().get();
-      itemCountCache.set(itemCount);
-    }
-    return itemCount;
-  }
-
-  /**
-   * Counts the columns of the ID row in "userIDs" on first use and remembers the result until
-   * {@link #refresh(Collection)} resets it.
-   */
-  @Override
-  public int getNumUsers() {
-    Integer userCount = userCountCache.get();
-    if (userCount == null) {
-      CountQuery<Long,Long> countQuery =
-          HFactory.createCountQuery(keyspace, LongSerializer.get(), 
LongSerializer.get());
-      countQuery.setKey(ID_ROW_KEY);
-      countQuery.setColumnFamily(USER_IDS_CF);
-      countQuery.setRange(null, null, Integer.MAX_VALUE);
-      userCount = countQuery.execute().get();
-      userCountCache.set(userCount);
-    }
-    return userCount;
-  }
-
-  /**
-   * @return size of the cached user-ID set for the item; the direct count query that used to do
-   *   this is kept below, commented out
-   */
-  @Override
-  public int getNumUsersWithPreferenceFor(long itemID) throws TasteException {
-    /*
-    CountQuery<Long,Long> query = HFactory.createCountQuery(keyspace, 
LongSerializer.get(), LongSerializer.get());
-    query.setColumnFamily(ITEMS_CF);
-    query.setKey(itemID);
-    query.setRange(null, null, Integer.MAX_VALUE);
-    return query.execute().get();
-     */
-    return userIDsFromItemCache.get(itemID).size();
-  }
-
-  /**
-   * @return number of users found in the cached user-ID sets of both items
-   */
-  @Override
-  public int getNumUsersWithPreferenceFor(long itemID1, long itemID2) throws 
TasteException {
-    FastIDSet userIDs1 = userIDsFromItemCache.get(itemID1);
-    FastIDSet userIDs2 = userIDsFromItemCache.get(itemID2);
-    // intersectionSize() is invoked on the larger set, with the smaller set as its argument.
-    return userIDs1.size() < userIDs2.size()
-        ? userIDs2.intersectionSize(userIDs1)
-        : userIDs1.intersectionSize(userIDs2);
-  }
-
-  /**
-   * Writes one preference as four columns (in "users", "items", "userIDs" and "itemIDs") that all
-   * carry the same clock, queued on one mutator and sent with a single {@code execute()}.
-   * A NaN value is stored as 1.0. The caches and remembered counts of this object are not touched
-   * here; they change only through {@link #refresh(Collection)}.
-   */
-  @Override
-  public void setPreference(long userID, long itemID, float value) {
-
-    if (Float.isNaN(value)) {
-      value = 1.0f;
-    }
-    
-    long now = System.currentTimeMillis();
-
-    Mutator<Long> mutator = HFactory.createMutator(keyspace, 
LongSerializer.get());
-
-    // users[userID][itemID] = value
-    HColumn<Long,Float> itemForUsers = new HColumnImpl<>(LongSerializer.get(), 
FloatSerializer.get());
-    itemForUsers.setName(itemID);
-    itemForUsers.setClock(now);
-    itemForUsers.setValue(value);
-    mutator.addInsertion(userID, USERS_CF, itemForUsers);
-
-    // items[itemID][userID] = value
-    HColumn<Long,Float> userForItems = new HColumnImpl<>(LongSerializer.get(), 
FloatSerializer.get());
-    userForItems.setName(userID);
-    userForItems.setClock(now);
-    userForItems.setValue(value);
-    mutator.addInsertion(itemID, ITEMS_CF, userForItems);
-
-    // userIDs[0][userID] = (empty)
-    HColumn<Long,byte[]> userIDs = new HColumnImpl<>(LongSerializer.get(), 
BytesArraySerializer.get());
-    userIDs.setName(userID);
-    userIDs.setClock(now);
-    userIDs.setValue(EMPTY);
-    mutator.addInsertion(ID_ROW_KEY, USER_IDS_CF, userIDs);
-
-    // itemIDs[0][itemID] = (empty)
-    HColumn<Long,byte[]> itemIDs = new HColumnImpl<>(LongSerializer.get(), 
BytesArraySerializer.get());
-    itemIDs.setName(itemID);
-    itemIDs.setClock(now);
-    itemIDs.setValue(EMPTY);
-    mutator.addInsertion(ID_ROW_KEY, ITEM_IDS_CF, itemIDs);
-
-    mutator.execute();
-  }
-
-  /**
-   * Deletes the preference from both the "users" and the "items" column family. The entries in
-   * "userIDs" and "itemIDs" are left in place.
-   */
-  @Override
-  public void removePreference(long userID, long itemID) {
-    Mutator<Long> mutator = HFactory.createMutator(keyspace, 
LongSerializer.get());
-    mutator.addDeletion(userID, USERS_CF, itemID, LongSerializer.get());
-    mutator.addDeletion(itemID, ITEMS_CF, userID, LongSerializer.get());
-    mutator.execute();
-    // Not deleting from userIDs, itemIDs though
-  }
-
-  /**
-   * @return true
-   */
-  @Override
-  public boolean hasPreferenceValues() {
-    return true;
-  }
-
-  /**
-   * @return Float#NaN
-   */
-  @Override
-  public float getMaxPreference() {
-    return Float.NaN;
-  }
-
-  /**
-   * @return Float#NaN
-   */
-  @Override
-  public float getMinPreference() {
-    return Float.NaN;
-  }
-
-  /**
-   * Empties all four caches and forgets the remembered user and item counts.
-   *
-   * @param alreadyRefreshed not used by this implementation
-   */
-  @Override
-  public void refresh(Collection<Refreshable> alreadyRefreshed) {
-    userCache.clear();
-    itemCache.clear();
-    userIDsFromItemCache.clear();
-    itemIDsFromUserCache.clear();
-    userCountCache.set(null);
-    itemCountCache.set(null);
-  }
-
-  @Override
-  public String toString() {
-    return "CassandraDataModel[" + keyspace + ']';
-  }
-
-  /** Shuts down the Cassandra cluster client created in the constructor. */
-  @Override
-  public void close() {
-    HFactory.shutdownCluster(cluster);
-  }
-
-
-  /**
-   * Builds a slice query over a whole row of the given column family whose values are read as raw
-   * bytes; callers below use only the column names. The row key is left for the caller to set.
-   */
-  private SliceQuery<Long,Long,byte[]> buildNoValueSliceQuery(String cf) {
-    SliceQuery<Long,Long,byte[]> query =
-        HFactory.createSliceQuery(keyspace, LongSerializer.get(), 
LongSerializer.get(), BytesArraySerializer.get());
-    query.setColumnFamily(cf);
-    // No start or end column, not reversed, at most Integer.MAX_VALUE columns.
-    query.setRange(null, null, false, Integer.MAX_VALUE);
-    return query;
-  }
-
-  /**
-   * Builds a slice query over a whole row of the given column family whose values are read as
-   * floats. The row key is left for the caller to set.
-   */
-  private SliceQuery<Long,Long,Float> buildValueSliceQuery(String cf) {
-    SliceQuery<Long,Long,Float> query =
-        HFactory.createSliceQuery(keyspace, LongSerializer.get(), 
LongSerializer.get(), FloatSerializer.get());
-    query.setColumnFamily(cf);
-    // No start or end column, not reversed, at most Integer.MAX_VALUE columns.
-    query.setRange(null, null, false, Integer.MAX_VALUE);
-    return query;
-  }
-
-
-  /** Consistency policy that answers {@link HConsistencyLevel#ONE} for every operation. */
-  private static final class OneConsistencyLevelPolicy implements 
ConsistencyLevelPolicy {
-    @Override
-    public HConsistencyLevel get(OperationType op) {
-      return HConsistencyLevel.ONE;
-    }
-
-    @Override
-    public HConsistencyLevel get(OperationType op, String cfName) {
-      return HConsistencyLevel.ONE;
-    }
-  }
-
-  /** Loads a user's preferences from the user's row in "users"; backs {@code userCache}. */
-  private final class UserPrefArrayRetriever implements Retriever<Long, 
PreferenceArray> {
-    @Override
-    public PreferenceArray get(Long userID) throws TasteException {
-      SliceQuery<Long,Long,Float> query = buildValueSliceQuery(USERS_CF);
-      query.setKey(userID);
-
-      ColumnSlice<Long,Float> result = query.execute().get();
-      if (result == null) {
-        throw new NoSuchUserException(userID);
-      }
-      // A row without columns is reported as an unknown user, too.
-      List<HColumn<Long,Float>> itemIDColumns = result.getColumns();
-      if (itemIDColumns.isEmpty()) {
-        throw new NoSuchUserException(userID);
-      }
-      int size = itemIDColumns.size();
-      PreferenceArray prefs = new GenericUserPreferenceArray(size);
-      prefs.setUserID(0, userID);
-      for (int i = 0; i < size; i++) {
-        HColumn<Long,Float> itemIDColumn = itemIDColumns.get(i);
-        prefs.setItemID(i, itemIDColumn.getName());
-        prefs.setValue(i, itemIDColumn.getValue());
-      }
-      return prefs;
-    }
-  }
-
-  /** Loads an item's preferences from the item's row in "items"; backs {@code itemCache}. */
-  private final class ItemPrefArrayRetriever implements Retriever<Long, 
PreferenceArray> {
-    @Override
-    public PreferenceArray get(Long itemID) throws TasteException {
-      SliceQuery<Long,Long,Float> query = buildValueSliceQuery(ITEMS_CF);
-      query.setKey(itemID);
-      ColumnSlice<Long,Float> result = query.execute().get();
-      if (result == null) {
-        throw new NoSuchItemException(itemID);
-      }
-      // A row without columns is reported as an unknown item, too.
-      List<HColumn<Long,Float>> userIDColumns = result.getColumns();
-      if (userIDColumns.isEmpty()) {
-        throw new NoSuchItemException(itemID);
-      }
-      int size = userIDColumns.size();
-      PreferenceArray prefs = new GenericItemPreferenceArray(size);
-      prefs.setItemID(0, itemID);
-      for (int i = 0; i < size; i++) {
-        HColumn<Long,Float> userIDColumn = userIDColumns.get(i);
-        prefs.setUserID(i, userIDColumn.getName());
-        prefs.setValue(i, userIDColumn.getValue());
-      }
-      return prefs;
-    }
-  }
-
-  /**
-   * Collects the column names (user IDs) of an item's row in "items"; backs
-   * {@code userIDsFromItemCache}. Unlike the other retrievers, an empty row yields an empty set
-   * here rather than an exception.
-   */
-  private final class UserIDsFromItemRetriever implements Retriever<Long, 
FastIDSet> {
-    @Override
-    public FastIDSet get(Long itemID) throws TasteException {
-      SliceQuery<Long,Long,byte[]> query = buildNoValueSliceQuery(ITEMS_CF);
-      query.setKey(itemID);
-      ColumnSlice<Long,byte[]> result = query.execute().get();
-      if (result == null) {
-        throw new NoSuchItemException(itemID);
-      }
-      List<HColumn<Long,byte[]>> columns = result.getColumns();
-      FastIDSet userIDs = new FastIDSet(columns.size());
-      for (HColumn<Long,?> userIDColumn : columns) {
-        userIDs.add(userIDColumn.getName());
-      }
-      return userIDs;
-    }
-  }
-
-  /**
-   * Collects the column names (item IDs) of a user's row in "users"; backs
-   * {@code itemIDsFromUserCache}.
-   */
-  private final class ItemIDsFromUserRetriever implements Retriever<Long, 
FastIDSet> {
-    @Override
-    public FastIDSet get(Long userID) throws TasteException {
-      SliceQuery<Long,Long,byte[]> query = buildNoValueSliceQuery(USERS_CF);
-      query.setKey(userID);
-      FastIDSet itemIDs = new FastIDSet();
-      ColumnSlice<Long,byte[]> result = query.execute().get();
-      if (result == null) {
-        throw new NoSuchUserException(userID);
-      }
-      // A row without columns is reported as an unknown user, too.
-      List<HColumn<Long,byte[]>> columns = result.getColumns();
-      if (columns.isEmpty()) {
-        throw new NoSuchUserException(userID);
-      }
-      for (HColumn<Long,?> itemIDColumn : columns) {
-        itemIDs.add(itemIDColumn.getName());
-      }
-      return itemIDs;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/hbase/HBaseDataModel.java
----------------------------------------------------------------------
diff --git 
a/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/hbase/HBaseDataModel.java
 
b/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/hbase/HBaseDataModel.java
deleted file mode 100644
index 9735ffe..0000000
--- 
a/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/hbase/HBaseDataModel.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.cf.taste.impl.model.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTableFactory;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.mahout.cf.taste.common.NoSuchItemException;
-import org.apache.mahout.cf.taste.common.NoSuchUserException;
-import org.apache.mahout.cf.taste.common.Refreshable;
-import org.apache.mahout.cf.taste.common.TasteException;
-import org.apache.mahout.cf.taste.impl.common.FastIDSet;
-import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
-import org.apache.mahout.cf.taste.impl.model.GenericItemPreferenceArray;
-import org.apache.mahout.cf.taste.impl.model.GenericUserPreferenceArray;
-import org.apache.mahout.cf.taste.model.DataModel;
-import org.apache.mahout.cf.taste.model.PreferenceArray;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-
-/**
- * <p>Naive approach of storing one preference as one value in the table.
- * Preferences are indexed as (user, item) and (item, user) for O(1) 
lookups.</p>
- *
- * <p>The default table name is "taste", this can be set through a constructor
- * argument. Each row has a value starting with "i" or "u" followed by the
- * actual id encoded as a big endian long.</p>
- *
- * <p>E.g., "u\x00\x00\x00\x00\x00\x00\x04\xd2" is user 1234L</p>
- *
- * <p>There are two column families: "users" and "items".</p>
- *
- * <p>The "users" column family is read from item rows and holds item->user
- * preferences. Each userID is the column qualifier and the value is the preference.</p>
- *
- * <p>The "items" column family is read from user rows and holds user->item
- * preferences. Each itemID is the column qualifier and the value is the preference.</p>
- *
- * <p>User IDs and item IDs are cached in a FastIDSet since it requires a full
- * table scan to build these sets. Preferences are not cached since they
- * are pretty cheap lookups in HBase (also caching the Preferences defeats
- * the purpose of a scalable storage engine like HBase).</p>
- */
-public final class HBaseDataModel implements DataModel, Closeable {
-
-  private static final Logger log = 
LoggerFactory.getLogger(HBaseDataModel.class);
-
-  // Table name used by the constructor that does not take one.
-  private static final String DEFAULT_TABLE = "taste";
-  // Column family names, encoded once as bytes.
-  private static final byte[] USERS_CF = Bytes.toBytes("users");
-  private static final byte[] ITEMS_CF = Bytes.toBytes("items");
-
-  // Source of table handles for every read in this class.
-  private final HTablePool pool;
-  private final String tableName;
-
-  // Cache of user and item ids
-  private volatile FastIDSet itemIDs;
-  private volatile FastIDSet userIDs;
-
-  /**
-   * Connects through the given ZooKeeper quorum and uses the default table name ("taste").
-   *
-   * @param zkConnect value for the {@code hbase.zookeeper.quorum} setting
-   * @throws IOException propagated from the two-argument constructor
-   */
-  public HBaseDataModel(String zkConnect) throws IOException {
-    this(zkConnect, DEFAULT_TABLE);
-  }
-
-  /**
-   * Connects through the given ZooKeeper quorum, creates its own table pool, runs
-   * {@code bootstrap} for the named table and then calls {@code refresh(null)}.
-   *
-   * @param zkConnect value for the {@code hbase.zookeeper.quorum} setting
-   * @param tableName name of the HBase table to use
-   * @throws IOException if {@code bootstrap} or the initial refresh fails
-   */
-  public HBaseDataModel(String zkConnect, String tableName) throws IOException {
-    log.info("Using HBase table {}", tableName);
-
-    Configuration hbaseConf = HBaseConfiguration.create();
-    hbaseConf.set("hbase.zookeeper.quorum", zkConnect);
-
-    this.tableName = tableName;
-    this.pool = new HTablePool(hbaseConf, 8, new HTableFactory());
-
-    bootstrap(hbaseConf);
-    // Warm the cache
-    refresh(null);
-  }
-
-  /**
-   * Uses a table pool supplied by the caller instead of creating one.
-   *
-   * @param pool pool from which table handles are taken
-   * @param tableName name of the HBase table to use
-   * @param conf configuration passed on to {@code bootstrap}
-   * @throws IOException if {@code bootstrap} or the initial refresh fails
-   */
-  public HBaseDataModel(HTablePool pool, String tableName, Configuration conf) 
throws IOException {
-    log.info("Using HBase table {}", tableName);
-    this.pool = pool;
-    this.tableName = tableName;
-
-    bootstrap(conf);
-
-    // Warm the cache
-    refresh(null);
-  }
-
-  /** @return name of the HBase table this model works on */
-  public String getTableName() {
-    return this.tableName;
-  }
-
-  /**
-   * Creates the table, with its "users" and "items" column families, if it doesn't exist yet.
-   * An existing table is left untouched.
-   *
-   * @param conf HBase configuration used to open the admin connection
-   * @throws IOException if the existence check or the table creation fails
-   */
-  private void bootstrap(Configuration conf) throws IOException {
-    HTableDescriptor tDesc = new HTableDescriptor(Bytes.toBytes(tableName));
-    tDesc.addFamily(new HColumnDescriptor(USERS_CF));
-    tDesc.addFamily(new HColumnDescriptor(ITEMS_CF));
-    try (HBaseAdmin admin = new HBaseAdmin(conf)) {
-      // createTable() is rejected for a table that is already there, which used to make the
-      // model unusable against a table holding previously stored preferences.
-      if (admin.tableExists(tableName)) {
-        log.info("Table {} already exists", tableName);
-        return;
-      }
-      admin.createTable(tDesc);
-      log.info("Created table {}", tableName);
-    }
-  }
-
-  /**
-   * Builds the 9-byte row key of a user: the letter "u" followed by the id as a long.
-   */
-  private static byte[] userToBytes(long userID) {
-    return ByteBuffer.allocate(9)
-        .put((byte) 'u')
-        .putLong(userID)
-        .array();
-  }
-
-  /**
-   * Builds the 9-byte row key of an item: the letter "i" followed by the id as a long.
-   */
-  private static byte[] itemToBytes(long itemID) {
-    return ByteBuffer.allocate(9)
-        .put((byte) 'i')
-        .putLong(itemID)
-        .array();
-  }
-
-  /**
-   * Reads the id back out of a prefixed row key, skipping the one-byte "u"/"i" prefix.
-   */
-  private static long bytesToUserOrItemID(byte[] ba) {
-    return ByteBuffer.wrap(ba).getLong(1);
-  }
-
-  /* DataModel interface */
-
-  @Override
-  public LongPrimitiveIterator getUserIDs() {
-    return userIDs.iterator();
-  }
-
-  /**
-   * Loads all preferences of a user from the "items" column family of the user's row.
-   *
-   * @param userID id of the user
-   * @return one preference per column found; the column qualifier is the item ID
-   * @throws NoSuchUserException if the read returns no data
-   * @throws TasteException if HBase cannot be read
-   */
-  @Override
-  public PreferenceArray getPreferencesFromUser(long userID) throws TasteException {
-    Result result;
-    try {
-      HTableInterface table = pool.getTable(tableName);
-      try {
-        Get get = new Get(userToBytes(userID));
-        get.addFamily(ITEMS_CF);
-        result = table.get(get);
-      } finally {
-        // Release the table handle even when the read fails.
-        table.close();
-      }
-    } catch (IOException e) {
-      throw new TasteException("Failed to retrieve user preferences from HBase", e);
-    }
-
-    if (result.isEmpty()) {
-      throw new NoSuchUserException(userID);
-    }
-
-    SortedMap<byte[], byte[]> columns = result.getFamilyMap(ITEMS_CF);
-    PreferenceArray prefs = new GenericUserPreferenceArray(columns.size());
-    prefs.setUserID(0, userID);
-    int i = 0;
-    for (Map.Entry<byte[], byte[]> column : columns.entrySet()) {
-      prefs.setItemID(i, Bytes.toLong(column.getKey()));
-      prefs.setValue(i, Bytes.toFloat(column.getValue()));
-      i++;
-    }
-    return prefs;
-  }
-
-  /**
-   * Collects the IDs of all items a user has a preference for, i.e. the column qualifiers in the
-   * "items" column family of the user's row.
-   *
-   * @param userID id of the user
-   * @return set of item IDs
-   * @throws NoSuchUserException if the read returns no data
-   * @throws TasteException if HBase cannot be read
-   */
-  @Override
-  public FastIDSet getItemIDsFromUser(long userID) throws TasteException {
-    Result result;
-    try {
-      HTableInterface table = pool.getTable(tableName);
-      try {
-        Get get = new Get(userToBytes(userID));
-        get.addFamily(ITEMS_CF);
-        result = table.get(get);
-      } finally {
-        // Release the table handle even when the read fails.
-        table.close();
-      }
-    } catch (IOException e) {
-      throw new TasteException("Failed to retrieve item IDs from HBase", e);
-    }
-
-    if (result.isEmpty()) {
-      throw new NoSuchUserException(userID);
-    }
-
-    SortedMap<byte[],byte[]> columns = result.getFamilyMap(ITEMS_CF);
-    FastIDSet ids = new FastIDSet(columns.size());
-    for (byte[] qualifier : columns.keySet()) {
-      ids.add(Bytes.toLong(qualifier));
-    }
-    return ids;
-  }
-
-  @Override
-  public LongPrimitiveIterator getItemIDs() {
-    return itemIDs.iterator();
-  }
-
-  /**
-   * Loads all preferences expressed for an item from the "users" column family of the item's row.
-   *
-   * @param itemID id of the item
-   * @return one preference per column found; the column qualifier is the user ID
-   * @throws NoSuchItemException if the read returns no data
-   * @throws TasteException if HBase cannot be read
-   */
-  @Override
-  public PreferenceArray getPreferencesForItem(long itemID) throws TasteException {
-    Result result;
-    try {
-      HTableInterface table = pool.getTable(tableName);
-      try {
-        Get get = new Get(itemToBytes(itemID));
-        get.addFamily(USERS_CF);
-        result = table.get(get);
-      } finally {
-        // Release the table handle even when the read fails.
-        table.close();
-      }
-    } catch (IOException e) {
-      throw new TasteException("Failed to retrieve item preferences from HBase", e);
-    }
-
-    if (result.isEmpty()) {
-      throw new NoSuchItemException(itemID);
-    }
-
-    SortedMap<byte[], byte[]> columns = result.getFamilyMap(USERS_CF);
-    PreferenceArray prefs = new GenericItemPreferenceArray(columns.size());
-    prefs.setItemID(0, itemID);
-    int i = 0;
-    for (Map.Entry<byte[], byte[]> column : columns.entrySet()) {
-      prefs.setUserID(i, Bytes.toLong(column.getKey()));
-      prefs.setValue(i, Bytes.toFloat(column.getValue()));
-      i++;
-    }
-    return prefs;
-  }
-
-  @Override
-  public Float getPreferenceValue(long userID, long itemID) throws TasteException {
-    Result result;
-    try {
-      HTableInterface table = pool.getTable(tableName);
-      Get get = new Get(userToBytes(userID));
-      get.addColumn(ITEMS_CF, Bytes.toBytes(itemID));
-      result = table.get(get);
-      table.close();
-    } catch (IOException e) {
-      throw new TasteException("Failed to retrieve user preferences from HBase", e);
-    }
-
-    if (result.isEmpty()) {
-      throw new NoSuchUserException(userID);
-    }
-
-    if (result.containsColumn(ITEMS_CF, Bytes.toBytes(itemID))) {
-      return Bytes.toFloat(result.getValue(ITEMS_CF, Bytes.toBytes(itemID)));
-    } else {
-      return null;
-    }
-  }
-
-  @Override
-  public Long getPreferenceTime(long userID, long itemID) throws TasteException {
-    Result result;
-    try {
-      HTableInterface table = pool.getTable(tableName);
-      Get get = new Get(userToBytes(userID));
-      get.addColumn(ITEMS_CF, Bytes.toBytes(itemID));
-      result = table.get(get);
-      table.close();
-    } catch (IOException e) {
-      throw new TasteException("Failed to retrieve user preferences from HBase", e);
-    }
-
-    if (result.isEmpty()) {
-      throw new NoSuchUserException(userID);
-    }
-
-    if (result.containsColumn(ITEMS_CF, Bytes.toBytes(itemID))) {
-      KeyValue kv = result.getColumnLatest(ITEMS_CF, Bytes.toBytes(itemID));
-      return kv.getTimestamp();
-    } else {
-      return null;
-    }
-  }
-
-  @Override
-  public int getNumItems() {
-    return itemIDs.size();
-  }
-
-  @Override
-  public int getNumUsers() {
-    return userIDs.size();
-  }
-
-  @Override
-  public int getNumUsersWithPreferenceFor(long itemID) throws TasteException {
-    PreferenceArray prefs = getPreferencesForItem(itemID);
-    return prefs.length();
-  }
-
-  @Override
-  public int getNumUsersWithPreferenceFor(long itemID1, long itemID2) throws TasteException {
-    Result[] results;
-    try {
-      HTableInterface table = pool.getTable(tableName);
-      List<Get> gets = new ArrayList<>(2);
-      gets.add(new Get(itemToBytes(itemID1)));
-      gets.add(new Get(itemToBytes(itemID2)));
-      gets.get(0).addFamily(USERS_CF);
-      gets.get(1).addFamily(USERS_CF);
-      results = table.get(gets);
-      table.close();
-    } catch (IOException e) {
-      throw new TasteException("Failed to retrieve item preferences from HBase", e);
-    }
-
-    if (results[0].isEmpty()) {
-      throw new NoSuchItemException(itemID1);
-    }
-    if (results[1].isEmpty()) {
-      throw new NoSuchItemException(itemID2);
-    }
-
-    // First item
-    Result result = results[0];
-    SortedMap<byte[], byte[]> families = result.getFamilyMap(USERS_CF);
-    FastIDSet idSet1 = new FastIDSet(families.size());
-    for (byte[] id : families.keySet()) {
-      idSet1.add(Bytes.toLong(id));
-    }
-
-    // Second item
-    result = results[1];
-    families = result.getFamilyMap(USERS_CF);
-    FastIDSet idSet2 = new FastIDSet(families.size());
-    for (byte[] id : families.keySet()) {
-      idSet2.add(Bytes.toLong(id));
-    }
-
-    return idSet1.intersectionSize(idSet2);
-  }
-
-  @Override
-  public void setPreference(long userID, long itemID, float value) throws TasteException {
-    try {
-      HTableInterface table = pool.getTable(tableName);
-      List<Put> puts = new ArrayList<>(2);
-      puts.add(new Put(userToBytes(userID)));
-      puts.add(new Put(itemToBytes(itemID)));
-      puts.get(0).add(ITEMS_CF, Bytes.toBytes(itemID), Bytes.toBytes(value));
-      puts.get(1).add(USERS_CF, Bytes.toBytes(userID), Bytes.toBytes(value));
-      table.put(puts);
-      table.close();
-    } catch (IOException e) {
-      throw new TasteException("Failed to store preference in HBase", e);
-    }
-  }
-
-  @Override
-  public void removePreference(long userID, long itemID) throws TasteException {
-    try {
-      HTableInterface table = pool.getTable(tableName);
-      List<Delete> deletes = new ArrayList<>(2);
-      deletes.add(new Delete(userToBytes(userID)));
-      deletes.add(new Delete(itemToBytes(itemID)));
-      deletes.get(0).deleteColumns(ITEMS_CF, Bytes.toBytes(itemID));
-      deletes.get(1).deleteColumns(USERS_CF, Bytes.toBytes(userID));
-      table.delete(deletes);
-      table.close();
-    } catch (IOException e) {
-      throw new TasteException("Failed to remove preference from HBase", e);
-    }
-  }
-
-  @Override
-  public boolean hasPreferenceValues() {
-    return true;
-  }
-
-  @Override
-  public float getMaxPreference() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public float getMinPreference() {
-    throw new UnsupportedOperationException();
-  }
-
-  /* Closeable interface */
-
-  @Override
-  public void close() throws IOException {
-    pool.close();
-  }
-
-  /* Refreshable interface */
-
-  @Override
-  public void refresh(Collection<Refreshable> alreadyRefreshed) {
-    if (alreadyRefreshed == null || !alreadyRefreshed.contains(this)) {
-      try {
-        log.info("Refreshing item and user ID caches");
-        long t1 = System.currentTimeMillis();
-        refreshItemIDs();
-        refreshUserIDs();
-        long t2 = System.currentTimeMillis();
-        log.info("Finished refreshing caches in {} ms", t2 - t1);
-      } catch (IOException e) {
-        throw new IllegalStateException("Could not reload DataModel", e);
-      }
-    }
-  }
-
-  /*
-   * Refresh the item id cache. Warning: this does a large table scan
-   */
-  private synchronized void refreshItemIDs() throws IOException {
-    // Get the list of item ids
-    HTableInterface table = pool.getTable(tableName);
-    Scan scan = new Scan(new byte[]{0x69}, new byte[]{0x70});
-    scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, new KeyOnlyFilter(), new FirstKeyOnlyFilter()));
-    ResultScanner scanner = table.getScanner(scan);
-    Collection<Long> ids = new LinkedList<>();
-    for (Result result : scanner) {
-      ids.add(bytesToUserOrItemID(result.getRow()));
-    }
-    table.close();
-
-    // Copy into FastIDSet
-    FastIDSet itemIDs = new FastIDSet(ids.size());
-    for (long l : ids) {
-      itemIDs.add(l);
-    }
-
-    // Swap with the active
-    this.itemIDs = itemIDs;
-  }
-
-  /*
-   * Refresh the user id cache. Warning: this does a large table scan
-   */
-  private synchronized void refreshUserIDs() throws IOException {
-    // Get the list of user ids
-    HTableInterface table = pool.getTable(tableName);
-    Scan scan = new Scan(new byte[]{0x75}, new byte[]{0x76});
-    scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, new KeyOnlyFilter(), new FirstKeyOnlyFilter()));
-    ResultScanner scanner = table.getScanner(scan);
-    Collection<Long> ids = new LinkedList<>();
-    for (Result result : scanner) {
-      ids.add(bytesToUserOrItemID(result.getRow()));
-    }
-    table.close();
-
-    // Copy into FastIDSet
-    FastIDSet userIDs = new FastIDSet(ids.size());
-    for (long l : ids) {
-      userIDs.add(l);
-    }
-
-    // Swap with the active
-    this.userIDs = userIDs;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/jdbc/AbstractBooleanPrefJDBCDataModel.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/jdbc/AbstractBooleanPrefJDBCDataModel.java b/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/jdbc/AbstractBooleanPrefJDBCDataModel.java
deleted file mode 100644
index 79ca1ac..0000000
--- a/integration/src/main/java/org/apache/mahout/cf/taste/impl/model/jdbc/AbstractBooleanPrefJDBCDataModel.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.cf.taste.impl.model.jdbc;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import javax.sql.DataSource;
-
-import org.apache.mahout.cf.taste.common.TasteException;
-import org.apache.mahout.cf.taste.impl.model.BooleanPreference;
-import org.apache.mahout.cf.taste.model.Preference;
-import org.apache.mahout.common.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-public abstract class AbstractBooleanPrefJDBCDataModel extends AbstractJDBCDataModel {
-  
-  private static final Logger log = LoggerFactory.getLogger(AbstractBooleanPrefJDBCDataModel.class);
-
-  static final String NO_SUCH_COLUMN = "NO_SUCH_COLUMN";
-
-  private final String setPreferenceSQL;
-  
-  protected AbstractBooleanPrefJDBCDataModel(DataSource dataSource,
-                                             String preferenceTable,
-                                             String userIDColumn,
-                                             String itemIDColumn,
-                                             String preferenceColumn,
-                                             String getPreferenceSQL,
-                                             String getPreferenceTimeSQL,
-                                             String getUserSQL,
-                                             String getAllUsersSQL,
-                                             String getNumItemsSQL,
-                                             String getNumUsersSQL,
-                                             String setPreferenceSQL,
-                                             String removePreferenceSQL,
-                                             String getUsersSQL,
-                                             String getItemsSQL,
-                                             String getPrefsForItemSQL,
-                                             String getNumPreferenceForItemSQL,
-                                             String getNumPreferenceForItemsSQL,
-                                             String getMaxPreferenceSQL,
-                                             String getMinPreferenceSQL) {
-    super(dataSource,
-          preferenceTable,
-          userIDColumn,
-          itemIDColumn,
-          preferenceColumn,
-          getPreferenceSQL,
-          getPreferenceTimeSQL,
-          getUserSQL,
-          getAllUsersSQL,
-          getNumItemsSQL,
-          getNumUsersSQL,
-          setPreferenceSQL,
-          removePreferenceSQL,
-          getUsersSQL,
-          getItemsSQL,
-          getPrefsForItemSQL,
-          getNumPreferenceForItemSQL,
-          getNumPreferenceForItemsSQL,
-          getMaxPreferenceSQL,
-          getMinPreferenceSQL);
-    this.setPreferenceSQL = setPreferenceSQL;
-  }
-  
-  @Override
-  protected Preference buildPreference(ResultSet rs) throws SQLException {
-    return new BooleanPreference(getLongColumn(rs, 1), getLongColumn(rs, 2));
-  }
-
-  @Override
-  String getSetPreferenceSQL() {
-    return setPreferenceSQL;
-  }
-  
-  @Override
-  public void setPreference(long userID, long itemID, float value) throws TasteException {
-    Preconditions.checkArgument(!Float.isNaN(value), "NaN value");
-    log.debug("Setting preference for user {}, item {}", userID, itemID);
-    
-    Connection conn = null;
-    PreparedStatement stmt = null;
-    
-    try {
-      conn = getDataSource().getConnection();
-      stmt = conn.prepareStatement(setPreferenceSQL);
-      setLongParameter(stmt, 1, userID);
-      setLongParameter(stmt, 2, itemID);
-      
-      log.debug("Executing SQL update: {}", setPreferenceSQL);
-      stmt.executeUpdate();
-      
-    } catch (SQLException sqle) {
-      log.warn("Exception while setting preference", sqle);
-      throw new TasteException(sqle);
-    } finally {
-      IOUtils.quietClose(null, stmt, conn);
-    }
-  }
-
-  @Override
-  public boolean hasPreferenceValues() {
-    return false;
-  }
-
-  @Override
-  public float getMaxPreference() {
-    return 1.0f;
-  }
-
-  @Override
-  public float getMinPreference() {
-    return 1.0f;
-  }
-  
-}

Reply via email to