Repository: mahout Updated Branches: refs/heads/master 616b87c07 -> 4c85d6a48
MAHOUT-1847: drmSampleRows in FlinkEngine doesn't wrap Int Keys when ClassTag is of type Int, this closes apache/mahout#232 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/4c85d6a4 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/4c85d6a4 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/4c85d6a4 Branch: refs/heads/master Commit: 4c85d6a48bcdd5c161f50f85ec1e7d278d1dbae0 Parents: 616b87c Author: smarthi <[email protected]> Authored: Mon May 2 18:50:48 2016 -0400 Committer: smarthi <[email protected]> Committed: Mon May 2 18:50:48 2016 -0400 ---------------------------------------------------------------------- .../org/apache/mahout/flinkbindings/FlinkEngine.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/4c85d6a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index fddb432..b3b72b0 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -365,7 +365,15 @@ object FlinkEngine extends DistributedEngine { implicit val typeInformation = generateTypeInformation[K] val sample = DataSetUtils(drmX.dataset).sampleWithSize(replacement, numSamples) - new CheckpointedFlinkDrm[K](sample) + + val res = if (kTag != ClassTag.Int) { + new CheckpointedFlinkDrm[K](sample) + } + else { + blas.rekeySeqInts(new RowsFlinkDrm[K](sample, ncol = drmX.ncol), computeMap = false)._1 + } + + res.collect } /** Engine-specific all reduce tensor operation. */
