[
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652290#comment-15652290
]
ASF GitHub Bot commented on FLINK-4964:
---------------------------------------
Github user tfournier314 commented on a diff in the pull request:
https://github.com/apache/flink/pull/2740#discussion_r87295380
--- Diff:
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala
---
@@ -0,0 +1,108 @@
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.ml.common.{Parameter, ParameterMap}
+import org.apache.flink.ml.pipeline.{FitOperation,
TransformDataSetOperation, Transformer}
+import org.apache.flink.ml.preprocessing.StringIndexer.HandleInvalid
+
+import scala.collection.immutable.Seq
+
+/**
+ * Transformer that maps every distinct input string to a `Long` index.
+ *
+ * Fitting stores the learned mapping in `metricsOption` as `(label, index)`
+ * pairs; until the indexer has been fitted the option is `None`.
+ */
+class StringIndexer extends Transformer[StringIndexer] {
+
+  /** Learned `(label, index)` pairs; `None` until `fit` has been called. */
+  private[preprocessing] var metricsOption: Option[DataSet[(String, Long)]] = None
+
+  /**
+   * Sets the [[StringIndexer.HandleInvalid]] parameter (default `"skip"`).
+   *
+   * NOTE(review): presumably this selects how labels absent from the fitted
+   * mapping are treated at transform time — confirm against the transform code.
+   *
+   * @param value the strategy name to store in this indexer's parameters
+   * @return this indexer, so setter calls can be chained
+   */
+  def setHandleInvalid(value: String): this.type = {
+    parameters.add(HandleInvalid, value)
+    this
+  }
+}
+
+object StringIndexer {
+
+  /**
+   * Parameter holding the invalid-label strategy name; defaults to `"skip"`.
+   * (Which strategy names are honoured is decided by the transform code, not here.)
+   */
+  case object HandleInvalid extends Parameter[String] {
+    override val defaultValue: Option[String] = Some("skip")
+  }
+
+ // ==================================== Factory methods ==========================================
+
+  /** Creates a new, not-yet-fitted [[StringIndexer]] (its mapping starts out as `None`). */
+  def apply(): StringIndexer = new StringIndexer()
+
+ // ====================================== Operations =============================================
+
+  /**
+   * Fit operation that trains a [[StringIndexer]] on a `DataSet[String]`.
+   *
+   * The `(label, index)` mapping computed by `extractIndices` is stored in the
+   * indexer's `metricsOption`, replacing any mapping from a previous fit.
+   *
+   * The result type is declared explicitly on purpose: in Scala 2 an implicit
+   * whose type is only inferred is not eligible at use sites earlier in the same
+   * file, and Scala 3 requires implicit definitions to state their type.
+   */
+  implicit def fitStringIndexer: FitOperation[StringIndexer, String] = {
+    new FitOperation[StringIndexer, String] {
+      def fit(
+          instance: StringIndexer,
+          fitParameters: ParameterMap,
+          input: DataSet[String]): Unit = {
+        // fitParameters is not consulted: extractIndices only needs the input.
+        instance.metricsOption = Some(extractIndices(input))
+      }
+    }
+  }
+
+ private def extractIndices(input: DataSet[String]): DataSet[(String,
Long)] = {
+
+ val mapping = input
+ .mapWith( s => (s, 1) )
+ .groupBy( 0 )
+ .reduce( (a, b) => (a._1, a._2 + b._2) )
+ .partitionByRange( 1 )
--- End diff --
Indeed, I need a global sort: `mapping` is a sorted DataSet[(String, Long)] of
(label, index) pairs, where the most frequent label gets index 0.
To build it, I need to sort a DataSet of (label, frequency) pairs by frequency,
then apply zipWithIndex to assign each label its index.
I've just realised that sortPartition() only sorts each partition locally, so
how can I achieve a global sort this way?
> FlinkML - Add StringIndexer
> ---------------------------
>
> Key: FLINK-4964
> URL: https://issues.apache.org/jira/browse/FLINK-4964
> Project: Flink
> Issue Type: New Feature
> Reporter: Thomas FOURNIER
> Priority: Minor
>
> Add StringIndexer as described here:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> This will be added to the preprocessing package of FlinkML.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)