This is an automated email from the ASF dual-hosted git repository.
shaoxuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3050957 [FLINK-12473][ml] Add ML pipeline and MLlib interface
3050957 is described below
commit 305095743ffe0bc39f76c1bda983da7d0df9e003
Author: Gen Luo <[email protected]>
AuthorDate: Fri May 24 17:05:23 2019 +0800
[FLINK-12473][ml] Add ML pipeline and MLlib interface
This closes #8402
---
flink-ml-parent/flink-ml-api/pom.xml | 45 ++++
.../org/apache/flink/ml/api/core/Estimator.java | 47 ++++
.../java/org/apache/flink/ml/api/core/Model.java | 39 +++
.../org/apache/flink/ml/api/core/Pipeline.java | 266 +++++++++++++++++++++
.../apache/flink/ml/api/core/PipelineStage.java | 56 +++++
.../org/apache/flink/ml/api/core/Transformer.java | 42 ++++
.../apache/flink/ml/api/misc/param/ParamInfo.java | 130 ++++++++++
.../flink/ml/api/misc/param/ParamInfoFactory.java | 129 ++++++++++
.../flink/ml/api/misc/param/ParamValidator.java | 39 +++
.../org/apache/flink/ml/api/misc/param/Params.java | 151 ++++++++++++
.../apache/flink/ml/api/misc/param/WithParams.java | 60 +++++
.../flink/ml/util/param/ExtractParamInfosUtil.java | 73 ++++++
.../org/apache/flink/ml/api/core/PipelineTest.java | 181 ++++++++++++++
.../org/apache/flink/ml/api/misc/ParamsTest.java | 72 ++++++
.../ml/util/param/ExtractParamInfosUtilTest.java | 104 ++++++++
flink-ml-parent/pom.xml | 39 +++
pom.xml | 1 +
17 files changed, 1474 insertions(+)
diff --git a/flink-ml-parent/flink-ml-api/pom.xml
b/flink-ml-parent/flink-ml-api/pom.xml
new file mode 100644
index 0000000..f77f068
--- /dev/null
+++ b/flink-ml-parent/flink-ml-api/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-ml-parent</artifactId>
+ <version>1.9-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-ml-api</artifactId>
+
+ <!-- Dependencies of the flink-ml-api module. -->
+ <dependencies>
+ <!-- Provides Table and TableEnvironment, used by the Estimator/Transformer/Pipeline APIs. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- Provides the shaded Jackson ObjectMapper used for JSON (de)serialization of pipelines and params. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+
<version>${jackson.version}-${flink.shaded.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Estimator.java
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Estimator.java
new file mode 100644
index 0000000..8e31d94
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Estimator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+
+/**
+ * An Estimator is a {@link PipelineStage} that learns from data and produces a machine learning
+ * {@link Model}.
+ *
+ * <p>An implementation consumes an input table of training samples and returns a {@link Model}
+ * fitted to those samples.
+ *
+ * @param <E> the concrete Estimator implementation type itself, as used by
+ *            {@link org.apache.flink.ml.api.misc.param.WithParams}.
+ * @param <M> the type of {@link Model} this Estimator produces.
+ */
+@PublicEvolving
+public interface Estimator<E extends Estimator<E, M>, M extends Model<M>> extends PipelineStage<E> {
+
+	/**
+	 * Fits a {@link Model} to the records contained in the given {@link Table}.
+	 *
+	 * @param tEnv  the table environment the input table is bound to.
+	 * @param input the table holding the training records.
+	 * @return the {@link Model} fitted to the given table.
+	 */
+	M fit(TableEnvironment tEnv, Table input);
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Model.java
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Model.java
new file mode 100644
index 0000000..b52b6a9
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Model.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Table;
+
+/**
+ * A Model is a {@link Transformer} that differs from ordinary transformers only in how it comes
+ * into existence. An ordinary transformer is configured by setting its parameters directly,
+ * whereas a model is usually produced by an {@link Estimator} when
+ * {@link Estimator#fit(org.apache.flink.table.api.TableEnvironment, Table)} is called.
+ *
+ * <p>Model is kept separate from {@link Transformer} so that model-specific behavior can be
+ * added later, for example linking a Model back to the {@link Estimator} that produced it.
+ *
+ * @param <M> the concrete Model implementation type itself, as used by
+ *            {@link org.apache.flink.ml.api.misc.param.WithParams}
+ */
+@PublicEvolving
+public interface Model<M extends Model<M>> extends Transformer<M> {
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Pipeline.java
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Pipeline.java
new file mode 100644
index 0000000..c8326c4
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Pipeline.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A pipeline is a linear workflow which chains {@link Estimator}s and {@link
Transformer}s to
+ * execute an algorithm.
+ *
+ * <p>A pipeline itself can either act as an Estimator or a Transformer,
depending on the stages it
+ * includes. More specifically:
+ * <ul>
+ * <li>
+ * If a Pipeline has an {@link Estimator}, one needs to call {@link
Pipeline#fit(TableEnvironment,
+ * Table)} before use the pipeline as a {@link Transformer} . In this case the
Pipeline is an {@link
+ * Estimator} and can produce a Pipeline as a {@link Model}.
+ * </li>
+ * <li>
+ * If a Pipeline has no {@link Estimator}, it is a {@link Transformer} and can
be applied to a Table
+ * directly. In this case, {@link Pipeline#fit(TableEnvironment, Table)} will
simply return the
+ * pipeline itself.
+ * </li>
+ * </ul>
+ *
+ * <p>In addition, a pipeline can also be used as a {@link PipelineStage} in
another pipeline, just
+ * like an ordinary {@link Estimator} or {@link Transformer} as describe above.
+ */
+@PublicEvolving
+public final class Pipeline implements Estimator<Pipeline, Pipeline>,
Transformer<Pipeline>,
+ Model<Pipeline> {
+ private static final long serialVersionUID = 1L;
+ private final List<PipelineStage> stages = new ArrayList<>();
+ private final Params params = new Params();
+
+ private int lastEstimatorIndex = -1;
+
+ public Pipeline() {
+ }
+
+ public Pipeline(String pipelineJson) {
+ this.loadJson(pipelineJson);
+ }
+
+ public Pipeline(List<PipelineStage> stages) {
+ for (PipelineStage s : stages) {
+ appendStage(s);
+ }
+ }
+
+ //is the stage a simple Estimator or pipeline with Estimator
+ private static boolean isStageNeedFit(PipelineStage stage) {
+ return (stage instanceof Pipeline && ((Pipeline)
stage).needFit()) ||
+ (!(stage instanceof Pipeline) && stage instanceof
Estimator);
+ }
+
+ /**
+ * Appends a PipelineStage to the tail of this pipeline. Pipeline is
editable only via this
+ * method. The PipelineStage must be Estimator, Transformer, Model or
Pipeline.
+ *
+ * @param stage the stage to be appended
+ */
+ public Pipeline appendStage(PipelineStage stage) {
+ if (isStageNeedFit(stage)) {
+ lastEstimatorIndex = stages.size();
+ } else if (!(stage instanceof Transformer)) {
+ throw new RuntimeException(
+ "All PipelineStages should be Estimator or
Transformer, got:" +
+ stage.getClass().getSimpleName());
+ }
+ stages.add(stage);
+ return this;
+ }
+
+ /**
+ * Returns a list of all stages in this pipeline in order, the list is
immutable.
+ *
+ * @return an immutable list of all stages in this pipeline in order.
+ */
+ public List<PipelineStage> getStages() {
+ return Collections.unmodifiableList(stages);
+ }
+
+ /**
+ * Check whether the pipeline acts as an {@link Estimator} or not. When
the return value is
+ * true, that means this pipeline contains an {@link Estimator} and
thus users must invoke
+ * {@link #fit(TableEnvironment, Table)} before they can use this
pipeline as a {@link
+ * Transformer}. Otherwise, the pipeline can be used as a {@link
Transformer} directly.
+ *
+ * @return {@code true} if this pipeline has an Estimator, {@code
false} otherwise
+ */
+ public boolean needFit() {
+ return this.getIndexOfLastEstimator() >= 0;
+ }
+
+ public Params getParams() {
+ return params;
+ }
+
+ //find the last Estimator or Pipeline that needs fit in stages, -1
stand for no Estimator in Pipeline
+ private int getIndexOfLastEstimator() {
+ return lastEstimatorIndex;
+ }
+
+ /**
+ * Train the pipeline to fit on the records in the given {@link Table}.
+ *
+ * <p>This method go through all the {@link PipelineStage}s in order
and does the following
+ * on each stage until the last {@link Estimator}(inclusive).
+ *
+ * <ul>
+ * <li>
+ * If a stage is an {@link Estimator}, invoke {@link
Estimator#fit(TableEnvironment, Table)}
+ * with the input table to generate a {@link Model}, transform the the
input table with the
+ * generated {@link Model} to get a result table, then pass the result
table to the next stage
+ * as input.
+ * </li>
+ * <li>
+ * If a stage is a {@link Transformer}, invoke {@link
Transformer#transform(TableEnvironment,
+ * Table)} on the input table to get a result table, and pass the
result table to the next stage
+ * as input.
+ * </li>
+ * </ul>
+ *
+ * <p>After all the {@link Estimator}s are trained to fit their input
tables, a new
+ * pipeline will be created with the same stages in this pipeline,
except that all the
+ * Estimators in the new pipeline are replaced with their corresponding
Models generated in the
+ * above process.
+ *
+ * <p>If there is no {@link Estimator} in the pipeline, the method
returns a copy of this
+ * pipeline.
+ *
+ * @param tEnv the table environment to which the input table is bound.
+ * @param input the table with records to train the Pipeline.
+ * @return a pipeline with same stages as this Pipeline except all
Estimators replaced with
+ * their corresponding Models.
+ */
+ @Override
+ public Pipeline fit(TableEnvironment tEnv, Table input) {
+ List<PipelineStage> transformStages = new
ArrayList<>(stages.size());
+ int lastEstimatorIdx = getIndexOfLastEstimator();
+ for (int i = 0; i < stages.size(); i++) {
+ PipelineStage s = stages.get(i);
+ if (i <= lastEstimatorIdx) {
+ Transformer t;
+ boolean needFit = isStageNeedFit(s);
+ if (needFit) {
+ t = ((Estimator) s).fit(tEnv, input);
+ } else {
+ // stage is Transformer, guaranteed in
appendStage() method
+ t = (Transformer) s;
+ }
+ transformStages.add(t);
+ input = t.transform(tEnv, input);
+ } else {
+ transformStages.add(s);
+ }
+ }
+ return new Pipeline(transformStages);
+ }
+
+ /**
+ * Generate a result table by applying all the stages in this pipeline
to the input table in
+ * order.
+ *
+ * @param tEnv the table environment to which the input table is bound.
+ * @param input the table to be transformed
+ * @return a result table with all the stages applied to the input
tables in order.
+ */
+ @Override
+ public Table transform(TableEnvironment tEnv, Table input) {
+ if (needFit()) {
+ throw new RuntimeException("Pipeline contains
Estimator, need to fit first.");
+ }
+ for (PipelineStage s : stages) {
+ input = ((Transformer) s).transform(tEnv, input);
+ }
+ return input;
+ }
+
+ @Override
+ public String toJson() {
+ ObjectMapper mapper = new ObjectMapper();
+
+ List<Map<String, String>> stageJsons = new ArrayList<>();
+ for (PipelineStage s : getStages()) {
+ Map<String, String> stageMap = new HashMap<>();
+ stageMap.put("stageClassName",
s.getClass().getTypeName());
+ stageMap.put("stageJson", s.toJson());
+ stageJsons.add(stageMap);
+ }
+
+ try {
+ return mapper.writeValueAsString(stageJsons);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to serialize
pipeline", e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void loadJson(String json) {
+ ObjectMapper mapper = new ObjectMapper();
+ List<Map<String, String>> stageJsons;
+ try {
+ stageJsons = mapper.readValue(json, List.class);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to deserialize
pipeline json:" + json, e);
+ }
+ for (Map<String, String> stageMap : stageJsons) {
+ appendStage(restoreInnerStage(stageMap));
+ }
+ }
+
+ private PipelineStage<?> restoreInnerStage(Map<String, String>
stageMap) {
+ String className = stageMap.get("stageClassName");
+ Class<?> clz;
+ try {
+ clz = Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("PipelineStage class " +
className + " not exists", e);
+ }
+ InstantiationUtil.checkForInstantiation(clz);
+
+ PipelineStage<?> s;
+ try {
+ s = (PipelineStage<?>) clz.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Class is instantiable but
failed to new an instance", e);
+ }
+
+ String stageJson = stageMap.get("stageJson");
+ s.loadJson(stageJson);
+ return s;
+ }
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java
new file mode 100644
index 0000000..86bf0d3
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.util.param.ExtractParamInfosUtil;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for a stage in a pipeline. The interface is only a concept, and does not have any
+ * actual functionality. Its subclasses must be either Estimator or Transformer. No other classes
+ * should inherit this interface directly.
+ *
+ * <p>Each pipeline stage has parameters, and requires a public empty constructor for
+ * restoration in Pipeline.
+ *
+ * <p>This interface is public because it appears in public signatures of {@link Pipeline}, such
+ * as {@link Pipeline#getStages()} and {@link Pipeline#appendStage(PipelineStage)}. Code outside
+ * this package must be able to name the type to use those methods.
+ *
+ * @param <T> The class type of the PipelineStage implementation itself, used by {@link
+ *            org.apache.flink.ml.api.misc.param.WithParams}
+ * @see WithParams
+ */
+public interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable {
+
+	/**
+	 * Serializes this stage to JSON. By default this is the JSON form of {@link #getParams()}.
+	 *
+	 * @return the JSON representation of this stage
+	 */
+	default String toJson() {
+		return getParams().toJson();
+	}
+
+	/**
+	 * Restores this stage's params from JSON, as produced by {@link #toJson()}. The value class of
+	 * each param is looked up by name from the ParamInfos extracted from this stage.
+	 *
+	 * @param json the JSON to load params from
+	 */
+	default void loadJson(String json) {
+		List<ParamInfo> paramInfos = ExtractParamInfosUtil.extractParamInfos(this);
+		Map<String, Class<?>> classMap = new HashMap<>();
+		for (ParamInfo i : paramInfos) {
+			classMap.put(i.getName(), i.getValueClass());
+		}
+		getParams().loadJson(json, classMap);
+	}
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Transformer.java
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Transformer.java
new file mode 100644
index 0000000..9435a61
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/Transformer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+
+/**
+ * A Transformer is a {@link PipelineStage} that turns an input {@link Table} into a result
+ * {@link Table}.
+ *
+ * @param <T> the concrete Transformer implementation type itself, as used by
+ *            {@link org.apache.flink.ml.api.misc.param.WithParams}
+ */
+@PublicEvolving
+public interface Transformer<T extends Transformer<T>> extends PipelineStage<T> {
+
+	/**
+	 * Applies this transformer to the input table and returns the resulting table.
+	 *
+	 * @param tEnv  the table environment the input table is bound to.
+	 * @param input the table to transform
+	 * @return the table produced by the transformation
+	 */
+	Table transform(TableEnvironment tEnv, Table input);
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfo.java
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfo.java
new file mode 100644
index 0000000..994576f
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfo.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.misc.param;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Definition of a parameter, including name, type, default value, validator and so on.
+ *
+ * <p>This class is provided to unify the interaction with parameters. Instances are immutable:
+ * the alias array is defensively copied on construction and on access.
+ *
+ * @param <V> the type of the param value
+ */
+@PublicEvolving
+public class ParamInfo<V> {
+	private final String name;
+	private final String[] alias;
+	private final String description;
+	private final boolean isOptional;
+	private final boolean hasDefaultValue;
+	private final V defaultValue;
+	private final ParamValidator<V> validator;
+	private final Class<V> valueClass;
+
+	ParamInfo(String name, String[] alias, String description, boolean isOptional,
+			boolean hasDefaultValue, V defaultValue,
+			ParamValidator<V> validator, Class<V> valueClass) {
+		this.name = name;
+		// Copy the array so that later changes to the caller's array cannot alter this ParamInfo.
+		// A null alias is normalized to the documented default, an empty array.
+		this.alias = alias == null ? new String[0] : alias.clone();
+		this.description = description;
+		this.isOptional = isOptional;
+		this.hasDefaultValue = hasDefaultValue;
+		this.defaultValue = defaultValue;
+		this.validator = validator;
+		this.valueClass = valueClass;
+	}
+
+	/**
+	 * Returns the name of the parameter. The name must be unique in the stage the ParamInfo
+	 * belongs to.
+	 *
+	 * @return the name of the parameter
+	 */
+	public String getName() {
+		return name;
+	}
+
+	/**
+	 * Returns the aliases of the parameter. The alias will be an empty string array by default.
+	 * A fresh copy is returned on each call, so modifying it does not affect this ParamInfo.
+	 *
+	 * @return a copy of the aliases of the parameter
+	 */
+	public String[] getAlias() {
+		return alias.clone();
+	}
+
+	/**
+	 * Returns the description of the parameter.
+	 *
+	 * @return the description of the parameter
+	 */
+	public String getDescription() {
+		return description;
+	}
+
+	/**
+	 * Returns whether the parameter is optional.
+	 *
+	 * @return {@code true} if the param is optional, {@code false} otherwise
+	 */
+	public boolean isOptional() {
+		return isOptional;
+	}
+
+	/**
+	 * Returns whether the parameter has a default value. Since {@code null} may also be a valid
+	 * default value of a parameter, the return of getDefaultValue may be {@code null} even when
+	 * this method returns true.
+	 *
+	 * @return {@code true} if the param has a default value (even if it's a {@code null}),
+	 *         {@code false} otherwise
+	 */
+	public boolean hasDefaultValue() {
+		return hasDefaultValue;
+	}
+
+	/**
+	 * Returns the default value of the parameter. The default value should be defined whenever
+	 * possible. The default value can be a {@code null} even if hasDefaultValue returns true.
+	 *
+	 * @return the default value of the param, {@code null} if not defined
+	 */
+	public V getDefaultValue() {
+		return defaultValue;
+	}
+
+	/**
+	 * Returns the validator to validate the value of the parameter.
+	 *
+	 * @return the validator to validate the value of the parameter.
+	 */
+	public ParamValidator<V> getValidator() {
+		return validator;
+	}
+
+	/**
+	 * Returns the class of the param value. It's usually needed in serialization.
+	 *
+	 * @return the class of the param value
+	 */
+	public Class<V> getValueClass() {
+		return valueClass;
+	}
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfoFactory.java
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfoFactory.java
new file mode 100644
index 0000000..4c49580
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamInfoFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.misc.param;
+
+/**
+ * Factory to create ParamInfo, all ParamInfos should be created via this class.
+ */
+public class ParamInfoFactory {
+	/**
+	 * Returns a ParamInfoBuilder to configure and build a new ParamInfo.
+	 *
+	 * @param name name of the new ParamInfo
+	 * @param valueClass value class of the new ParamInfo
+	 * @param <V> value type of the new ParamInfo
+	 * @return a ParamInfoBuilder
+	 */
+	public static <V> ParamInfoBuilder<V> createParamInfo(String name, Class<V> valueClass) {
+		return new ParamInfoBuilder<>(name, valueClass);
+	}
+
+	/**
+	 * Builder to build a new ParamInfo. Builder is created by ParamInfoFactory with name and
+	 * valueClass set.
+	 *
+	 * @param <V> value type of the new ParamInfo
+	 */
+	public static class ParamInfoBuilder<V> {
+		private final String name;
+		private String[] alias = new String[0];
+		private String description;
+		private boolean isOptional = true;
+		private boolean hasDefaultValue = false;
+		private V defaultValue;
+		private ParamValidator<V> validator;
+		private final Class<V> valueClass;
+
+		ParamInfoBuilder(String name, Class<V> valueClass) {
+			this.name = name;
+			this.valueClass = valueClass;
+		}
+
+		/**
+		 * Sets the aliases of the parameter. The given array is copied, so later changes to it
+		 * do not affect this builder.
+		 *
+		 * @param alias the aliases of the parameter; {@code null} is treated as no aliases
+		 * @return the builder itself
+		 */
+		public ParamInfoBuilder<V> setAlias(String... alias) {
+			this.alias = alias == null ? new String[0] : alias.clone();
+			return this;
+		}
+
+		/**
+		 * Sets the description of the parameter.
+		 *
+		 * @param description the description of the parameter
+		 * @return the builder itself
+		 */
+		public ParamInfoBuilder<V> setDescription(String description) {
+			this.description = description;
+			return this;
+		}
+
+		/**
+		 * Sets the flag indicating the parameter is optional. The parameter is optional by default.
+		 *
+		 * @return the builder itself
+		 */
+		public ParamInfoBuilder<V> setOptional() {
+			this.isOptional = true;
+			return this;
+		}
+
+		/**
+		 * Sets the flag indicating the parameter is required.
+		 *
+		 * @return the builder itself
+		 */
+		public ParamInfoBuilder<V> setRequired() {
+			this.isOptional = false;
+			return this;
+		}
+
+		/**
+		 * Sets the flag indicating the parameter has default value, and sets the default value.
+		 *
+		 * @param defaultValue the default value, which may be {@code null}
+		 * @return the builder itself
+		 */
+		public ParamInfoBuilder<V> setHasDefaultValue(V defaultValue) {
+			this.hasDefaultValue = true;
+			this.defaultValue = defaultValue;
+			return this;
+		}
+
+		/**
+		 * Sets the validator to validate the parameter value set by users.
+		 *
+		 * @param validator the validator for the parameter value
+		 * @return the builder itself
+		 */
+		public ParamInfoBuilder<V> setValidator(ParamValidator<V> validator) {
+			this.validator = validator;
+			return this;
+		}
+
+		/**
+		 * Builds the defined ParamInfo and returns it. The ParamInfo will be immutable.
+		 *
+		 * @return the defined ParamInfo
+		 */
+		public ParamInfo<V> build() {
+			return new ParamInfo<>(name, alias, description, isOptional, hasDefaultValue,
+				defaultValue, validator, valueClass);
+		}
+	}
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamValidator.java
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamValidator.java
new file mode 100644
index 0000000..c95b146
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/ParamValidator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.misc.param;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * An interface used by {@link ParamInfo} to do validation when a parameter value is set.
+ *
+ * <p>The interface has a single abstract method, so validators can be written as lambdas or
+ * method references.
+ *
+ * @param <V> the type of the value to validate
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface ParamValidator<V> extends Serializable {
+	/**
+	 * Validates a parameter value.
+	 *
+	 * @param value value to validate
+	 * @return {@code true} if the value is valid, {@code false} otherwise
+	 */
+	boolean validate(V value);
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
new file mode 100644
index 0000000..0c1e0d8
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.misc.param;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The map-like container class for parameter. This class is provided to unify
the interaction with
+ * parameters.
+ */
+@PublicEvolving
+public class Params implements Serializable {
+ private final Map<String, Object> paramMap = new HashMap<>();
+
+ /**
+ * Returns the value of the specific parameter, or default value
defined in the {@code info} if
+ * this Params doesn't contain the param.
+ *
+ * @param info the info of the specific parameter, usually with default
value
+ * @param <V> the type of the specific parameter
+ * @return the value of the specific parameter, or default value
defined in the {@code info} if
+ * this Params doesn't contain the parameter
+ * @throws RuntimeException if the Params doesn't contains the specific
parameter, while the
+ * param is not optional but has no default
value in the {@code info}
+ */
+ @SuppressWarnings("unchecked")
+ public <V> V get(ParamInfo<V> info) {
+ V value = (V) paramMap.getOrDefault(info.getName(),
info.getDefaultValue());
+ if (value == null && !info.isOptional() &&
!info.hasDefaultValue()) {
+ throw new RuntimeException(info.getName() +
+ " not exist which is not optional and don't
have a default value");
+ }
+ return value;
+ }
+
+ /**
+ * Set the value of the specific parameter.
+ *
+ * @param info the info of the specific parameter to set.
+ * @param value the value to be set to the specific parameter.
+ * @param <V> the type of the specific parameter.
+ * @return the previous value of the specific parameter, or null if
this Params didn't contain
+ * the parameter before
+ * @throws RuntimeException if the {@code info} has a validator and the
{@code value} is
+ * evaluated as illegal by the validator
+ */
+ public <V> Params set(ParamInfo<V> info, V value) {
+ if (!info.isOptional() && value == null) {
+ throw new RuntimeException(
+ "Setting " + info.getName() + " as null while
it's not a optional param");
+ }
+ if (value == null) {
+ remove(info);
+ return this;
+ }
+
+ if (info.getValidator() != null &&
!info.getValidator().validate(value)) {
+ throw new RuntimeException(
+ "Setting " + info.getName() + " as a invalid
value:" + value);
+ }
+ paramMap.put(info.getName(), value);
+ return this;
+ }
+
+ /**
+ * Removes the specific parameter from this Params.
+ *
+ * @param info the info of the specific parameter to remove
+ * @param <V> the type of the specific parameter
+ */
+ public <V> void remove(ParamInfo<V> info) {
+ paramMap.remove(info.getName());
+ }
+
+ /**
+ * Creates and returns a deep clone of this Params.
+ *
+ * @return a deep clone of this Params
+ */
+ public Params clone() {
+ Params newParams = new Params();
+ newParams.paramMap.putAll(this.paramMap);
+ return newParams;
+ }
+
+ /**
+ * Returns a json containing all parameters in this Params. The json
should be human-readable if
+ * possible.
+ *
+ * @return a json containing all parameters in this Params
+ */
+ public String toJson() {
+ ObjectMapper mapper = new ObjectMapper();
+ Map<String, String> stringMap = new HashMap<>();
+ try {
+ for (Map.Entry<String, Object> e : paramMap.entrySet())
{
+ stringMap.put(e.getKey(),
mapper.writeValueAsString(e.getValue()));
+ }
+ return mapper.writeValueAsString(stringMap);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to serialize params
to json", e);
+ }
+ }
+
+ /**
+ * Restores the parameters from the given json. The parameters should
be exactly the same with
+ * the one who was serialized to the input json after the restoration.
The class mapping of the
+ * parameters in the json is required because it is hard to directly
restore a param of a user
+ * defined type. Params will be treated as String if it doesn't exist
in the {@code classMap}.
+ *
+ * @param json the json String to restore from
+ * @param classMap the classes of the parameters contained in the json
+ */
+ @SuppressWarnings("unchecked")
+ public void loadJson(String json, Map<String, Class<?>> classMap) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ Map<String, String> m = mapper.readValue(json,
Map.class);
+ for (Map.Entry<String, String> e : m.entrySet()) {
+ Class<?> valueClass =
classMap.getOrDefault(e.getKey(), String.class);
+ paramMap.put(e.getKey(),
mapper.readValue(e.getValue(), valueClass));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to deserialize
json:" + json, e);
+ }
+ }
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/WithParams.java
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/WithParams.java
new file mode 100644
index 0000000..5e87508
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/WithParams.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.misc.param;
+
+/**
+ * Parameters are widely used in machine learning realm. This class defines a
common interface to
+ * interact with classes with parameters.
+ *
+ * @param <T> the actual type of this WithParams, as the return type of setter
+ */
+public interface WithParams<T> {
+ /**
+ * Returns the all the parameters.
+ *
+ * @return all the parameters.
+ */
+ Params getParams();
+
+ /**
+ * Set the value of a specific parameter.
+ *
+ * @param info the info of the specific param to set
+ * @param value the value to be set to the specific param
+ * @param <V> the type of the specific param
+ * @return the WithParams itself
+ */
+ @SuppressWarnings("unchecked")
+ default <V> T set(ParamInfo<V> info, V value) {
+ getParams().set(info, value);
+ return (T) this;
+ }
+
+ /**
+ * Returns the value of the specific param.
+ *
+ * @param info the info of the specific param, usually with default
value
+ * @param <V> the type of the specific param
+ * @return the value of the specific param, or default value defined in
the {@code info} if the
+ * inner Params doesn't contains this param
+ */
+ default <V> V get(ParamInfo<V> info) {
+ return getParams().get(info);
+ }
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/util/param/ExtractParamInfosUtil.java
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/util/param/ExtractParamInfosUtil.java
new file mode 100644
index 0000000..c59d2e0
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/util/param/ExtractParamInfosUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util.param;
+
import org.apache.flink.ml.api.misc.param.ParamInfo;
import org.apache.flink.ml.api.misc.param.WithParams;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+
+/**
+ * Utility to extract all ParamInfos defined in a WithParams, mainly used in
persistence.
+ */
+public final class ExtractParamInfosUtil {
+ private static final Logger LOG =
LoggerFactory.getLogger(ExtractParamInfosUtil.class);
+
+ /**
+ * Extracts all ParamInfos defined in the given WithParams, including
those in its superclasses
+ * and interfaces.
+ *
+ * @param s the WithParams to extract ParamInfos from
+ * @return the list of all ParamInfos defined in s
+ */
+ public static List<ParamInfo> extractParamInfos(WithParams s) {
+ return extractParamInfos(s, s.getClass());
+ }
+
+ private static List<ParamInfo> extractParamInfos(WithParams s, Class
clz) {
+ List<ParamInfo> result = new ArrayList<>();
+ if (clz == null) {
+ return result;
+ }
+
+ Field[] fields = clz.getDeclaredFields();
+ for (Field f : fields) {
+ f.setAccessible(true);
+ if (ParamInfo.class.isAssignableFrom(f.getType())) {
+ try {
+ result.add((ParamInfo) f.get(s));
+ } catch (IllegalAccessException e) {
+ LOG.warn("Failed to extract param info
{}, ignore it", f.getName(), e);
+ }
+ }
+ }
+
+ result.addAll(extractParamInfos(s, clz.getSuperclass()));
+ for (Class c : clz.getInterfaces()) {
+ result.addAll(extractParamInfos(s, c));
+ }
+
+ return result;
+ }
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/api/core/PipelineTest.java
b/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/api/core/PipelineTest.java
new file mode 100644
index 0000000..fc82634
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/api/core/PipelineTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests the behavior of {@link Pipeline}.
+ */
+public class PipelineTest {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testPipelineBehavior() {
+ Pipeline pipeline = new Pipeline();
+ pipeline.appendStage(new MockTransformer("a"));
+ pipeline.appendStage(new MockEstimator("b"));
+ pipeline.appendStage(new MockEstimator("c"));
+ pipeline.appendStage(new MockTransformer("d"));
+ assert describePipeline(pipeline).equals("a_b_c_d");
+
+ Pipeline pipelineModel = pipeline.fit(null, null);
+ assert describePipeline(pipelineModel).equals("a_mb_mc_d");
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Pipeline contains Estimator, need to fit
first.");
+ pipeline.transform(null, null);
+ }
+
+ @Test
+ public void testPipelineRestore() {
+ Pipeline pipeline = new Pipeline();
+ pipeline.appendStage(new MockTransformer("a"));
+ pipeline.appendStage(new MockEstimator("b"));
+ pipeline.appendStage(new MockEstimator("c"));
+ pipeline.appendStage(new MockTransformer("d"));
+ String pipelineJson = pipeline.toJson();
+
+ Pipeline restoredPipeline = new Pipeline(pipelineJson);
+ assert describePipeline(restoredPipeline).equals("a_b_c_d");
+
+ Pipeline pipelineModel = pipeline.fit(null, null);
+ String modelJson = pipelineModel.toJson();
+
+ Pipeline restoredPipelineModel = new Pipeline(modelJson);
+ assert
describePipeline(restoredPipelineModel).equals("a_mb_mc_d");
+ }
+
+ private static String describePipeline(Pipeline p) {
+ StringBuilder res = new StringBuilder();
+ for (PipelineStage s : p.getStages()) {
+ if (res.length() != 0) {
+ res.append("_");
+ }
+ res.append(((SelfDescribe) s).describe());
+ }
+ return res.toString();
+ }
+
+ /**
+ * Interface to describe a class with a string, only for pipeline test.
+ */
+ private interface SelfDescribe {
+ ParamInfo<String> DESCRIPTION =
ParamInfoFactory.createParamInfo("description",
+ String.class).build();
+
+ String describe();
+ }
+
+ /**
+ * Mock estimator for pipeline test.
+ */
+ public static class MockEstimator implements Estimator<MockEstimator,
MockModel>, SelfDescribe {
+ private final Params params = new Params();
+
+ public MockEstimator() {
+ }
+
+ MockEstimator(String description) {
+ set(DESCRIPTION, description);
+ }
+
+ @Override
+ public MockModel fit(TableEnvironment tEnv, Table input) {
+ return new MockModel("m" + describe());
+ }
+
+ @Override
+ public Params getParams() {
+ return params;
+ }
+
+ @Override
+ public String describe() {
+ return get(DESCRIPTION);
+ }
+ }
+
+ /**
+ * Mock transformer for pipeline test.
+ */
+ public static class MockTransformer implements
Transformer<MockTransformer>, SelfDescribe {
+ private final Params params = new Params();
+
+ public MockTransformer() {
+ }
+
+ MockTransformer(String description) {
+ set(DESCRIPTION, description);
+ }
+
+ @Override
+ public Table transform(TableEnvironment tEnv, Table input) {
+ return input;
+ }
+
+ @Override
+ public Params getParams() {
+ return params;
+ }
+
+ @Override
+ public String describe() {
+ return get(DESCRIPTION);
+ }
+ }
+
+ /**
+ * Mock model for pipeline test.
+ */
+ public static class MockModel implements Model<MockModel>, SelfDescribe
{
+ private final Params params = new Params();
+
+ public MockModel() {
+ }
+
+ MockModel(String description) {
+ set(DESCRIPTION, description);
+ }
+
+ @Override
+ public Table transform(TableEnvironment tEnv, Table input) {
+ return input;
+ }
+
+ @Override
+ public Params getParams() {
+ return params;
+ }
+
+ @Override
+ public String describe() {
+ return get(DESCRIPTION);
+ }
+ }
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/api/misc/ParamsTest.java
b/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/api/misc/ParamsTest.java
new file mode 100644
index 0000000..8bdf95b
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/api/misc/ParamsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.misc;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
+import org.apache.flink.ml.api.misc.param.Params;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Test for the behavior and validator of {@link Params}.
+ */
+public class ParamsTest {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testDefaultBehavior() {
+ Params params = new Params();
+
+ ParamInfo<String> optionalWithoutDefault =
+ ParamInfoFactory.createParamInfo("a",
String.class).build();
+ assert params.get(optionalWithoutDefault) == null;
+
+ ParamInfo<String> optionalWithDefault =
+ ParamInfoFactory.createParamInfo("a",
String.class).setHasDefaultValue("def").build();
+ assert params.get(optionalWithDefault).equals("def");
+
+ ParamInfo<String> requiredWithDefault =
+ ParamInfoFactory.createParamInfo("a",
String.class).setRequired()
+ .setHasDefaultValue("def").build();
+ assert params.get(requiredWithDefault).equals("def");
+
+ ParamInfo<String> requiredWithoutDefault =
+ ParamInfoFactory.createParamInfo("a",
String.class).setRequired().build();
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("a not exist which is not optional and
don't have a default value");
+ params.get(requiredWithoutDefault);
+ }
+
+ @Test
+ public void testValidator() {
+ Params params = new Params();
+
+ ParamInfo<Integer> intParam =
+ ParamInfoFactory.createParamInfo("a",
Integer.class).setValidator(i -> i > 0).build();
+ params.set(intParam, 1);
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Setting a as a invalid value:0");
+ params.set(intParam, 0);
+ }
+}
diff --git
a/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/util/param/ExtractParamInfosUtilTest.java
b/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/util/param/ExtractParamInfosUtilTest.java
new file mode 100644
index 0000000..5f467e9
--- /dev/null
+++
b/flink-ml-parent/flink-ml-api/src/test/java/org/apache/flink/ml/util/param/ExtractParamInfosUtilTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util.param;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test for {@link ExtractParamInfosUtil}.
+ */
+public class ExtractParamInfosUtilTest {
+
+ @Test
+ public void testExtractParamInfos() {
+ List<ParamInfo> noParamInfos =
+ ExtractParamInfosUtil.extractParamInfos(new
WithNoParamInfo());
+ assert noParamInfos.isEmpty();
+
+ List<ParamInfo> classParamInfos =
+ ExtractParamInfosUtil.extractParamInfos(new
WithTestParamInfo());
+ assert classParamInfos.size() == 1 &&
classParamInfos.get(0).getName().equals("KSC");
+
+ List<ParamInfo> allParamInfos =
+ ExtractParamInfosUtil.extractParamInfos(new
TestParamInfoWithInheritedParamInfos());
+ String[] sortedCorrectParamNames = new String[]{"KCP", "KI",
"KSC"};
+ assert allParamInfos.size() == 3 &&
Arrays.equals(sortedCorrectParamNames,
+
allParamInfos.stream().map(ParamInfo::getName).sorted().toArray(String[]::new));
+ }
+
+ /**
+ * Mock WithParams implementation with no ParamInfo. Only for test.
+ */
+ public static class WithNoParamInfo implements
WithParams<WithNoParamInfo> {
+
+ @Override
+ public Params getParams() {
+ return null;
+ }
+ }
+
+ /**
+ * Mock WithParams implementation with one ParamInfo. Only for test.
+ * @param <T> subclass of WithTestParamInfo
+ */
+ public static class WithTestParamInfo<T extends WithTestParamInfo>
implements WithParams<T> {
+ public static final ParamInfo<String> KSC = ParamInfoFactory
+ .createParamInfo("KSC", String.class)
+ .setDescription("key from super class").build();
+
+ @Override
+ public Params getParams() {
+ return null;
+ }
+ }
+
+ /**
+ * Mock interface extending WithParams with one ParamInfo. Only for
test.
+ * @param <T> implementation class of InterfaceWithParamInfo
+ */
+ public interface InterfaceWithParamInfo<T extends
InterfaceWithParamInfo>
+ extends WithParams<T> {
+ ParamInfo<String> KI = ParamInfoFactory.createParamInfo("KI",
String.class)
+ .setDescription("key from interface").build();
+ }
+
+ /**
+ * Mock WithParams inheriting ParamInfos from superclass and interface.
Only for test.
+ */
+ public static class TestParamInfoWithInheritedParamInfos
+ extends WithTestParamInfo<TestParamInfoWithInheritedParamInfos>
+ implements
InterfaceWithParamInfo<TestParamInfoWithInheritedParamInfos> {
+ private static final ParamInfo<String> KCP = ParamInfoFactory
+ .createParamInfo("KCP", String.class)
+ .setDescription("key in the class which is
private").build();
+
+ @Override
+ public Params getParams() {
+ return null;
+ }
+ }
+}
diff --git a/flink-ml-parent/pom.xml b/flink-ml-parent/pom.xml
new file mode 100644
index 0000000..6ca2ebc
--- /dev/null
+++ b/flink-ml-parent/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>1.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-ml-parent</artifactId>
+
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>flink-ml-api</module>
+ </modules>
+</project>
diff --git a/pom.xml b/pom.xml
index 086e452..38b3601 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,7 @@ under the License.
<module>flink-fs-tests</module>
<module>flink-docs</module>
<module>flink-python</module>
+ <module>flink-ml-parent</module>
</modules>
<properties>