[
https://issues.apache.org/jira/browse/FLINK-2073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559466#comment-14559466
]
ASF GitHub Bot commented on FLINK-2073:
---------------------------------------
Github user thvasilo commented on a diff in the pull request:
https://github.com/apache/flink/pull/727#discussion_r31058920
--- Diff: docs/libs/ml/contribution_guide.md ---
@@ -20,7 +21,329 @@ specific language governing permissions and limitations
under the License.
-->
+The Flink community highly appreciates all sorts of contributions to FlinkML.
+FlinkML offers people interested in machine learning the opportunity to work on a highly active open source project which makes scalable ML a reality.
+The following document describes how to contribute to FlinkML.
+
* This will be replaced by the TOC
{:toc}
-Coming soon. In the meantime, check our list of [open issues on JIRA](https://issues.apache.org/jira/browse/FLINK-1748?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC)
+## Getting Started
+
+In order to get started, first read Flink's [contribution guide](http://flink.apache.org/how-to-contribute.html).
+Everything from this guide also applies to FlinkML.
+
+## Pick a Topic
+
+If you are looking for some new ideas, then you should check out the list of [unresolved issues on JIRA](https://issues.apache.org/jira/issues/?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC).
+Once you decide to contribute to one of these issues, you should take ownership of it and track your progress in this issue.
+That way, the other contributors know the state of the different issues and redundant work is avoided.
+
+If you already know what you want to contribute to FlinkML, all the better.
+It is still advisable to create a JIRA issue for your idea, though, to tell the Flink community what you want to do.
+
+## Testing
+
+New contributions should come with tests to verify the correct behavior of the algorithm.
+The tests help to maintain the algorithm's correctness throughout code changes, e.g. refactorings.
+
+We distinguish between unit tests, which are executed during Maven's test phase, and integration tests, which are executed during Maven's verify phase.
+Maven automatically makes this distinction by using the following naming rules:
+All test cases whose class name ends with a suffix matching the regular expression `(IT|Integration)(Test|Suite|Case)` are considered integration tests.
+The rest are considered unit tests and should only test behavior which is local to the component under test.
+
+An integration test is a test which requires the full Flink system to be started.
+In order to do that properly, all integration test cases have to mix in the trait `FlinkTestBase`.
+This trait will set the right `ExecutionEnvironment` so that the test will be executed on a special `FlinkMiniCluster` designated for testing purposes.
+Thus, an integration test could look like the following:
+
+{% highlight scala %}
+class ExampleITSuite extends FlatSpec with FlinkTestBase {
+  behavior of "An example algorithm"
+
+  it should "do something" in {
+    ...
+  }
+}
+{% endhighlight %}
+
+The test style does not have to be `FlatSpec` but can be any other ScalaTest `Suite` subclass.
+
+## Documentation
+
+When contributing new algorithms, it is required to add code comments describing how the algorithm works and the parameters with which the user can control its behavior.
+Additionally, we would like to encourage contributors to add this information to the online documentation.
+The online documentation for FlinkML's components can be found in the directory `docs/libs/ml`.
+
+Every new algorithm is described by a single markdown file.
+This file should contain at least the following points:
+
+1. What the algorithm does
+2. How the algorithm works (or a reference to a description)
+3. A description of the parameters with their default values
+4. A code snippet showing how the algorithm is used
+
+In order to use LaTeX syntax in the markdown file, you have to include `mathjax: include` in the YAML front matter.
+
+{% highlight yaml %}
+---
+mathjax: include
+title: Example title
+---
+{% endhighlight %}
+
+In order to use displayed mathematics, you have to put your LaTeX code in `$$ ... $$`.
+For in-line mathematics, use `$ ... $`.
+Additionally, some predefined LaTeX commands are included in the scope of your markdown file.
+See `docs/_include/latex_commands.html` for the complete list of predefined LaTeX commands.
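+
+For example, the following markdown snippet (a minimal sketch) renders the definition of the sample mean as displayed mathematics:
+
+{% highlight latex %}
+The mean $\bar{x}$ of $n$ data points is defined as:
+
+$$\bar{x} = \frac{1}{n} \sum_{i=1}^{n} x_i$$
+{% endhighlight %}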
+
+## Contributing
+
+Once you have implemented the algorithm with adequate test coverage and added documentation, you are ready to open a pull request.
+Details of how to open a pull request can be found [here](http://flink.apache.org/how-to-contribute.html#contributing-code--documentation).
+
+## How to Implement a Pipeline Operator
+
+FlinkML follows the principle of making machine learning as easy and accessible as possible.
+Therefore, it supports a flexible pipelining mechanism which allows users to quickly define their analysis pipelines consisting of a multitude of different components.
+A pipeline operator is either a `Transformer` or a `Predictor`.
+A `Transformer` can be fitted to training data and transforms data from one format into another format.
+A scaler which changes the mean and variance of its input data according to the mean and variance of some training data is an example of a `Transformer`.
+In contrast, a `Predictor` encapsulates a data model and the corresponding logic to train it.
+Once a `Predictor` has trained the model, it can be used to make new predictions.
+A support vector machine which is first trained to obtain the support vectors and then used to classify data points is an example of a `Predictor`.
+A general description of FlinkML's pipelining can be found [here]({{site.baseurl}}/libs/ml/pipelines.html).
+In order to support pipelining, algorithms have to adhere to a certain design pattern, which we will describe next.
+
+Let's assume that we want to implement a pipeline operator which changes the mean of your data.
+First, we have to decide which type of pipeline operator it is.
+Since centering data is a common preprocessing step in any analysis pipeline, we will implement it as a `Transformer`.
+Therefore, we first create a `MeanTransformer` class which inherits from `Transformer`:
+
+{% highlight scala %}
+class MeanTransformer extends Transformer[MeanTransformer] {}
+{% endhighlight %}
+
+Since we want to be able to configure the mean of the resulting data, we have to add a configuration parameter.
+
+{% highlight scala %}
+class MeanTransformer extends Transformer[MeanTransformer] {
+  def setMean(mean: Double): MeanTransformer = {
+    parameters.add(MeanTransformer.Mean, mean)
+    this
+  }
+}
+
+object MeanTransformer {
+  case object Mean extends Parameter[Double] {
+    override val defaultValue: Option[Double] = Some(0.0)
+  }
+
+  def apply(): MeanTransformer = new MeanTransformer
+}
+{% endhighlight %}
+
+Parameters are defined in the companion object of the transformer class and extend the `Parameter` class.
+The default value will be used if no other value has been set by the user of this component.
+If no default value has been specified, meaning that `defaultValue = None`, then the algorithm has to handle this situation accordingly.
+
+We can now instantiate a `MeanTransformer` object and set the mean value of the transformed data.
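+
+For example, a short usage sketch:
+
+{% highlight scala %}
+// Construct the transformer via the companion object's apply method
+// and configure the resulting mean to be 2.0. If setMean is never
+// called, the default value 0.0 applies.
+val meanTransformer = MeanTransformer().setMean(2.0)
+{% endhighlight %}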
+But we still have to implement how the transformation works.
+The workflow can be separated into two phases.
+Within the first phase, the transformer learns the mean of the given training data.
+This knowledge can then be used in the second phase to transform the provided data with respect to the configured resulting mean value.
+
+The learning of the mean can be implemented within the `fit` operation of a `Transformer`.
+Within the `fit` operation, a pipeline component is trained with respect to the given training data.
+The algorithm is, however, **not** implemented by overriding the `fit` method but by providing an implementation of a corresponding `FitOperation` for the correct type.
+Taking a look at the definition of the `fit` method in `Estimator`, which is the parent class of `Transformer`, reveals why this is the case.
+
+{% highlight scala %}
+trait Estimator[Self] extends WithParameters with Serializable {
+  that: Self =>
+
+  def fit[Training](
+      training: DataSet[Training],
+      fitParameters: ParameterMap = ParameterMap.Empty)
+    (implicit fitOperation: FitOperation[Self, Training]): Unit = {
+    FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment)
+    fitOperation.fit(this, fitParameters, training)
+  }
+}
+{% endhighlight %}
+
+We see that the `fit` method is called with an input data set of type `Training`, an optional parameter map and, in a second parameter list, an implicit parameter of type `FitOperation`.
+Within the body of the function, first some machine learning types are registered and then the `fit` method of the `FitOperation` parameter is called.
+The instance gives itself, the parameter map and the training data set as parameters to the method.
+Thus, all the program logic takes place within the `FitOperation`.
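+
+For reference, the `FitOperation` trait boils down to the following shape (a sketch inferred from its usage in this guide, not necessarily the exact source):
+
+{% highlight scala %}
+trait FitOperation[Self, Training] {
+  def fit(instance: Self, fitParameters: ParameterMap, input: DataSet[Training]): Unit
+}
+{% endhighlight %}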
+
+The `FitOperation` has two type parameters.
+The first defines the pipeline operator type for which this `FitOperation` shall work and the second type parameter defines the type of the data set elements.
+If we first wanted to implement the `MeanTransformer` to work on `DenseVector`, we would, thus, have to provide an implementation for `FitOperation[MeanTransformer, DenseVector]`.
+
+{% highlight scala %}
+val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] {
+  override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]): Unit = {
+    import org.apache.flink.ml.math.Breeze._
+    val meanTrainingData: DataSet[DenseVector] = input
+      .map{ x => (x.asBreeze, 1) }
+      .reduce{
+        (left, right) =>
+          (left._1 + right._1, left._2 + right._2)
+      }
+      .map{ p => (p._1/p._2).fromBreeze }
+  }
+}
+{% endhighlight %}
+
+A `FitOperation[T, I]` has a `fit` method which is called with an instance of type `T`, a parameter map and an input `DataSet[I]`.
+In our case `T=MeanTransformer` and `I=DenseVector`.
+The parameter map is necessary if our fit step depends on some parameter values which were not given directly at creation time of the `Transformer`.
+The `FitOperation` of the `MeanTransformer` sums up the `DenseVector` instances of the given input data set and divides the result by the total number of vectors.
+That way, we obtain a `DataSet[DenseVector]` with a single element which is the mean value.
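+For example, given the two input vectors `(1.0, 2.0)` and `(3.0, 4.0)`, the reduce step yields the pair `((4.0, 6.0), 2)` and the final map produces the mean vector `(2.0, 3.0)`.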
+
+But if we look closely at the implementation, we see that the result of the mean computation is never stored anywhere.
+If we want to use this knowledge in a later step to adjust the mean of some other input, we have to keep it around.
+And here is where the parameter of type `MeanTransformer` which is given to the `fit` method comes into play.
+We can use this instance to store state, which is used by a subsequent `transform` operation which works on the same object.
+But first we have to add a member field to `MeanTransformer` and then adjust the `FitOperation` implementation.
+
+{% highlight scala %}
+class MeanTransformer extends Transformer[MeanTransformer] {
+  var meanOption: Option[DataSet[DenseVector]] = None
+
+  def setMean(mean: Double): MeanTransformer = {
+    parameters.add(MeanTransformer.Mean, mean)
+    this
+  }
+}
+
+val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] {
+  override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]): Unit = {
+    import org.apache.flink.ml.math.Breeze._
+
+    instance.meanOption = Some(input
+      .map{ x => (x.asBreeze, 1) }
+      .reduce{
+        (left, right) =>
+          (left._1 + right._1, left._2 + right._2)
+      }
+      .map{ p => (p._1/p._2).fromBreeze })
+  }
+}
+{% endhighlight %}
+
+If we look at the `transform` method in `Transformer`, we will see that we also need an implementation of `TransformOperation`.
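+
+Analogously to `FitOperation`, the `TransformOperation` trait roughly has the following shape (again a sketch inferred from its usage below):
+
+{% highlight scala %}
+trait TransformOperation[Self, Input, Output] {
+  def transform(instance: Self, transformParameters: ParameterMap, input: DataSet[Input]): DataSet[Output]
+}
+{% endhighlight %}
+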
+A possible mean transforming implementation could look like the following.
+
+{% highlight scala %}
+val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] {
+  override def transform(
+      instance: MeanTransformer,
+      transformParameters: ParameterMap,
+      input: DataSet[DenseVector])
+    : DataSet[DenseVector] = {
+    val resultingParameters = instance.parameters ++ transformParameters
+
+    val resultingMean = resultingParameters(MeanTransformer.Mean)
+
+    instance.meanOption match {
+      case Some(trainingMean) => {
+        input.map{ new MeanTransformMapper(resultingMean) }.withBroadcastSet(trainingMean, "trainingMean")
+      }
+      case None => throw new RuntimeException("MeanTransformer has not been fitted to data.")
+    }
+  }
+}
+
+class MeanTransformMapper(resultingMean: Double) extends RichMapFunction[DenseVector, DenseVector] {
+  var trainingMean: DenseVector = null
+
+  override def open(parameters: Configuration): Unit = {
+    trainingMean = getRuntimeContext().getBroadcastVariable[DenseVector]("trainingMean").get(0)
+  }
+
+  override def map(vector: DenseVector): DenseVector = {
+    import org.apache.flink.ml.math.Breeze._
+
+    val result = vector.asBreeze - trainingMean.asBreeze + resultingMean
+
+    result.fromBreeze
+  }
+}
+{% endhighlight %}
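+
+Note how the learned mean, which lives in a `DataSet` of its own, reaches the parallel instances of the map operation: it is attached via `withBroadcastSet` and read back in the mapper's `open` method through the runtime context before the first element is processed.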
+
+Now we have everything implemented to fit our `MeanTransformer` to a training data set of `DenseVector` instances and to transform them.
+However, when we execute the `fit` operation
+
+{% highlight scala %}
+val trainingData: DataSet[DenseVector] = ...
+val meanTransformer = MeanTransformer()
+
+meanTransformer.fit(trainingData)
+{% endhighlight %}
+
+we receive the following error at runtime: `"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.math.DenseVector]"`.
+The reason is that the Scala compiler could not find a fitting `FitOperation` value with the right type parameters for the implicit parameter of the `fit` method.
--- End diff ---
Is it the compiler that detects this?
> Add contribution guide for FlinkML
> ----------------------------------
>
> Key: FLINK-2073
> URL: https://issues.apache.org/jira/browse/FLINK-2073
> Project: Flink
> Issue Type: New Feature
> Components: Documentation, Machine Learning Library
> Reporter: Theodore Vasiloudis
> Assignee: Till Rohrmann
> Fix For: 0.9
>
>
> We need a guide for contributions to FlinkML in order to encourage the
> extension of the library, and provide guidelines for developers.
> One thing that should be included is a step-by-step guide to create a
> transformer, or other Estimator.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)