Hi Flink Community,
we are implementing a KMeans algorithm in the ML part of Flink, but after
recent updates we ran into an issue with the vector-to-Breeze converter.
We are getting the following compile error:
Error:(200, 75) ambiguous implicit values:
both value denseVectorConverter in object BreezeVectorConverter of type =>
org.apache.flink.ml.math.BreezeVectorConverter[org.apache.flink.ml.math.DenseVector]
and value sparseVectorConverter in object BreezeVectorConverter of type =>
org.apache.flink.ml.math.BreezeVectorConverter[org.apache.flink.ml.math.SparseVector]
match expected type org.apache.flink.ml.math.BreezeVectorConverter[T]
.reduce((p1, p2) => (p1._1, (p1._2.asBreeze +
p2._2.asBreeze).fromBreeze, p1._3 + p2._3))
^
The code looks like this:
new FitOperation[KMeans, Vector] {
  override def fit(
      instance: KMeans,
      fitParameters: ParameterMap,
      input: DataSet[Vector])
    : Unit = {
    val resultingParameters = instance.parameters ++ fitParameters
    val centroids: DataSet[LabeledVector] =
      resultingParameters.get(InitialCentroids).get
    val numIterations: Int = resultingParameters.get(NumIterations).get
    val finalCentroids = centroids.iterate(numIterations) {
      currentCentroids =>
        val newCentroids: DataSet[LabeledVector] = input
          .map(new SelectNearestCenterMapper).withBroadcastSet(currentCentroids, CENTROIDS)
          .map(x => (x.label, x.vector, 1.0)).withForwardedFields("label->_1; vector->_2")
          .groupBy(x => x._1)
          .reduce((p1, p2) => (p1._1, (p1._2.asBreeze + p2._2.asBreeze).fromBreeze, p1._3 + p2._3))
          .withForwardedFields("_1")
          .map(x => LabeledVector(x._1, (x._2.asBreeze :/ x._3).fromBreeze))
          .withForwardedFields("_1->label")
        newCentroids
    }
    instance.centroids = Some(finalCentroids)
  }
}
We have been getting this error since commit
https://github.com/apache/flink/commit/ae446388b91ecc0f08887da19400395b96b32f6c.
It looks like the change to
flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
introduced this problem.
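To narrow this down, here is a minimal, self-contained sketch of what we suspect is happening. It does not use Flink: `Vec`, `Dense`, `Sparse`, `Converter`, `fromRaw` and `FromRawDemo` are hypothetical stand-ins we made up to mimic `Vector`, `DenseVector`, `SparseVector`, `BreezeVectorConverter` and `fromBreeze`, and a plain `Array[Double]` stands in for a Breeze vector. Our assumption (Scala 2) is that `fromBreeze` takes a type parameter with a converter context bound, so when nothing pins that type down, as inside our tuple in `reduce`, every converter in implicit scope matches and the compiler cannot choose between them.

```scala
// Hypothetical stand-ins for Flink ML's Vector, DenseVector and SparseVector.
sealed trait Vec { def values: Array[Double] }

final case class Dense(values: Array[Double]) extends Vec

final case class Sparse(size: Int, indices: Array[Int], data: Array[Double]) extends Vec {
  // Expand to a dense array so both kinds can be combined element-wise.
  def values: Array[Double] = {
    val out = new Array[Double](size)
    indices.zip(data).foreach { case (i, v) => out(i) = v }
    out
  }
}

// Hypothetical stand-in for BreezeVectorConverter: turns a raw array
// (our stand-in for a Breeze vector) back into a concrete Vec subtype.
trait Converter[T <: Vec] {
  def convert(raw: Array[Double]): T
}

object Converter {
  // Two converters in implicit scope, like denseVectorConverter and
  // sparseVectorConverter in the error message.
  implicit val denseConverter: Converter[Dense] = new Converter[Dense] {
    def convert(raw: Array[Double]): Dense = Dense(raw)
  }
  implicit val sparseConverter: Converter[Sparse] = new Converter[Sparse] {
    def convert(raw: Array[Double]): Sparse = {
      val idx = raw.indices.filter(i => raw(i) != 0.0).toArray
      Sparse(raw.length, idx, idx.map(i => raw(i)))
    }
  }
}

object Syntax {
  // Hypothetical stand-in for fromBreeze: T is fixed only by an explicit
  // type argument or by the surrounding expected type.
  implicit class RawOps(raw: Array[Double]) {
    def fromRaw[T <: Vec: Converter]: T = implicitly[Converter[T]].convert(raw)
  }
}

object FromRawDemo {
  import Syntax._

  // Element-wise sum converted back with an explicit target type. Because
  // T = Dense is given, only Converter[Dense] is looked up.
  def sumDense(a: Vec, b: Vec): Dense =
    a.values.zip(b.values).map { case (x, y) => x + y }.fromRaw[Dense]

  // By contrast, `val v: Vec = raw.fromRaw` leaves T open (any subtype of Vec
  // fits), so both converters match and we would expect scalac to report
  // "ambiguous implicit values", as in the error above.

  def main(args: Array[String]): Unit = {
    val s = sumDense(Dense(Array(1.0, 2.0)), Sparse(2, Array(1), Array(5.0)))
    println(s.values.mkString(", "))
  }
}
```

If that assumption holds, naming the target type explicitly in our KMeans code (for example `.fromBreeze[DenseVector]`) might avoid the ambiguity, but we have not verified this against the current converter code.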
We are not that experienced with Scala and don’t know how to resolve this issue.
You can have a look at the code in the pull request:
https://github.com/apache/flink/pull/700
Thanks in advance,
Florian & Co.