Github user thvasilo commented on a diff in the pull request:
https://github.com/apache/flink/pull/727#discussion_r31058920
--- Diff: docs/libs/ml/contribution_guide.md ---
@@ -20,7 +21,329 @@ specific language governing permissions and limitations
under the License.
-->
+The Flink community highly appreciates all sorts of contributions to
FlinkML.
+FlinkML offers people interested in machine learning to work on a highly
active open source project which makes scalable ML reality.
+The following document describes how to contribute to FlinkML.
+
* This will be replaced by the TOC
{:toc}
-Coming soon. In the meantime, check our list of [open issues on
JIRA](https://issues.apache.org/jira/browse/FLINK-1748?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC)
+## Getting Started
+
+In order to get started first read Flink's [contribution
guide](http://flink.apache.org/how-to-contribute.html).
+Everything from this guide also applies to FlinkML.
+
+## Pick a Topic
+
+If you are looking for some new ideas, then you should check out the list
of [unresolved issues on
JIRA](https://issues.apache.org/jira/issues/?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC).
+Once you decide to contribute to one of these issues, you should take
ownership of it and track your progress with this issue.
+That way, the other contributors know the state of the different issues
and redundant work is avoided.
+
+If you already know what you want to contribute to FlinkML all the better.
+It is still advisable to create a JIRA issue for your idea to tell the
Flink community what you want to do, though.
+
+## Testing
+
+New contributions should come with tests to verify the correct behavior of
the algorithm.
+The tests help to maintain the algorithm's correctness throughout code
changes, e.g. refactorings.
+
+We distinguish between unit tests, which are executed during maven's test
phase, and integration tests, which are executed during maven's verify phase.
+Maven automatically makes this distinction by using the following naming
rules:
+All test cases whose class name ends with a suffix fulfilling the regular
expression `(IT|Integration)(Test|Suite|Case)`, are considered integration
tests.
+The rest are considered unit tests and should only test behavior which is
local to the component under test.
+
+An integration test is a test which requires the full Flink system to be
started.
+In order to do that properly, all integration test cases have to mix in
the trait `FlinkTestBase`.
+This trait will set the right `ExecutionEnvironment` so that the test will
be executed on a special `FlinkMiniCluster` designated for testing purposes.
+Thus, an integration test could look the following:
+
+{% highlight scala %}
+class ExampleITSuite extends FlatSpec with FlinkTestBase {
+ behavior of "An example algorithm"
+
+ it should "do something" in {
+ ...
+ }
+}
+{% endhighlight %}
+
+The test style does not have to be `FlatSpec` but can be any other
scalatest `Suite` subclass.
+
+## Documentation
+
+When contributing new algorithms, it is required to add code comments
describing the functioning of the algorithm and its parameters with which the
user can control its behavior.
+Additionally, we would like to encourage contributors to add this
information to the online documentation.
+The online documentation for FlinkML's components can be found in the
directory `docs/libs/ml`.
+
+Every new algorithm is described by a single markdown file.
+This file should contain at least the following points:
+
+1. What does the algorithm do
+2. How does the algorithm work (or reference to description)
+3. Parameter description with default values
+4. Code snippet showing how the algorithm is used
+
+In order to use latex syntax in the markdown file, you have to include
`mathjax: include` in the YAML front matter.
+
+{% highlight java %}
+---
+mathjax: include
+title: Example title
+---
+{% endhighlight %}
+
+In order to use displayed mathematics, you have to put your latex code in
`$$ ... $$`.
+For in-line mathematics, use `$ ... $`.
+Additionally some predefined latex commands are included into the scope of
your markdown file.
+See `docs/_include/latex_commands.html` for the complete list of
predefined latex commands.
+
+## Contributing
+
+Once you have implemented the algorithm with adequate test coverage and
added documentation, you are ready to open a pull request.
+Details of how to open a pull request can be found
[here](http://flink.apache.org/how-to-contribute.html#contributing-code--documentation).
+
+## How to Implement a Pipeline Operator
+
+FlinkML follows the principle to make machine learning as easy and
accessible as possible.
+Therefore, it supports a flexible pipelining mechanism which allows users
to quickly define their analysis pipelines consisting of a multitude of
different components.
+A pipeline operator is either a `Transformer` or a `Predictor`.
+A `Transformer` can be fitted to training data and transforms data from
one format into another format.
+A scaler which changes the mean and variance of its input data according
to the mean and variance of some training data is an example for a
`Transformer`.
+In contrast, a `Predictor` encapsulates a data model and the corresponding
logic to train it.
+Once a `Predictor` has trained the model, it can be used to make new
predictions.
+A support vector machine which is first trained to obtain the support
vectors and then used to classify data points is an example for a `Predictor`.
+A general description of FlinkML's pipelining can be found
[here]({{site.baseurl}}/libs/ml/pipelines.html).
+In order to support the pipelining, algorithms have to adhere to a certain
design pattern, which we will describe next.
+
+Let's assume that we want to implement a pipeline operator which changes
the mean of your data.
+At first, we have to reflect which type of pipeline operator it is.
+Since centering data is a common preprocessing step in any analysis
pipeline, we will implement it as a `Transformer`.
+Therefore, we first create a `MeanTransformer` class which inherits from
`Transformer`
+
+{% highlight scala %}
+class MeanTransformer extends Transformer[Centering] {}
+{% endhighlight %}
+
+Since we want to be able to configure the mean of the resulting data, we
have to add a configuration parameter.
+
+{% highlight scala %}
+class MeanTransformer extends Transformer[Centering] {
+ def setMean(mean: Double): Mean = {
+ parameters.add(MeanTransformer.Mean, mu)
+ }
+}
+
+object MeanTransformer {
+ case object Mean extends Parameter[Double] {
+ override val defaultValue: Option[Double] = Some(0.0)
+ }
+
+ def apply(): MeanTransformer = new MeanTransformer
+}
+{% endhighlight %}
+
+Parameters are defined in the companion object of the transformer class
and extend the `Parameter` class.
+The default value will be used if no other value has been set by the user
of this component.
+If no default value has been specified, meaning that `defaultValue =
None`, then the algorithm has to handle this situation accordingly.
+
+We can now instantiate a `MeanTransformer` object and set the mean value
of the transformed data.
+But we still have to implement how the transformation works.
+The workflow can be separated into two phases.
+Within the first phase, the transformer learns the mean of the given
training data.
+This knowledge can then be used in the second phase to transform the
provided data with respect to the configured resulting mean value.
+
+The learning of the mean can be implemented within the `fit` operation of
a `Transformer`.
+Within the `fit` operation, a pipeline component is trained with respect
to the given training data.
+The algorithm is, however, **not** implemented by overriding the `fit`
method but by providing an implementation of a corresponding `FitOperation` for
the correct type.
+Taking a look at the definition of the `fit` method in `Estimator`, which
is the parent class of `Transformer`, reveals what why this is the case.
+
+{% highlight scala %}
+trait Estimator[Self] extends WithParameters with Serializable {
+ that: Self =>
+
+ def fit[Training](
+ training: DataSet[Training],
+ fitParameters: ParameterMap = ParameterMap.Empty)
+ (implicit fitOperation: FitOperation[Self, Training]): Unit = {
+ FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment)
+ fitOperation.fit(this, fitParameters, training)
+ }
+}
+{% endhighlight %}
+
+We see that the `fit` method is called with an input data set of type
`Training`, an optional parameter list and in the second parameter list with an
implicit parameter of type `FitOperation`.
+Within the body of the function, first some machine learning types are
registered and then the `fit` method of the `FitOperation` parameter is called.
+The instance gives itself, the parameter map and the training data set as
a parameters to the method.
+Thus, all the program logic takes place within the `FitOperation`.
+
+The `FitOperation` has two type parameters.
+The first defines the pipeline operator type for which this `FitOperation`
shall work and the second type parameter defines the type of the data set
elements.
+If we first wanted to implement the `MeanTransformer` to work on
`DenseVector`, we would, thus, have to provide an implementation for
`FitOperation[MeanTransformer, DenseVector]`.
+
+{% highlight scala %}
+val denseVectorMeanFitOperation = new FitOperation[MeanTransformer,
DenseVector] {
+ override def fit(instance: MeanTransformer, fitParameters: ParameterMap,
input: DataSet[DenseVector]) : Unit = {
+ import org.apache.flink.ml.math.Breeze._
+ val meanTrainingData: DataSet[DenseVector] = input
+ .map{ x => (x.asBreeze, 1) }
+ .reduce{
+ (left, right) =>
+ (left._1 + right._1, left._2 + right._2)
+ }
+ .map{ p => (p._1/p._2).fromBreeze }
+ }
+}
+{% endhighlight %}
+
+A `FitOperation[T, I]` has a `fit` method which is called with an instance
of type `T`, a parameter map and an input `DataSet[I]`.
+In our case `T=MeanTransformer` and `I=DenseVector`.
+The parameter map is necessary if our fit step depends on some parameter
values which were not given directly at creation time of the `Transformer`.
+The `FitOperation` of the `MeanTransformer` sums the `DenseVector`
instances of the given input data set up and divides the result by the total
number of vectors.
+That way, we obtain a `DataSet[DenseVector]` with a single element which
is the mean value.
+
+But if we look closely at the implementation, we see that the result of
the mean computation is never stored anywhere.
+If we want to use this knowledge in a later step to adjust the mean of
some other input, we have to keep it around.
+And here is where the parameter of type `MeanTransformer` which is given
to the `fit` method comes into play.
+We can use this instance to store state, which is used by a subsequent
`transform` operation which works on the same object.
+But first we have to extend `MeanTransformer` by a member field and then
adjust the `FitOperation` implementation.
+
+{% highlight scala %}
+class MeanTransformer extends Transformer[Centering] {
+ var meanOption: Option[DataSet[DenseVector]] = None
+
+ def setMean(mean: Double): Mean = {
+ parameters.add(MeanTransformer.Mean, mu)
+ }
+}
+
+val denseVectorMeanFitOperation = new FitOperation[MeanTransformer,
DenseVector] {
+ override def fit(instance: MeanTransformer, fitParameters: ParameterMap,
input: DataSet[DenseVector]) : Unit = {
+ import org.apache.flink.ml.math.Breeze._
+
+ instance.meanOption = Some(input
+ .map{ x => (x.asBreeze, 1) }
+ .reduce{
+ (left, right) =>
+ (left._1 + right._1, left._2 + right._2)
+ }
+ .map{ p => (p._1/p._2).fromBreeze })
+ }
+}
+{% endhighlight %}
+
+If we look at the `transform` method in `Transformer`, we will see that we
also need an implementation of `TransformOperation`.
+A possible mean transforming implementation could look the following.
+
+{% highlight scala %}
+
+val denseVectorMeanTransformOperation = new
TransformOperation[MeanTransformer, DenseVector, DenseVector] {
+ override def transform(
+ instance: MeanTransformer,
+ transformParameters: ParameterMap,
+ input: DataSet[DenseVector])
+ : DataSet[DenseVector] = {
+ val resultingParameters = parameters ++ transformParameters
+
+ val resultingMean = resultingParameters(MeanTransformer.Mean)
+
+ instance.meanOption match {
+ case Some(trainingMean) => {
+ input.map{ new MeanTransformMapper(resultingMean)
}.withBroadcastSet(trainingMean, "trainingMean")
+ }
+ case None => throw new RuntimeException("MeanTransformer has not
been fitted to data.")
+ }
+ }
+}
+
+class MeanTransformMapper(resultingMean: Double) extends
RichMapFunction[DenseVector, DenseVector] {
+ var trainingMean: DenseVector = null
+
+ override def open(parameters: Configuration): Unit = {
+ trainingMean =
getRuntimeContext().getBroadcastVariable[DenseVector]("trainingMean").get(0)
+ }
+
+ override def map(vector: DenseVector): DenseVector = {
+ import org.apache.flink.ml.math.Breeze._
+
+ val result = vector.asBreeze - trainingMean.asBreeze + resultingMean
+
+ result.fromBreeze
+ }
+}
+{% endhighlight %}
+
+Now we have everything implemented to fit our `MeanTransformer` to a
training data set of `DenseVector` instances and to transform them.
+However, when we execute the `fit` operation
+
+{% highlight scala %}
+val trainingData: DataSet[DenseVector] = ...
+val meanTransformer = MeanTransformer()
+
+meanTransformer.fit(trainingData)
+{% endhighlight %}
+
+we receive the following error at runtime: `"There is no FitOperation
defined for class MeanTransformer which trains on a
DataSet[org.apache.flink.ml.math.DenseVector]"`.
+The reason is that the Scala compiler could not find a fitting
`FitOperation` value with the right type parameters for the implicit parameter
of the `fit` method.
--- End diff --
Is it the compiler that detects this?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---